Skip to content

Commit 66b78b3

Browse files
JoshMockgithub-actions[bot]
authored andcommitted
1 parent 3360c03 commit 66b78b3

File tree

2 files changed

+206
-10
lines changed

2 files changed

+206
-10
lines changed

src/helpers.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,24 @@ export interface OnDropDocument<TDocument = unknown> {
101101
retried: boolean
102102
}
103103

104+
type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>
105+
106+
export interface OnSuccessDocument<TDocument = unknown> {
107+
result: BulkResponseItem
108+
document?: TDocument
109+
}
110+
111+
interface ZippedResult<TDocument = unknown> {
112+
result: BulkResponseItem
113+
raw: {
114+
action: string
115+
document?: string
116+
}
117+
// this is a function so that deserialization is only done when needed
118+
// to avoid a performance hit
119+
document?: () => TDocument
120+
}
121+
104122
export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
105123
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
106124
onDocument: (doc: TDocument) => Action

test/unit/helpers/bulk.test.ts

Lines changed: 188 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ test('bulk index', t => {
431431

432432
t.test('Server error', async t => {
433433
const MockConnection = connection.buildMockConnection({
434-
onRequest (params) {
434+
onRequest (_params) {
435435
return {
436436
statusCode: 500,
437437
body: { somothing: 'went wrong' }
@@ -447,12 +447,12 @@ test('bulk index', t => {
447447
datasource: dataset.slice(),
448448
flushBytes: 1,
449449
concurrency: 1,
450-
onDocument (doc) {
450+
onDocument (_doc) {
451451
return {
452452
index: { _index: 'test' }
453453
}
454454
},
455-
onDrop (doc) {
455+
onDrop (_doc) {
456456
t.fail('This should never be called')
457457
}
458458
})
@@ -467,7 +467,7 @@ test('bulk index', t => {
467467

468468
t.test('Server error (high flush size, to trigger the finish error)', async t => {
469469
const MockConnection = connection.buildMockConnection({
470-
onRequest (params) {
470+
onRequest (_params) {
471471
return {
472472
statusCode: 500,
473473
body: { somothing: 'went wrong' }
@@ -483,12 +483,12 @@ test('bulk index', t => {
483483
datasource: dataset.slice(),
484484
flushBytes: 5000000,
485485
concurrency: 1,
486-
onDocument (doc) {
486+
onDocument (_doc) {
487487
return {
488488
index: { _index: 'test' }
489489
}
490490
},
491-
onDrop (doc) {
491+
onDrop (_doc) {
492492
t.fail('This should never be called')
493493
}
494494
})
@@ -545,12 +545,12 @@ test('bulk index', t => {
545545
flushBytes: 1,
546546
concurrency: 1,
547547
wait: 10,
548-
onDocument (doc) {
548+
onDocument (_doc) {
549549
return {
550550
index: { _index: 'test' }
551551
}
552552
},
553-
onDrop (doc) {
553+
onDrop (_doc) {
554554
b.abort()
555555
}
556556
})
@@ -571,7 +571,7 @@ test('bulk index', t => {
571571
t.test('Invalid operation', t => {
572572
t.plan(2)
573573
const MockConnection = connection.buildMockConnection({
574-
onRequest (params) {
574+
onRequest (_params) {
575575
return { body: { errors: false, items: [{}] } }
576576
}
577577
})
@@ -586,7 +586,7 @@ test('bulk index', t => {
586586
flushBytes: 1,
587587
concurrency: 1,
588588
// @ts-expect-error
589-
onDocument (doc) {
589+
onDocument (_doc) {
590590
return {
591591
foo: { _index: 'test' }
592592
}
@@ -598,6 +598,43 @@ test('bulk index', t => {
598598
})
599599
})
600600

601+
t.test('should call onSuccess callback for each indexed document', async t => {
602+
const MockConnection = connection.buildMockConnection({
603+
onRequest (params) {
604+
// @ts-expect-error
605+
let [action] = params.body.split('\n')
606+
action = JSON.parse(action)
607+
return { body: { errors: false, items: [action] } }
608+
}
609+
})
610+
611+
const client = new Client({
612+
node: 'http://localhost:9200',
613+
Connection: MockConnection
614+
})
615+
616+
let count = 0
617+
await client.helpers.bulk<Document>({
618+
datasource: dataset.slice(),
619+
flushBytes: 1,
620+
concurrency: 1,
621+
onDocument (_doc) {
622+
return {
623+
index: { _index: 'test' }
624+
}
625+
},
626+
onSuccess ({ result, document }) {
627+
t.same(result, { index: { _index: 'test' }})
628+
t.same(document, dataset[count++])
629+
},
630+
onDrop (_doc) {
631+
t.fail('This should never be called')
632+
}
633+
})
634+
t.equal(count, 3)
635+
t.end()
636+
})
637+
601638
t.end()
602639
})
603640

@@ -652,6 +689,44 @@ test('bulk index', t => {
652689
})
653690
})
654691

692+
t.test('onSuccess is called for each indexed document', async t => {
693+
const MockConnection = connection.buildMockConnection({
694+
onRequest (params) {
695+
// @ts-expect-error
696+
let [action] = params.body.split('\n')
697+
action = JSON.parse(action)
698+
return { body: { errors: false, items: [action] } }
699+
}
700+
})
701+
702+
const client = new Client({
703+
node: 'http://localhost:9200',
704+
Connection: MockConnection
705+
})
706+
const stream = createReadStream(join(__dirname, '..', '..', 'fixtures', 'small-dataset.ndjson'), 'utf8')
707+
708+
let count = 0
709+
await client.helpers.bulk<Document>({
710+
datasource: stream.pipe(split()),
711+
flushBytes: 1,
712+
concurrency: 1,
713+
onDocument (_doc) {
714+
return {
715+
index: { _index: 'test' }
716+
}
717+
},
718+
onSuccess ({ result, document }) {
719+
t.same(result, { index: { _index: 'test' }})
720+
t.same(document, dataset[count++])
721+
},
722+
onDrop (_doc) {
723+
t.fail('This should never be called')
724+
}
725+
})
726+
t.equal(count, 3)
727+
t.end()
728+
})
729+
655730
t.end()
656731
})
657732

@@ -707,6 +782,50 @@ test('bulk index', t => {
707782
aborted: false
708783
})
709784
})
785+
786+
t.test('onSuccess is called for each indexed document', async t => {
787+
const MockConnection = connection.buildMockConnection({
788+
onRequest (params) {
789+
// @ts-expect-error
790+
let [action] = params.body.split('\n')
791+
action = JSON.parse(action)
792+
return { body: { errors: false, items: [action] } }
793+
}
794+
})
795+
796+
const client = new Client({
797+
node: 'http://localhost:9200',
798+
Connection: MockConnection
799+
})
800+
801+
async function * generator () {
802+
const data = dataset.slice()
803+
for (const doc of data) {
804+
yield doc
805+
}
806+
}
807+
808+
let count = 0
809+
await client.helpers.bulk<Document>({
810+
datasource: generator(),
811+
flushBytes: 1,
812+
concurrency: 1,
813+
onDocument (_doc) {
814+
return {
815+
index: { _index: 'test' }
816+
}
817+
},
818+
onSuccess ({ result, document }) {
819+
t.same(result, { index: { _index: 'test' }})
820+
t.same(document, dataset[count++])
821+
},
822+
onDrop (_doc) {
823+
t.fail('This should never be called')
824+
}
825+
})
826+
t.equal(count, 3)
827+
t.end()
828+
})
710829
t.end()
711830
})
712831

@@ -914,6 +1033,8 @@ test('bulk update', t => {
9141033
})
9151034
})
9161035

1036+
1037+
9171038
t.end()
9181039
})
9191040

@@ -1104,6 +1225,63 @@ test('bulk delete', t => {
11041225
server.stop()
11051226
})
11061227

1228+
t.test('should call onSuccess callback with delete action object', async t => {
1229+
const MockConnection = connection.buildMockConnection({
1230+
onRequest (params) {
1231+
// @ts-expect-error
1232+
let [action, payload] = params.body.split('\n')
1233+
action = JSON.parse(action)
1234+
return { body: { errors: false, items: [action] } }
1235+
}
1236+
})
1237+
1238+
const client = new Client({
1239+
node: 'http://localhost:9200',
1240+
Connection: MockConnection
1241+
})
1242+
1243+
let docCount = 0
1244+
let successCount = 0
1245+
await client.helpers.bulk<Document>({
1246+
datasource: dataset.slice(),
1247+
flushBytes: 1,
1248+
concurrency: 1,
1249+
onDocument (_doc) {
1250+
if (docCount++ === 1) {
1251+
return {
1252+
delete: {
1253+
_index: 'test',
1254+
_id: String(docCount)
1255+
}
1256+
}
1257+
} else {
1258+
return {
1259+
index: { _index: 'test' }
1260+
}
1261+
}
1262+
},
1263+
onSuccess ({ result, document }) {
1264+
const item = dataset[successCount]
1265+
if (successCount++ === 1) {
1266+
t.same(result, {
1267+
delete: {
1268+
_index: 'test',
1269+
_id: String(successCount)
1270+
}
1271+
})
1272+
} else {
1273+
t.same(result, { index: { _index: 'test' }})
1274+
t.same(document, item)
1275+
}
1276+
},
1277+
onDrop (_doc) {
1278+
t.fail('This should never be called')
1279+
}
1280+
})
1281+
1282+
t.end()
1283+
})
1284+
11071285
t.end()
11081286
})
11091287

0 commit comments

Comments
 (0)