Skip to content

Apply bulk helper patches from stack client #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 97 additions & 37 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ export interface BulkStats {
aborted: boolean
}

interface IndexAction {
interface IndexActionOperation {
index: T.BulkIndexOperation
}

interface CreateAction {
interface CreateActionOperation {
create: T.BulkCreateOperation
}

Expand All @@ -90,7 +90,9 @@ interface DeleteAction {
delete: T.BulkDeleteOperation
}

type UpdateAction = [UpdateActionOperation, Record<string, any>]
type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]
type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction]
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction

export interface OnDropDocument<TDocument = unknown> {
Expand All @@ -101,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> {
retried: boolean
}

type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>

export interface OnSuccessDocument<TDocument = unknown> {
result: BulkResponseItem
document?: TDocument
}

interface ZippedResult<TDocument = unknown> {
result: BulkResponseItem
raw: {
action: string
document?: string
}
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document?: () => TDocument
}

export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
Expand All @@ -110,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
retries?: number
wait?: number
onDrop?: (doc: OnDropDocument<TDocument>) => void
onSuccess?: (doc: OnSuccessDocument) => void
}

export interface BulkHelper<T> extends Promise<BulkStats> {
Expand Down Expand Up @@ -379,7 +400,7 @@ export default class Helpers {
clearTimeout(timeoutRef)
}

// In some cases the previos http call does not have finished,
// In some cases the previous http call does not have finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (loadedOperations > 0) {
const send = await semaphore()
Expand Down Expand Up @@ -415,8 +436,8 @@ export default class Helpers {
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximim concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running request has finshed.
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual msearch request.
// It also returns a finish function, which returns a promise that is resolved
Expand Down Expand Up @@ -548,6 +569,9 @@ export default class Helpers {
retries = this[kMaxRetries],
wait = 5000,
onDrop = noop,
// onSuccess does not default to noop, to avoid the performance hit
// of deserializing every document in the bulk request
onSuccess,
...bulkOptions
} = options

Expand Down Expand Up @@ -620,26 +644,25 @@ export default class Helpers {
let chunkBytes = 0
timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line

// @ts-expect-error datasoruce is an iterable
// @ts-expect-error datasource is an iterable
for await (const chunk of datasource) {
if (shouldAbort) break
timeoutRef.refresh()
const action = onDocument(chunk)
const operation = Array.isArray(action)
? Object.keys(action[0])[0]
: Object.keys(action)[0]
const result = onDocument(chunk)
const [action, payload] = Array.isArray(result) ? result : [result, chunk]
const operation = Object.keys(action)[0]
if (operation === 'index' || operation === 'create') {
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
payloadBody = typeof payload === 'string'
? payload
: serializer.serialize(payload)
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'update') {
// @ts-expect-error in case of update action is an array
actionBody = serializer.serialize(action[0])
actionBody = serializer.serialize(action)
payloadBody = typeof chunk === 'string'
? `{"doc":${chunk}}`
// @ts-expect-error in case of update action is an array
: serializer.serialize({ doc: chunk, ...action[1] })
: serializer.serialize({ doc: chunk, ...payload })
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'delete') {
Expand All @@ -653,15 +676,16 @@ export default class Helpers {

if (chunkBytes >= flushBytes) {
stats.bytes += chunkBytes
const send = await semaphore()
send(bulkBody.slice())
const bulkBodyCopy = bulkBody.slice()
bulkBody.length = 0
chunkBytes = 0
const send = await semaphore()
send(bulkBodyCopy)
}
}

clearTimeout(timeoutRef)
// In some cases the previos http call does not have finished,
// In some cases the previous http call has not finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (!shouldAbort && chunkBytes > 0) {
const send = await semaphore()
Expand Down Expand Up @@ -697,8 +721,8 @@ export default class Helpers {
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximim concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running request has finshed.
// if we didn't reach the maximum concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running requests has finished.
// The semaphore function resolves a send function, which will be used
// to send the actual bulk request.
// It also returns a finish function, which returns a promise that is resolved
Expand Down Expand Up @@ -805,57 +829,93 @@ export default class Helpers {
callback()
}

/**
* Zips bulk response items (the action's result) with the original document body.
* The raw string version of action and document lines are also included.
*/
function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
const zipped = []
let indexSlice = 0
for (let i = 0, len = responseItems.length; i < len; i++) {
const result = responseItems[i]
const operation = Object.keys(result)[0]
let zipResult

if (operation === 'delete') {
zipResult = {
result,
raw: { action: bulkBody[indexSlice] }
}
indexSlice += 1
} else {
const document = bulkBody[indexSlice + 1]
zipResult = {
result,
raw: { action: bulkBody[indexSlice], document },
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document: () => serializer.deserialize(document)
}
indexSlice += 2
}

zipped.push(zipResult as ZippedResult)
}

return zipped
}

function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
if (shouldAbort) return callback(null, [])
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
.then(response => {
const result = response.body
const results = zipBulkResults(result.items, bulkBody)

if (!result.errors) {
stats.successful += result.items.length
for (const item of result.items) {
if (item.update?.result === 'noop') {
for (const item of results) {
const { result, document = noop } = item
if (result.update?.result === 'noop') {
stats.noop++
}
if (onSuccess != null) onSuccess({ result, document: document() })
}
return callback(null, [])
}
const retry = []
const { items } = result
let indexSlice = 0
for (let i = 0, len = items.length; i < len; i++) {
const action = items[i]
const operation = Object.keys(action)[0]
for (const item of results) {
const { result, raw, document = noop } = item
const operation = Object.keys(result)[0]
// @ts-expect-error
const responseItem = action[operation as keyof T.BulkResponseItemContainer]
const responseItem = result[operation as keyof T.BulkResponseItemContainer]
assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')

if (responseItem.status >= 400) {
// 429 is the only status code where we might want to retry
// a document, because it was not an error in the document itself,
// but the ES node were handling too many operations.
// but the ES node was handling too many operations.
if (responseItem.status === 429) {
retry.push(bulkBody[indexSlice])
retry.push(raw.action)
/* istanbul ignore next */
if (operation !== 'delete') {
retry.push(bulkBody[indexSlice + 1])
retry.push(raw.document ?? '')
}
} else {
onDrop({
status: responseItem.status,
error: responseItem.error ?? null,
operation: serializer.deserialize(bulkBody[indexSlice]),
operation: serializer.deserialize(raw.action),
// @ts-expect-error
document: operation !== 'delete'
? serializer.deserialize(bulkBody[indexSlice + 1])
: null,
document: document(),
retried: isRetrying
})
stats.failed += 1
}
} else {
stats.successful += 1
if (onSuccess != null) onSuccess({ result, document: document() })
}
operation === 'delete' ? indexSlice += 1 : indexSlice += 2
}
callback(null, retry)
})
Expand Down
Loading