Skip to content

Commit 4aa00e0

Browse files
authored
onSuccess function for bulk helper (#2199)
* Bulk helper onSuccess callback For #2090 Includes refactor of the tryBulk result processing code, to make iterating over bulk response data easier to understand. * Add onSuccess tests for each datasource type * Cleanup, additional comments * Add documentation for onSuccess callback * Update changelog * Drop link to 8.14 release notes. Page not yet published, breaking docs build.
1 parent e2974b0 commit 4aa00e0

File tree

4 files changed

+407
-151
lines changed

4 files changed

+407
-151
lines changed

docs/changelog.asciidoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
[[changelog-client]]
22
== Release notes
33

4+
[discrete]
5+
=== 8.14.0
6+
7+
[discrete]
8+
===== `onSuccess` callback added to bulk helper
9+
10+
The bulk helper now supports an `onSuccess` callback that will be called for each successful operation. https://github.com/elastic/elasticsearch-js/pull/2199[#2199]
11+
412
[discrete]
513
=== 8.13.0
614

docs/helpers.asciidoc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,17 @@ const b = client.helpers.bulk({
9898
})
9999
----
100100

101+
|`onSuccess`
102+
a|A function that is called for each successful operation in the bulk request, which includes the result from Elasticsearch along with the original document that was sent, or `null` for delete operations.
103+
[source,js]
104+
----
105+
const b = client.helpers.bulk({
106+
onSuccess ({ result, document }) {
107+
console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`)
108+
}
109+
})
110+
----
111+
101112
|`flushBytes`
102113
a|The size of the bulk body in bytes to reach before to send it. Default of 5MB. +
103114
_Default:_ `5000000`

src/helpers.ts

Lines changed: 75 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> {
103103
retried: boolean
104104
}
105105

106+
type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>
107+
108+
export interface OnSuccessDocument<TDocument = unknown> {
109+
result: BulkResponseItem
110+
document?: TDocument
111+
}
112+
113+
interface ZippedResult<TDocument = unknown> {
114+
result: BulkResponseItem
115+
raw: {
116+
action: string
117+
document?: string
118+
}
119+
// this is a function so that deserialization is only done when needed
120+
// to avoid a performance hit
121+
document?: () => TDocument
122+
}
123+
106124
export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
107125
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
108126
onDocument: (doc: TDocument) => Action
@@ -112,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
112130
retries?: number
113131
wait?: number
114132
onDrop?: (doc: OnDropDocument<TDocument>) => void
133+
onSuccess?: (doc: OnSuccessDocument) => void
115134
refreshOnCompletion?: boolean | string
116135
}
117136

@@ -551,6 +570,9 @@ export default class Helpers {
551570
retries = this[kMaxRetries],
552571
wait = 5000,
553572
onDrop = noop,
573+
// onSuccess does not default to noop, to avoid the performance hit
574+
// of deserializing every document in the bulk request
575+
onSuccess,
554576
refreshOnCompletion = false,
555577
...bulkOptions
556578
} = options
@@ -817,57 +839,93 @@ export default class Helpers {
817839
callback()
818840
}
819841

842+
/**
843+
* Zips bulk response items (the action's result) with the original document body.
844+
* The raw string version of action and document lines are also included.
845+
*/
846+
function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
847+
const zipped = []
848+
let indexSlice = 0
849+
for (let i = 0, len = responseItems.length; i < len; i++) {
850+
const result = responseItems[i]
851+
const operation = Object.keys(result)[0]
852+
let zipResult
853+
854+
if (operation === 'delete') {
855+
zipResult = {
856+
result,
857+
raw: { action: bulkBody[indexSlice] }
858+
}
859+
indexSlice += 1
860+
} else {
861+
const document = bulkBody[indexSlice + 1]
862+
zipResult = {
863+
result,
864+
raw: { action: bulkBody[indexSlice], document },
865+
// this is a function so that deserialization is only done when needed
866+
// to avoid a performance hit
867+
document: () => serializer.deserialize(document)
868+
}
869+
indexSlice += 2
870+
}
871+
872+
zipped.push(zipResult as ZippedResult)
873+
}
874+
875+
return zipped
876+
}
877+
820878
function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
821879
if (shouldAbort) return callback(null, [])
822880
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
823881
.then(response => {
824882
const result = response.body
883+
const results = zipBulkResults(result.items, bulkBody)
884+
825885
if (!result.errors) {
826886
stats.successful += result.items.length
827-
for (const item of result.items) {
828-
if (item.update?.result === 'noop') {
887+
for (const item of results) {
888+
const { result, document = noop } = item
889+
if (result.update?.result === 'noop') {
829890
stats.noop++
830891
}
892+
if (onSuccess != null) onSuccess({ result, document: document() })
831893
}
832894
return callback(null, [])
833895
}
834896
const retry = []
835-
const { items } = result
836-
let indexSlice = 0
837-
for (let i = 0, len = items.length; i < len; i++) {
838-
const action = items[i]
839-
const operation = Object.keys(action)[0]
897+
for (const item of results) {
898+
const { result, raw, document = noop } = item
899+
const operation = Object.keys(result)[0]
840900
// @ts-expect-error
841-
const responseItem = action[operation as keyof T.BulkResponseItemContainer]
901+
const responseItem = result[operation as keyof T.BulkResponseItemContainer]
842902
assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')
843903

844904
if (responseItem.status >= 400) {
845-
// 429 is the only staus code where we might want to retry
905+
// 429 is the only status code where we might want to retry
846906
// a document, because it was not an error in the document itself,
847-
// but the ES node were handling too many operations.
907+
// but the ES node was handling too many operations.
848908
if (responseItem.status === 429) {
849-
retry.push(bulkBody[indexSlice])
909+
retry.push(raw.action)
850910
/* istanbul ignore next */
851911
if (operation !== 'delete') {
852-
retry.push(bulkBody[indexSlice + 1])
912+
retry.push(raw.document ?? '')
853913
}
854914
} else {
855915
onDrop({
856916
status: responseItem.status,
857917
error: responseItem.error ?? null,
858-
operation: serializer.deserialize(bulkBody[indexSlice]),
918+
operation: serializer.deserialize(raw.action),
859919
// @ts-expect-error
860-
document: operation !== 'delete'
861-
? serializer.deserialize(bulkBody[indexSlice + 1])
862-
: null,
920+
document: document(),
863921
retried: isRetrying
864922
})
865923
stats.failed += 1
866924
}
867925
} else {
868926
stats.successful += 1
927+
if (onSuccess != null) onSuccess({ result, document: document() })
869928
}
870-
operation === 'delete' ? indexSlice += 1 : indexSlice += 2
871929
}
872930
callback(null, retry)
873931
})

0 commit comments

Comments
 (0)