Skip to content

Commit 1caa7f4

Browse files
committed
Fix rejected patches from elastic/elasticsearch-js#2199
1 parent 66b78b3 commit 1caa7f4

File tree

2 files changed

+83
-38
lines changed

2 files changed

+83
-38
lines changed

src/helpers.ts

Lines changed: 63 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
128128
retries?: number
129129
wait?: number
130130
onDrop?: (doc: OnDropDocument<TDocument>) => void
131+
onSuccess?: (doc: OnSuccessDocument) => void
131132
}
132133

133134
export interface BulkHelper<T> extends Promise<BulkStats> {
@@ -397,7 +398,7 @@ export default class Helpers {
397398
clearTimeout(timeoutRef)
398399
}
399400

400-
// In some cases the previos http call does not have finished,
401+
// In some cases the previous http call does not have finished,
401402
// or we didn't reach the flush bytes threshold, so we force one last operation.
402403
if (loadedOperations > 0) {
403404
const send = await semaphore()
@@ -433,8 +434,8 @@ export default class Helpers {
433434
// to guarantee that no more than the number of operations
434435
// allowed to run at the same time are executed.
435436
// It returns a semaphore function which resolves in the next tick
436-
// if we didn't reach the maximim concurrency yet, otherwise it returns
437-
// a promise that resolves as soon as one of the running request has finshed.
437+
// if we didn't reach the maximum concurrency yet, otherwise it returns
438+
// a promise that resolves as soon as one of the running requests has finished.
438439
// The semaphore function resolves a send function, which will be used
439440
// to send the actual msearch request.
440441
// It also returns a finish function, which returns a promise that is resolved
@@ -566,6 +567,9 @@ export default class Helpers {
566567
retries = this[kMaxRetries],
567568
wait = 5000,
568569
onDrop = noop,
570+
// onSuccess does not default to noop, to avoid the performance hit
571+
// of deserializing every document in the bulk request
572+
onSuccess,
569573
...bulkOptions
570574
} = options
571575

@@ -638,7 +642,7 @@ export default class Helpers {
638642
let chunkBytes = 0
639643
timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line
640644

641-
// @ts-expect-error datasoruce is an iterable
645+
// @ts-expect-error datasource is an iterable
642646
for await (const chunk of datasource) {
643647
if (shouldAbort) break
644648
timeoutRef.refresh()
@@ -679,7 +683,7 @@ export default class Helpers {
679683
}
680684

681685
clearTimeout(timeoutRef)
682-
// In some cases the previos http call does not have finished,
686+
// In some cases the previous http call has not finished,
683687
// or we didn't reach the flush bytes threshold, so we force one last operation.
684688
if (!shouldAbort && chunkBytes > 0) {
685689
const send = await semaphore()
@@ -715,8 +719,8 @@ export default class Helpers {
715719
// to guarantee that no more than the number of operations
716720
// allowed to run at the same time are executed.
717721
// It returns a semaphore function which resolves in the next tick
718-
// if we didn't reach the maximim concurrency yet, otherwise it returns
719-
// a promise that resolves as soon as one of the running request has finshed.
722+
// if we didn't reach the maximum concurrency yet, otherwise it returns
723+
// a promise that resolves as soon as one of the running requests has finished.
720724
// The semaphore function resolves a send function, which will be used
721725
// to send the actual bulk request.
722726
// It also returns a finish function, which returns a promise that is resolved
@@ -823,57 +827,93 @@ export default class Helpers {
823827
callback()
824828
}
825829

830+
/**
831+
* Zips bulk response items (the action's result) with the original document body.
832+
* The raw string version of action and document lines are also included.
833+
*/
834+
function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
835+
const zipped = []
836+
let indexSlice = 0
837+
for (let i = 0, len = responseItems.length; i < len; i++) {
838+
const result = responseItems[i]
839+
const operation = Object.keys(result)[0]
840+
let zipResult
841+
842+
if (operation === 'delete') {
843+
zipResult = {
844+
result,
845+
raw: { action: bulkBody[indexSlice] }
846+
}
847+
indexSlice += 1
848+
} else {
849+
const document = bulkBody[indexSlice + 1]
850+
zipResult = {
851+
result,
852+
raw: { action: bulkBody[indexSlice], document },
853+
// this is a function so that deserialization is only done when needed
854+
// to avoid a performance hit
855+
document: () => serializer.deserialize(document)
856+
}
857+
indexSlice += 2
858+
}
859+
860+
zipped.push(zipResult as ZippedResult)
861+
}
862+
863+
return zipped
864+
}
865+
826866
function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
827867
if (shouldAbort) return callback(null, [])
828868
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
829869
.then(response => {
830870
const result = response.body
871+
const results = zipBulkResults(result.items, bulkBody)
872+
831873
if (!result.errors) {
832874
stats.successful += result.items.length
833-
for (const item of result.items) {
834-
if (item.update?.result === 'noop') {
875+
for (const item of results) {
876+
const { result, document = noop } = item
877+
if (result.update?.result === 'noop') {
835878
stats.noop++
836879
}
880+
if (onSuccess != null) onSuccess({ result, document: document() })
837881
}
838882
return callback(null, [])
839883
}
840884
const retry = []
841-
const { items } = result
842-
let indexSlice = 0
843-
for (let i = 0, len = items.length; i < len; i++) {
844-
const action = items[i]
845-
const operation = Object.keys(action)[0]
885+
for (const item of results) {
886+
const { result, raw, document = noop } = item
887+
const operation = Object.keys(result)[0]
846888
// @ts-expect-error
847-
const responseItem = action[operation as keyof T.BulkResponseItemContainer]
889+
const responseItem = result[operation as keyof T.BulkResponseItemContainer]
848890
assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')
849891

850892
if (responseItem.status >= 400) {
851893
// 429 is the only status code where we might want to retry
852894
// a document, because it was not an error in the document itself,
853-
// but the ES node were handling too many operations.
895+
// but the ES node was handling too many operations.
854896
if (responseItem.status === 429) {
855-
retry.push(bulkBody[indexSlice])
897+
retry.push(raw.action)
856898
/* istanbul ignore next */
857899
if (operation !== 'delete') {
858-
retry.push(bulkBody[indexSlice + 1])
900+
retry.push(raw.document ?? '')
859901
}
860902
} else {
861903
onDrop({
862904
status: responseItem.status,
863905
error: responseItem.error ?? null,
864-
operation: serializer.deserialize(bulkBody[indexSlice]),
906+
operation: serializer.deserialize(raw.action),
865907
// @ts-expect-error
866-
document: operation !== 'delete'
867-
? serializer.deserialize(bulkBody[indexSlice + 1])
868-
: null,
908+
document: document(),
869909
retried: isRetrying
870910
})
871911
stats.failed += 1
872912
}
873913
} else {
874914
stats.successful += 1
915+
if (onSuccess != null) onSuccess({ result, document: document() })
875916
}
876-
operation === 'delete' ? indexSlice += 1 : indexSlice += 2
877917
}
878918
callback(null, retry)
879919
})

test/unit/helpers/bulk.test.ts

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
* under the License.
1818
*/
1919

20+
import FakeTimers from '@sinonjs/fake-timers'
2021
import { AssertionError } from 'assert'
21-
import * as http from 'http'
2222
import { createReadStream } from 'fs'
23+
import * as http from 'http'
2324
import { join } from 'path'
2425
import split from 'split2'
25-
import FakeTimers from '@sinonjs/fake-timers'
2626
import { test } from 'tap'
2727
import { Client, errors } from '../../../'
2828
import { buildServer, connection } from '../../utils'
@@ -610,7 +610,8 @@ test('bulk index', t => {
610610

611611
const client = new Client({
612612
node: 'http://localhost:9200',
613-
Connection: MockConnection
613+
Connection: MockConnection,
614+
compression: false
614615
})
615616

616617
let count = 0
@@ -701,7 +702,8 @@ test('bulk index', t => {
701702

702703
const client = new Client({
703704
node: 'http://localhost:9200',
704-
Connection: MockConnection
705+
Connection: MockConnection,
706+
compression: false
705707
})
706708
const stream = createReadStream(join(__dirname, '..', '..', 'fixtures', 'small-dataset.ndjson'), 'utf8')
707709

@@ -795,7 +797,8 @@ test('bulk index', t => {
795797

796798
const client = new Client({
797799
node: 'http://localhost:9200',
798-
Connection: MockConnection
800+
Connection: MockConnection,
801+
compression: false
799802
})
800803

801804
async function * generator () {
@@ -1033,8 +1036,6 @@ test('bulk update', t => {
10331036
})
10341037
})
10351038

1036-
1037-
10381039
t.end()
10391040
})
10401041

@@ -1164,7 +1165,7 @@ test('bulk delete', t => {
11641165
t.test('Should call onDrop on the correct document when doing a mix of operations that includes deletes', async t => {
11651166
// checks to ensure onDrop doesn't provide the wrong document when some operations are deletes
11661167
// see https://github.com/elastic/elasticsearch-js/issues/1751
1167-
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
1168+
async function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
11681169
res.setHeader('content-type', 'application/json')
11691170
res.end(JSON.stringify({
11701171
took: 0,
@@ -1178,14 +1179,17 @@ test('bulk delete', t => {
11781179
}
11791180

11801181
const [{ port }, server] = await buildServer(handler)
1181-
const client = new Client({ node: `http://localhost:${port}` })
1182+
const client = new Client({
1183+
node: `http://localhost:${port}`,
1184+
compression: false
1185+
})
11821186
let counter = 0
11831187
const result = await client.helpers.bulk({
11841188
datasource: dataset.slice(),
11851189
concurrency: 1,
11861190
wait: 10,
11871191
retries: 0,
1188-
onDocument (doc) {
1192+
onDocument (_doc) {
11891193
counter++
11901194
if (counter === 1) {
11911195
return {
@@ -1237,7 +1241,8 @@ test('bulk delete', t => {
12371241

12381242
const client = new Client({
12391243
node: 'http://localhost:9200',
1240-
Connection: MockConnection
1244+
Connection: MockConnection,
1245+
compression: false
12411246
})
12421247

12431248
let docCount = 0
@@ -1464,12 +1469,12 @@ test('Flush interval', t => {
14641469
})(),
14651470
flushBytes: 5000000,
14661471
concurrency: 1,
1467-
onDocument (doc) {
1472+
onDocument (_doc) {
14681473
return {
14691474
index: { _index: 'test' }
14701475
}
14711476
},
1472-
onDrop (doc) {
1477+
onDrop (_doc) {
14731478
t.fail('This should never be called')
14741479
}
14751480
})
@@ -1525,12 +1530,12 @@ test('Flush interval', t => {
15251530
})(),
15261531
flushBytes: 5000000,
15271532
concurrency: 1,
1528-
onDocument (doc) {
1533+
onDocument (_doc) {
15291534
return {
15301535
index: { _index: 'test' }
15311536
}
15321537
},
1533-
onDrop (doc) {
1538+
onDrop (_doc) {
15341539
t.fail('This should never be called')
15351540
}
15361541
})

0 commit comments

Comments
 (0)