Skip to content

Commit 973a981

Browse files
Spencerspalger
andauthored
[helpers] add support for transport options to all helpers (#1400)
Co-authored-by: spalger <[email protected]>
1 parent 2b084db commit 973a981

File tree

4 files changed

+112
-8
lines changed

4 files changed

+112
-8
lines changed

lib/Helpers.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ export default class Helpers {
2525
search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
2626
scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
2727
scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
28-
msearch(options?: MsearchHelperOptions): MsearchHelper
29-
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
28+
msearch(options?: MsearchHelperOptions, reqOptions?: TransportRequestOptions): MsearchHelper
29+
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>, reqOptions?: TransportRequestOptions): BulkHelper<BulkStats>
3030
}
3131

3232
export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<string, any>, TContext = Context> extends ApiResponse<TResponse, TContext> {

lib/Helpers.js

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ class Helpers {
158158
*/
159159
async * scrollDocuments (params, options) {
160160
appendFilterPath('hits.hits._source', params, true)
161-
for await (const { documents } of this.scrollSearch(params)) {
161+
for await (const { documents } of this.scrollSearch(params, options)) {
162162
for (const document of documents) {
163163
yield document
164164
}
@@ -169,9 +169,10 @@ class Helpers {
169169
* Creates a msearch helper instance. Once you configure it, you can use the provided
170170
* `search` method to add new searches in the queue.
171171
* @param {object} options - The configuration of the msearch operations.
172+
* @param {object} reqOptions - The client optional configuration for this request.
172173
* @return {object} The possible operations to run.
173174
*/
174-
msearch (options = {}) {
175+
msearch (options = {}, reqOptions = {}) {
175176
const client = this[kClient]
176177
const {
177178
operations = 5,
@@ -378,7 +379,7 @@ class Helpers {
378379
// This function never returns an error, if the msearch operation fails,
379380
// the error is dispatched to all search executors.
380381
function tryMsearch (msearchBody, callbacks, done) {
381-
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), (err, results) => {
382+
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), reqOptions, (err, results) => {
382383
const retryBody = []
383384
const retryCallbacks = []
384385
if (err) {
@@ -415,12 +416,16 @@ class Helpers {
415416
* Creates a bulk helper instance. Once you configure it, you can pick which operation
416417
* to execute with the given dataset, index, create, update, and delete.
417418
* @param {object} options - The configuration of the bulk operation.
419+
* @param {object} reqOptions - The client optional configuration for this request.
418420
* @return {object} The possible operations to run with the datasource.
419421
*/
420-
bulk (options) {
422+
bulk (options, reqOptions = {}) {
421423
const client = this[kClient]
422424
const { serialize, deserialize } = client.serializer
423-
const reqOptions = this[kMetaHeader] !== null ? { headers: { 'x-elastic-client-meta': this[kMetaHeader] + ',h=bp' } } : {}
425+
if (this[kMetaHeader] !== null) {
426+
reqOptions.headers = reqOptions.headers || {}
427+
reqOptions.headers['x-elastic-client-meta'] = this[kMetaHeader] + ',h=bp'
428+
}
424429
const {
425430
datasource,
426431
onDocument,
@@ -545,7 +550,7 @@ class Helpers {
545550
index: typeof refreshOnCompletion === 'string'
546551
? refreshOnCompletion
547552
: '_all'
548-
})
553+
}, reqOptions)
549554
}
550555

551556
stats.time = Date.now() - startTime

test/unit/helpers/bulk.test.js

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,66 @@ test('bulk delete', t => {
10491049
t.end()
10501050
})
10511051

1052+
test('transport options', t => {
1053+
t.test('Should pass transport options in request', async t => {
1054+
let count = 0
1055+
const MockConnection = connection.buildMockConnection({
1056+
onRequest (params) {
1057+
count++
1058+
1059+
if (params.path === '/_bulk') {
1060+
t.match(params.headers, {
1061+
'content-type': 'application/x-ndjson',
1062+
foo: 'bar'
1063+
})
1064+
return { body: { errors: false, items: [{}] } }
1065+
}
1066+
1067+
t.strictEqual(params.path, '/_all/_refresh')
1068+
t.match(params.headers, {
1069+
foo: 'bar'
1070+
})
1071+
return { body: {} }
1072+
}
1073+
})
1074+
1075+
const client = new Client({
1076+
node: 'http://localhost:9200',
1077+
Connection: MockConnection
1078+
})
1079+
1080+
const result = await client.helpers.bulk({
1081+
datasource: dataset.slice(),
1082+
flushBytes: 1,
1083+
concurrency: 1,
1084+
onDocument (doc) {
1085+
return { index: { _index: 'test' } }
1086+
},
1087+
onDrop (doc) {
1088+
t.fail('This should never be called')
1089+
},
1090+
refreshOnCompletion: true
1091+
}, {
1092+
headers: {
1093+
foo: 'bar'
1094+
}
1095+
})
1096+
1097+
t.strictEqual(count, 4) // three bulk requests, one refresh
1098+
t.type(result.time, 'number')
1099+
t.type(result.bytes, 'number')
1100+
t.match(result, {
1101+
total: 3,
1102+
successful: 3,
1103+
retry: 0,
1104+
failed: 0,
1105+
aborted: false
1106+
})
1107+
})
1108+
1109+
t.end()
1110+
})
1111+
10521112
test('errors', t => {
10531113
t.test('datasource type', async t => {
10541114
const client = new Client({

test/unit/helpers/msearch.test.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,3 +756,42 @@ test('Stop should resolve the helper (error)', t => {
756756

757757
m.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
758758
})
759+
760+
test('Should use req options', async t => {
761+
t.plan(1)
762+
763+
const MockConnection = connection.buildMockConnection({
764+
onRequest (params) {
765+
t.match(params.headers, {
766+
foo: 'bar'
767+
})
768+
769+
return {
770+
body: {
771+
responses: [{
772+
status: 200,
773+
hits: { hits: [] }
774+
}]
775+
}
776+
}
777+
}
778+
})
779+
780+
const client = new Client({
781+
node: 'http://localhost:9200',
782+
Connection: MockConnection
783+
})
784+
785+
const m = client.helpers.msearch({ operations: 1 }, {
786+
headers: {
787+
foo: 'bar'
788+
}
789+
})
790+
791+
await m.search(
792+
{ index: 'test' },
793+
{ query: { match: { foo: 'bar' } } }
794+
)
795+
796+
t.teardown(() => m.stop())
797+
})

0 commit comments

Comments
 (0)