Skip to content

[helpers] add support for transport options to all helpers #1400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/Helpers.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export default class Helpers {
search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
msearch(options?: MsearchHelperOptions): MsearchHelper
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
msearch(options?: MsearchHelperOptions, reqOptions?: TransportRequestOptions): MsearchHelper
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>, reqOptions?: TransportRequestOptions): BulkHelper<BulkStats>
}

export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<string, any>, TContext = Context> extends ApiResponse<TResponse, TContext> {
Expand Down
17 changes: 11 additions & 6 deletions lib/Helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class Helpers {
*/
async * scrollDocuments (params, options) {
appendFilterPath('hits.hits._source', params, true)
for await (const { documents } of this.scrollSearch(params)) {
for await (const { documents } of this.scrollSearch(params, options)) {
for (const document of documents) {
yield document
}
Expand All @@ -169,9 +169,10 @@ class Helpers {
* Creates a msearch helper instance. Once you configure it, you can use the provided
* `search` method to add new searches in the queue.
* @param {object} options - The configuration of the msearch operations.
* @param {object} reqOptions - The client optional configuration for this request.
* @return {object} The possible operations to run.
*/
msearch (options = {}) {
msearch (options = {}, reqOptions = {}) {
const client = this[kClient]
const {
operations = 5,
Expand Down Expand Up @@ -378,7 +379,7 @@ class Helpers {
// This function never returns an error, if the msearch operation fails,
// the error is dispatched to all search executors.
function tryMsearch (msearchBody, callbacks, done) {
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), (err, results) => {
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), reqOptions, (err, results) => {
const retryBody = []
const retryCallbacks = []
if (err) {
Expand Down Expand Up @@ -415,12 +416,16 @@ class Helpers {
* Creates a bulk helper instance. Once you configure it, you can pick which operation
* to execute with the given dataset, index, create, update, and delete.
* @param {object} options - The configuration of the bulk operation.
* @param {object} reqOptions - The client optional configuration for this request.
* @return {object} The possible operations to run with the datasource.
*/
bulk (options) {
bulk (options, reqOptions = {}) {
const client = this[kClient]
const { serialize, deserialize } = client.serializer
const reqOptions = this[kMetaHeader] !== null ? { headers: { 'x-elastic-client-meta': this[kMetaHeader] + ',h=bp' } } : {}
if (this[kMetaHeader] !== null) {
reqOptions.headers = reqOptions.headers || {}
reqOptions.headers['x-elastic-client-meta'] = this[kMetaHeader] + ',h=bp'
}
const {
datasource,
onDocument,
Expand Down Expand Up @@ -545,7 +550,7 @@ class Helpers {
index: typeof refreshOnCompletion === 'string'
? refreshOnCompletion
: '_all'
})
}, reqOptions)
}

stats.time = Date.now() - startTime
Expand Down
60 changes: 60 additions & 0 deletions test/unit/helpers/bulk.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,66 @@ test('bulk delete', t => {
t.end()
})

test('transport options', t => {
t.test('Should pass transport options in request', async t => {
let count = 0
const MockConnection = connection.buildMockConnection({
onRequest (params) {
count++

if (params.path === '/_bulk') {
t.match(params.headers, {
'content-type': 'application/x-ndjson',
foo: 'bar'
})
return { body: { errors: false, items: [{}] } }
}

t.strictEqual(params.path, '/_all/_refresh')
t.match(params.headers, {
foo: 'bar'
})
return { body: {} }
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

const result = await client.helpers.bulk({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument (doc) {
return { index: { _index: 'test' } }
},
onDrop (doc) {
t.fail('This should never be called')
},
refreshOnCompletion: true
}, {
headers: {
foo: 'bar'
}
})

t.strictEqual(count, 4) // three bulk requests, one refresh
t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
})

t.end()
})

test('errors', t => {
t.test('datasource type', async t => {
const client = new Client({
Expand Down
39 changes: 39 additions & 0 deletions test/unit/helpers/msearch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -756,3 +756,42 @@ test('Stop should resolve the helper (error)', t => {

m.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
})

test('Should use req options', async t => {
t.plan(1)

const MockConnection = connection.buildMockConnection({
onRequest (params) {
t.match(params.headers, {
foo: 'bar'
})

return {
body: {
responses: [{
status: 200,
hits: { hits: [] }
}]
}
}
}
})

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

const m = client.helpers.msearch({ operations: 1 }, {
headers: {
foo: 'bar'
}
})

await m.search(
{ index: 'test' },
{ query: { match: { foo: 'bar' } } }
)

t.teardown(() => m.stop())
})