Skip to content

Commit eb7d29c

Browse files
authored
feat: bulk helper improvements (#51)
Applies patches from elastic/elasticsearch-js#2199 and elastic/elasticsearch-js#2027, adding support for an onSuccess callback and fixing a bug that would cause the helper to hang when the flushInterval was lower than the request timeout. --------- Co-authored-by: JoshMock <[email protected]>
1 parent 3360c03 commit eb7d29c

File tree

2 files changed

+656
-56
lines changed

2 files changed

+656
-56
lines changed

src/helpers.ts

Lines changed: 97 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ export interface BulkStats {
7474
aborted: boolean
7575
}
7676

77-
interface IndexAction {
77+
interface IndexActionOperation {
7878
index: T.BulkIndexOperation
7979
}
8080

81-
interface CreateAction {
81+
interface CreateActionOperation {
8282
create: T.BulkCreateOperation
8383
}
8484

@@ -90,7 +90,9 @@ interface DeleteAction {
9090
delete: T.BulkDeleteOperation
9191
}
9292

93-
type UpdateAction = [UpdateActionOperation, Record<string, any>]
93+
type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]
94+
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]
95+
type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction]
9496
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
9597

9698
export interface OnDropDocument<TDocument = unknown> {
@@ -101,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> {
101103
retried: boolean
102104
}
103105

106+
type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>
107+
108+
export interface OnSuccessDocument<TDocument = unknown> {
109+
result: BulkResponseItem
110+
document?: TDocument
111+
}
112+
113+
interface ZippedResult<TDocument = unknown> {
114+
result: BulkResponseItem
115+
raw: {
116+
action: string
117+
document?: string
118+
}
119+
// this is a function so that deserialization is only done when needed
120+
// to avoid a performance hit
121+
document?: () => TDocument
122+
}
123+
104124
export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
105125
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
106126
onDocument: (doc: TDocument) => Action
@@ -110,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
110130
retries?: number
111131
wait?: number
112132
onDrop?: (doc: OnDropDocument<TDocument>) => void
133+
onSuccess?: (doc: OnSuccessDocument) => void
113134
}
114135

115136
export interface BulkHelper<T> extends Promise<BulkStats> {
@@ -379,7 +400,7 @@ export default class Helpers {
379400
clearTimeout(timeoutRef)
380401
}
381402

382-
// In some cases the previos http call does not have finished,
403+
// In some cases the previous http call does not have finished,
383404
// or we didn't reach the flush bytes threshold, so we force one last operation.
384405
if (loadedOperations > 0) {
385406
const send = await semaphore()
@@ -415,8 +436,8 @@ export default class Helpers {
415436
// to guarantee that no more than the number of operations
416437
// allowed to run at the same time are executed.
417438
// It returns a semaphore function which resolves in the next tick
418-
// if we didn't reach the maximim concurrency yet, otherwise it returns
419-
// a promise that resolves as soon as one of the running request has finshed.
439+
// if we didn't reach the maximum concurrency yet, otherwise it returns
440+
// a promise that resolves as soon as one of the running requests has finished.
420441
// The semaphore function resolves a send function, which will be used
421442
// to send the actual msearch request.
422443
// It also returns a finish function, which returns a promise that is resolved
@@ -548,6 +569,9 @@ export default class Helpers {
548569
retries = this[kMaxRetries],
549570
wait = 5000,
550571
onDrop = noop,
572+
// onSuccess does not default to noop, to avoid the performance hit
573+
// of deserializing every document in the bulk request
574+
onSuccess,
551575
...bulkOptions
552576
} = options
553577

@@ -620,26 +644,25 @@ export default class Helpers {
620644
let chunkBytes = 0
621645
timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line
622646

623-
// @ts-expect-error datasoruce is an iterable
647+
// @ts-expect-error datasource is an iterable
624648
for await (const chunk of datasource) {
625649
if (shouldAbort) break
626650
timeoutRef.refresh()
627-
const action = onDocument(chunk)
628-
const operation = Array.isArray(action)
629-
? Object.keys(action[0])[0]
630-
: Object.keys(action)[0]
651+
const result = onDocument(chunk)
652+
const [action, payload] = Array.isArray(result) ? result : [result, chunk]
653+
const operation = Object.keys(action)[0]
631654
if (operation === 'index' || operation === 'create') {
632655
actionBody = serializer.serialize(action)
633-
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
656+
payloadBody = typeof payload === 'string'
657+
? payload
658+
: serializer.serialize(payload)
634659
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
635660
bulkBody.push(actionBody, payloadBody)
636661
} else if (operation === 'update') {
637-
// @ts-expect-error in case of update action is an array
638-
actionBody = serializer.serialize(action[0])
662+
actionBody = serializer.serialize(action)
639663
payloadBody = typeof chunk === 'string'
640664
? `{"doc":${chunk}}`
641-
// @ts-expect-error in case of update action is an array
642-
: serializer.serialize({ doc: chunk, ...action[1] })
665+
: serializer.serialize({ doc: chunk, ...payload })
643666
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
644667
bulkBody.push(actionBody, payloadBody)
645668
} else if (operation === 'delete') {
@@ -653,15 +676,16 @@ export default class Helpers {
653676

654677
if (chunkBytes >= flushBytes) {
655678
stats.bytes += chunkBytes
656-
const send = await semaphore()
657-
send(bulkBody.slice())
679+
const bulkBodyCopy = bulkBody.slice()
658680
bulkBody.length = 0
659681
chunkBytes = 0
682+
const send = await semaphore()
683+
send(bulkBodyCopy)
660684
}
661685
}
662686

663687
clearTimeout(timeoutRef)
664-
// In some cases the previos http call does not have finished,
688+
// In some cases the previous http call has not finished,
665689
// or we didn't reach the flush bytes threshold, so we force one last operation.
666690
if (!shouldAbort && chunkBytes > 0) {
667691
const send = await semaphore()
@@ -697,8 +721,8 @@ export default class Helpers {
697721
// to guarantee that no more than the number of operations
698722
// allowed to run at the same time are executed.
699723
// It returns a semaphore function which resolves in the next tick
700-
// if we didn't reach the maximim concurrency yet, otherwise it returns
701-
// a promise that resolves as soon as one of the running request has finshed.
724+
// if we didn't reach the maximum concurrency yet, otherwise it returns
725+
// a promise that resolves as soon as one of the running requests has finished.
702726
// The semaphore function resolves a send function, which will be used
703727
// to send the actual bulk request.
704728
// It also returns a finish function, which returns a promise that is resolved
@@ -805,57 +829,93 @@ export default class Helpers {
805829
callback()
806830
}
807831

832+
/**
833+
* Zips bulk response items (the action's result) with the original document body.
834+
* The raw string version of action and document lines are also included.
835+
*/
836+
function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
837+
const zipped = []
838+
let indexSlice = 0
839+
for (let i = 0, len = responseItems.length; i < len; i++) {
840+
const result = responseItems[i]
841+
const operation = Object.keys(result)[0]
842+
let zipResult
843+
844+
if (operation === 'delete') {
845+
zipResult = {
846+
result,
847+
raw: { action: bulkBody[indexSlice] }
848+
}
849+
indexSlice += 1
850+
} else {
851+
const document = bulkBody[indexSlice + 1]
852+
zipResult = {
853+
result,
854+
raw: { action: bulkBody[indexSlice], document },
855+
// this is a function so that deserialization is only done when needed
856+
// to avoid a performance hit
857+
document: () => serializer.deserialize(document)
858+
}
859+
indexSlice += 2
860+
}
861+
862+
zipped.push(zipResult as ZippedResult)
863+
}
864+
865+
return zipped
866+
}
867+
808868
function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
809869
if (shouldAbort) return callback(null, [])
810870
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
811871
.then(response => {
812872
const result = response.body
873+
const results = zipBulkResults(result.items, bulkBody)
874+
813875
if (!result.errors) {
814876
stats.successful += result.items.length
815-
for (const item of result.items) {
816-
if (item.update?.result === 'noop') {
877+
for (const item of results) {
878+
const { result, document = noop } = item
879+
if (result.update?.result === 'noop') {
817880
stats.noop++
818881
}
882+
if (onSuccess != null) onSuccess({ result, document: document() })
819883
}
820884
return callback(null, [])
821885
}
822886
const retry = []
823-
const { items } = result
824-
let indexSlice = 0
825-
for (let i = 0, len = items.length; i < len; i++) {
826-
const action = items[i]
827-
const operation = Object.keys(action)[0]
887+
for (const item of results) {
888+
const { result, raw, document = noop } = item
889+
const operation = Object.keys(result)[0]
828890
// @ts-expect-error
829-
const responseItem = action[operation as keyof T.BulkResponseItemContainer]
891+
const responseItem = result[operation as keyof T.BulkResponseItemContainer]
830892
assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')
831893

832894
if (responseItem.status >= 400) {
833895
// 429 is the only status code where we might want to retry
834896
// a document, because it was not an error in the document itself,
835-
// but the ES node were handling too many operations.
897+
// but the ES node was handling too many operations.
836898
if (responseItem.status === 429) {
837-
retry.push(bulkBody[indexSlice])
899+
retry.push(raw.action)
838900
/* istanbul ignore next */
839901
if (operation !== 'delete') {
840-
retry.push(bulkBody[indexSlice + 1])
902+
retry.push(raw.document ?? '')
841903
}
842904
} else {
843905
onDrop({
844906
status: responseItem.status,
845907
error: responseItem.error ?? null,
846-
operation: serializer.deserialize(bulkBody[indexSlice]),
908+
operation: serializer.deserialize(raw.action),
847909
// @ts-expect-error
848-
document: operation !== 'delete'
849-
? serializer.deserialize(bulkBody[indexSlice + 1])
850-
: null,
910+
document: document(),
851911
retried: isRetrying
852912
})
853913
stats.failed += 1
854914
}
855915
} else {
856916
stats.successful += 1
917+
if (onSuccess != null) onSuccess({ result, document: document() })
857918
}
858-
operation === 'delete' ? indexSlice += 1 : indexSlice += 2
859919
}
860920
callback(null, retry)
861921
})

0 commit comments

Comments
 (0)