Skip to content

Commit 291b3af

Browse files
committed
Add tests for bulk helper with various flush and server timeouts
1 parent a50385d commit 291b3af

File tree

1 file changed

+149
-0
lines changed

1 file changed

+149
-0
lines changed

test/unit/helpers/bulk.test.ts

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ import { createReadStream } from 'fs'
2222
import * as http from 'http'
2323
import { join } from 'path'
2424
import split from 'split2'
25+
import { Readable } from 'stream'
2526
import { test } from 'tap'
2627
import { Client, errors } from '../../../'
2728
import { buildServer, connection } from '../../utils'
29+
const { sleep } = require('../../integration/helper')
2830

2931
let clientVersion: string = require('../../../package.json').version // eslint-disable-line
3032
if (clientVersion.includes('-')) {
@@ -1562,3 +1564,150 @@ test('Flush interval', t => {
15621564

15631565
t.end()
15641566
})
1567+
1568+
test(`flush timeout does not lock process when flushInterval is less than server timeout`, async t => {
1569+
const flushInterval = 500
1570+
1571+
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
1572+
setTimeout(() => {
1573+
res.writeHead(200, { 'content-type': 'application/json' })
1574+
res.end(JSON.stringify({ errors: false, items: [{}] }))
1575+
}, 1000)
1576+
}
1577+
1578+
const [{ port }, server] = await buildServer(handler)
1579+
const client = new Client({ node: `http://localhost:${port}` })
1580+
1581+
async function * generator () {
1582+
const data = dataset.slice()
1583+
for (const doc of data) {
1584+
await sleep(flushInterval)
1585+
yield doc
1586+
}
1587+
}
1588+
1589+
const result = await client.helpers.bulk({
1590+
datasource: Readable.from(generator()),
1591+
flushBytes: 1,
1592+
flushInterval: flushInterval,
1593+
concurrency: 1,
1594+
onDocument (_) {
1595+
return {
1596+
index: { _index: 'test' }
1597+
}
1598+
},
1599+
onDrop (_) {
1600+
t.fail('This should never be called')
1601+
}
1602+
})
1603+
1604+
t.type(result.time, 'number')
1605+
t.type(result.bytes, 'number')
1606+
t.match(result, {
1607+
total: 3,
1608+
successful: 3,
1609+
retry: 0,
1610+
failed: 0,
1611+
aborted: false
1612+
})
1613+
1614+
server.stop()
1615+
})
1616+
1617+
test(`flush timeout does not lock process when flushInterval is greater than server timeout`, async t => {
1618+
const flushInterval = 500
1619+
1620+
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
1621+
setTimeout(() => {
1622+
res.writeHead(200, { 'content-type': 'application/json' })
1623+
res.end(JSON.stringify({ errors: false, items: [{}] }))
1624+
}, 250)
1625+
}
1626+
1627+
const [{ port }, server] = await buildServer(handler)
1628+
const client = new Client({ node: `http://localhost:${port}` })
1629+
1630+
async function * generator () {
1631+
const data = dataset.slice()
1632+
for (const doc of data) {
1633+
await sleep(flushInterval)
1634+
yield doc
1635+
}
1636+
}
1637+
1638+
const result = await client.helpers.bulk({
1639+
datasource: Readable.from(generator()),
1640+
flushBytes: 1,
1641+
flushInterval: flushInterval,
1642+
concurrency: 1,
1643+
onDocument (_) {
1644+
return {
1645+
index: { _index: 'test' }
1646+
}
1647+
},
1648+
onDrop (_) {
1649+
t.fail('This should never be called')
1650+
}
1651+
})
1652+
1653+
t.type(result.time, 'number')
1654+
t.type(result.bytes, 'number')
1655+
t.match(result, {
1656+
total: 3,
1657+
successful: 3,
1658+
retry: 0,
1659+
failed: 0,
1660+
aborted: false
1661+
})
1662+
1663+
server.stop()
1664+
})
1665+
1666+
test(`flush timeout does not lock process when flushInterval is equal to server timeout`, async t => {
1667+
const flushInterval = 500
1668+
1669+
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
1670+
setTimeout(() => {
1671+
res.writeHead(200, { 'content-type': 'application/json' })
1672+
res.end(JSON.stringify({ errors: false, items: [{}] }))
1673+
}, flushInterval)
1674+
}
1675+
1676+
const [{ port }, server] = await buildServer(handler)
1677+
const client = new Client({ node: `http://localhost:${port}` })
1678+
1679+
async function * generator () {
1680+
const data = dataset.slice()
1681+
for (const doc of data) {
1682+
await sleep(flushInterval)
1683+
yield doc
1684+
}
1685+
}
1686+
1687+
const result = await client.helpers.bulk({
1688+
datasource: Readable.from(generator()),
1689+
flushBytes: 1,
1690+
flushInterval: flushInterval,
1691+
concurrency: 1,
1692+
onDocument (_) {
1693+
return {
1694+
index: { _index: 'test' }
1695+
}
1696+
},
1697+
onDrop (_) {
1698+
t.fail('This should never be called')
1699+
}
1700+
})
1701+
1702+
t.type(result.time, 'number')
1703+
t.type(result.bytes, 'number')
1704+
t.match(result, {
1705+
total: 3,
1706+
successful: 3,
1707+
retry: 0,
1708+
failed: 0,
1709+
aborted: false
1710+
})
1711+
1712+
server.stop()
1713+
})

0 commit comments

Comments
 (0)