Skip to content

Commit 1607a0d

Browse files
JoshMockpquentin
andauthored
Fix hang in bulk helper semaphore when server responses are slower than flushInterval (#2027)
* Set version to 8.10.1 * Add tests for bulk helper with various flush and server timeouts * Copy and empty bulkBody when flushBytes is reached Before it was waiting until after semaphore resolved, then sending with a reference to bulkBody. If flushInterval is reached after `await semaphore()` but before `send(bulkBody)`, onFlushTimeout is "stealing" bulkBody so that there is nothing left in bulkBody for the flushBytes block to send, causing an indefinite hang for a promise that does not resolve. * comment typo fixes --------- Co-authored-by: Quentin Pradet <[email protected]>
1 parent 57ee5cf commit 1607a0d

File tree

2 files changed

+156
-6
lines changed

2 files changed

+156
-6
lines changed

src/helpers.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ export default class Helpers {
624624
let chunkBytes = 0
625625
timeoutRef = setTimeout(onFlushTimeout, flushInterval) // eslint-disable-line
626626

627-
// @ts-expect-error datasoruce is an iterable
627+
// @ts-expect-error datasource is an iterable
628628
for await (const chunk of datasource) {
629629
if (shouldAbort) break
630630
timeoutRef.refresh()
@@ -656,15 +656,16 @@ export default class Helpers {
656656

657657
if (chunkBytes >= flushBytes) {
658658
stats.bytes += chunkBytes
659-
const send = await semaphore()
660-
send(bulkBody.slice())
659+
const bulkBodyCopy = bulkBody.slice()
661660
bulkBody.length = 0
662661
chunkBytes = 0
662+
const send = await semaphore()
663+
send(bulkBodyCopy)
663664
}
664665
}
665666

666667
clearTimeout(timeoutRef)
667-
// In some cases the previos http call does not have finished,
668+
// In some cases the previous http call has not finished,
668669
// or we didn't reach the flush bytes threshold, so we force one last operation.
669670
if (!shouldAbort && chunkBytes > 0) {
670671
const send = await semaphore()
@@ -708,8 +709,8 @@ export default class Helpers {
708709
// to guarantee that no more than the number of operations
709710
// allowed to run at the same time are executed.
710711
// It returns a semaphore function which resolves in the next tick
711-
// if we didn't reach the maximim concurrency yet, otherwise it returns
712-
// a promise that resolves as soon as one of the running request has finshed.
712+
// if we didn't reach the maximum concurrency yet, otherwise it returns
713+
// a promise that resolves as soon as one of the running requests has finished.
713714
// The semaphore function resolves a send function, which will be used
714715
// to send the actual bulk request.
715716
// It also returns a finish function, which returns a promise that is resolved

test/unit/helpers/bulk.test.ts

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ import { createReadStream } from 'fs'
2323
import * as http from 'http'
2424
import { join } from 'path'
2525
import split from 'split2'
26+
import { Readable } from 'stream'
2627
import { test } from 'tap'
2728
import { Client, errors } from '../../../'
2829
import { buildServer, connection } from '../../utils'
30+
const { sleep } = require('../../integration/helper')
2931

3032
let clientVersion: string = require('../../../package.json').version // eslint-disable-line
3133
if (clientVersion.includes('-')) {
@@ -1594,3 +1596,150 @@ test('Flush interval', t => {
15941596

15951597
t.end()
15961598
})
1599+
1600+
test(`flush timeout does not lock process when flushInterval is less than server timeout`, async t => {
1601+
const flushInterval = 500
1602+
1603+
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
1604+
setTimeout(() => {
1605+
res.writeHead(200, { 'content-type': 'application/json' })
1606+
res.end(JSON.stringify({ errors: false, items: [{}] }))
1607+
}, 1000)
1608+
}
1609+
1610+
const [{ port }, server] = await buildServer(handler)
1611+
const client = new Client({ node: `http://localhost:${port}` })
1612+
1613+
async function * generator () {
1614+
const data = dataset.slice()
1615+
for (const doc of data) {
1616+
await sleep(flushInterval)
1617+
yield doc
1618+
}
1619+
}
1620+
1621+
const result = await client.helpers.bulk({
1622+
datasource: Readable.from(generator()),
1623+
flushBytes: 1,
1624+
flushInterval: flushInterval,
1625+
concurrency: 1,
1626+
onDocument (_) {
1627+
return {
1628+
index: { _index: 'test' }
1629+
}
1630+
},
1631+
onDrop (_) {
1632+
t.fail('This should never be called')
1633+
}
1634+
})
1635+
1636+
t.type(result.time, 'number')
1637+
t.type(result.bytes, 'number')
1638+
t.match(result, {
1639+
total: 3,
1640+
successful: 3,
1641+
retry: 0,
1642+
failed: 0,
1643+
aborted: false
1644+
})
1645+
1646+
server.stop()
1647+
})
1648+
1649+
test(`flush timeout does not lock process when flushInterval is greater than server timeout`, async t => {
1650+
const flushInterval = 500
1651+
1652+
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
1653+
setTimeout(() => {
1654+
res.writeHead(200, { 'content-type': 'application/json' })
1655+
res.end(JSON.stringify({ errors: false, items: [{}] }))
1656+
}, 250)
1657+
}
1658+
1659+
const [{ port }, server] = await buildServer(handler)
1660+
const client = new Client({ node: `http://localhost:${port}` })
1661+
1662+
async function * generator () {
1663+
const data = dataset.slice()
1664+
for (const doc of data) {
1665+
await sleep(flushInterval)
1666+
yield doc
1667+
}
1668+
}
1669+
1670+
const result = await client.helpers.bulk({
1671+
datasource: Readable.from(generator()),
1672+
flushBytes: 1,
1673+
flushInterval: flushInterval,
1674+
concurrency: 1,
1675+
onDocument (_) {
1676+
return {
1677+
index: { _index: 'test' }
1678+
}
1679+
},
1680+
onDrop (_) {
1681+
t.fail('This should never be called')
1682+
}
1683+
})
1684+
1685+
t.type(result.time, 'number')
1686+
t.type(result.bytes, 'number')
1687+
t.match(result, {
1688+
total: 3,
1689+
successful: 3,
1690+
retry: 0,
1691+
failed: 0,
1692+
aborted: false
1693+
})
1694+
1695+
server.stop()
1696+
})
1697+
1698+
test(`flush timeout does not lock process when flushInterval is equal to server timeout`, async t => {
1699+
const flushInterval = 500
1700+
1701+
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
1702+
setTimeout(() => {
1703+
res.writeHead(200, { 'content-type': 'application/json' })
1704+
res.end(JSON.stringify({ errors: false, items: [{}] }))
1705+
}, flushInterval)
1706+
}
1707+
1708+
const [{ port }, server] = await buildServer(handler)
1709+
const client = new Client({ node: `http://localhost:${port}` })
1710+
1711+
async function * generator () {
1712+
const data = dataset.slice()
1713+
for (const doc of data) {
1714+
await sleep(flushInterval)
1715+
yield doc
1716+
}
1717+
}
1718+
1719+
const result = await client.helpers.bulk({
1720+
datasource: Readable.from(generator()),
1721+
flushBytes: 1,
1722+
flushInterval: flushInterval,
1723+
concurrency: 1,
1724+
onDocument (_) {
1725+
return {
1726+
index: { _index: 'test' }
1727+
}
1728+
},
1729+
onDrop (_) {
1730+
t.fail('This should never be called')
1731+
}
1732+
})
1733+
1734+
t.type(result.time, 'number')
1735+
t.type(result.bytes, 'number')
1736+
t.match(result, {
1737+
total: 3,
1738+
successful: 3,
1739+
retry: 0,
1740+
failed: 0,
1741+
aborted: false
1742+
})
1743+
1744+
server.stop()
1745+
})

0 commit comments

Comments
 (0)