Skip to content

onSuccess function for bulk helper #2199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/changelog.asciidoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
[[changelog-client]]
== Release notes

[discrete]
=== 8.14.0

[discrete]
===== `onSuccess` callback added to bulk helper

The bulk helper now supports an `onSuccess` callback that will be called for each successful operation. https://github.com/elastic/elasticsearch-js/pull/2199[#2199]

[discrete]
=== 8.13.0

Expand Down
11 changes: 11 additions & 0 deletions docs/helpers.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ const b = client.helpers.bulk({
})
----

|`onSuccess`
a|A function that is called for each successful operation in the bulk request, which includes the result from Elasticsearch along with the original document that was sent, or `null` for delete operations.
[source,js]
----
const b = client.helpers.bulk({
onSuccess ({ result, document }) {
console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`)
}
})
----

|`flushBytes`
a|The size of the bulk body in bytes to reach before to send it. Default of 5MB. +
_Default:_ `5000000`
Expand Down
92 changes: 75 additions & 17 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> {
retried: boolean
}

type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>

export interface OnSuccessDocument<TDocument = unknown> {
result: BulkResponseItem
document?: TDocument
}

interface ZippedResult<TDocument = unknown> {
result: BulkResponseItem
raw: {
action: string
document?: string
}
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document?: () => TDocument
}

export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
Expand All @@ -112,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
retries?: number
wait?: number
onDrop?: (doc: OnDropDocument<TDocument>) => void
onSuccess?: (doc: OnSuccessDocument) => void
refreshOnCompletion?: boolean | string
}

Expand Down Expand Up @@ -551,6 +570,9 @@ export default class Helpers {
retries = this[kMaxRetries],
wait = 5000,
onDrop = noop,
// onSuccess does not default to noop, to avoid the performance hit
// of deserializing every document in the bulk request
onSuccess,
refreshOnCompletion = false,
...bulkOptions
} = options
Expand Down Expand Up @@ -817,57 +839,93 @@ export default class Helpers {
callback()
}

/**
* Zips bulk response items (the action's result) with the original document body.
* The raw string version of action and document lines are also included.
*/
function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
const zipped = []
let indexSlice = 0
for (let i = 0, len = responseItems.length; i < len; i++) {
const result = responseItems[i]
const operation = Object.keys(result)[0]
let zipResult

if (operation === 'delete') {
zipResult = {
result,
raw: { action: bulkBody[indexSlice] }
}
indexSlice += 1
} else {
const document = bulkBody[indexSlice + 1]
zipResult = {
result,
raw: { action: bulkBody[indexSlice], document },
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document: () => serializer.deserialize(document)
}
indexSlice += 2
}

zipped.push(zipResult as ZippedResult)
}

return zipped
}

function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
if (shouldAbort) return callback(null, [])
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
.then(response => {
const result = response.body
const results = zipBulkResults(result.items, bulkBody)

if (!result.errors) {
stats.successful += result.items.length
for (const item of result.items) {
if (item.update?.result === 'noop') {
for (const item of results) {
const { result, document = noop } = item
if (result.update?.result === 'noop') {
stats.noop++
}
if (onSuccess != null) onSuccess({ result, document: document() })
}
return callback(null, [])
}
const retry = []
const { items } = result
let indexSlice = 0
for (let i = 0, len = items.length; i < len; i++) {
const action = items[i]
const operation = Object.keys(action)[0]
for (const item of results) {
const { result, raw, document = noop } = item
const operation = Object.keys(result)[0]
// @ts-expect-error
const responseItem = action[operation as keyof T.BulkResponseItemContainer]
const responseItem = result[operation as keyof T.BulkResponseItemContainer]
assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')

if (responseItem.status >= 400) {
// 429 is the only staus code where we might want to retry
// 429 is the only status code where we might want to retry
// a document, because it was not an error in the document itself,
// but the ES node were handling too many operations.
// but the ES node was handling too many operations.
if (responseItem.status === 429) {
retry.push(bulkBody[indexSlice])
retry.push(raw.action)
/* istanbul ignore next */
if (operation !== 'delete') {
retry.push(bulkBody[indexSlice + 1])
retry.push(raw.document ?? '')
}
} else {
onDrop({
status: responseItem.status,
error: responseItem.error ?? null,
operation: serializer.deserialize(bulkBody[indexSlice]),
operation: serializer.deserialize(raw.action),
// @ts-expect-error
document: operation !== 'delete'
? serializer.deserialize(bulkBody[indexSlice + 1])
: null,
document: document(),
retried: isRetrying
})
stats.failed += 1
}
} else {
stats.successful += 1
if (onSuccess != null) onSuccess({ result, document: document() })
}
operation === 'delete' ? indexSlice += 1 : indexSlice += 2
}
callback(null, retry)
})
Expand Down
Loading