@@ -23,9 +23,11 @@ import { createReadStream } from 'fs'
23
23
import * as http from 'http'
24
24
import { join } from 'path'
25
25
import split from 'split2'
26
+ import { Readable } from 'stream'
26
27
import { test } from 'tap'
27
28
import { Client , errors } from '../../../'
28
29
import { buildServer , connection } from '../../utils'
30
+ const { sleep } = require ( '../../integration/helper' )
29
31
30
32
let clientVersion : string = require ( '../../../package.json' ) . version // eslint-disable-line
31
33
if ( clientVersion . includes ( '-' ) ) {
@@ -1594,3 +1596,150 @@ test('Flush interval', t => {
1594
1596
1595
1597
t . end ( )
1596
1598
} )
1599
+
1600
+ test ( `flush timeout does not lock process when flushInterval is less than server timeout` , async t => {
1601
+ const flushInterval = 500
1602
+
1603
+ async function handler ( req : http . IncomingMessage , res : http . ServerResponse ) {
1604
+ setTimeout ( ( ) => {
1605
+ res . writeHead ( 200 , { 'content-type' : 'application/json' } )
1606
+ res . end ( JSON . stringify ( { errors : false , items : [ { } ] } ) )
1607
+ } , 1000 )
1608
+ }
1609
+
1610
+ const [ { port } , server ] = await buildServer ( handler )
1611
+ const client = new Client ( { node : `http://localhost:${ port } ` } )
1612
+
1613
+ async function * generator ( ) {
1614
+ const data = dataset . slice ( )
1615
+ for ( const doc of data ) {
1616
+ await sleep ( flushInterval )
1617
+ yield doc
1618
+ }
1619
+ }
1620
+
1621
+ const result = await client . helpers . bulk ( {
1622
+ datasource : Readable . from ( generator ( ) ) ,
1623
+ flushBytes : 1 ,
1624
+ flushInterval : flushInterval ,
1625
+ concurrency : 1 ,
1626
+ onDocument ( _ ) {
1627
+ return {
1628
+ index : { _index : 'test' }
1629
+ }
1630
+ } ,
1631
+ onDrop ( _ ) {
1632
+ t . fail ( 'This should never be called' )
1633
+ }
1634
+ } )
1635
+
1636
+ t . type ( result . time , 'number' )
1637
+ t . type ( result . bytes , 'number' )
1638
+ t . match ( result , {
1639
+ total : 3 ,
1640
+ successful : 3 ,
1641
+ retry : 0 ,
1642
+ failed : 0 ,
1643
+ aborted : false
1644
+ } )
1645
+
1646
+ server . stop ( )
1647
+ } )
1648
+
1649
+ test ( `flush timeout does not lock process when flushInterval is greater than server timeout` , async t => {
1650
+ const flushInterval = 500
1651
+
1652
+ async function handler ( req : http . IncomingMessage , res : http . ServerResponse ) {
1653
+ setTimeout ( ( ) => {
1654
+ res . writeHead ( 200 , { 'content-type' : 'application/json' } )
1655
+ res . end ( JSON . stringify ( { errors : false , items : [ { } ] } ) )
1656
+ } , 250 )
1657
+ }
1658
+
1659
+ const [ { port } , server ] = await buildServer ( handler )
1660
+ const client = new Client ( { node : `http://localhost:${ port } ` } )
1661
+
1662
+ async function * generator ( ) {
1663
+ const data = dataset . slice ( )
1664
+ for ( const doc of data ) {
1665
+ await sleep ( flushInterval )
1666
+ yield doc
1667
+ }
1668
+ }
1669
+
1670
+ const result = await client . helpers . bulk ( {
1671
+ datasource : Readable . from ( generator ( ) ) ,
1672
+ flushBytes : 1 ,
1673
+ flushInterval : flushInterval ,
1674
+ concurrency : 1 ,
1675
+ onDocument ( _ ) {
1676
+ return {
1677
+ index : { _index : 'test' }
1678
+ }
1679
+ } ,
1680
+ onDrop ( _ ) {
1681
+ t . fail ( 'This should never be called' )
1682
+ }
1683
+ } )
1684
+
1685
+ t . type ( result . time , 'number' )
1686
+ t . type ( result . bytes , 'number' )
1687
+ t . match ( result , {
1688
+ total : 3 ,
1689
+ successful : 3 ,
1690
+ retry : 0 ,
1691
+ failed : 0 ,
1692
+ aborted : false
1693
+ } )
1694
+
1695
+ server . stop ( )
1696
+ } )
1697
+
1698
+ test ( `flush timeout does not lock process when flushInterval is equal to server timeout` , async t => {
1699
+ const flushInterval = 500
1700
+
1701
+ async function handler ( req : http . IncomingMessage , res : http . ServerResponse ) {
1702
+ setTimeout ( ( ) => {
1703
+ res . writeHead ( 200 , { 'content-type' : 'application/json' } )
1704
+ res . end ( JSON . stringify ( { errors : false , items : [ { } ] } ) )
1705
+ } , flushInterval )
1706
+ }
1707
+
1708
+ const [ { port } , server ] = await buildServer ( handler )
1709
+ const client = new Client ( { node : `http://localhost:${ port } ` } )
1710
+
1711
+ async function * generator ( ) {
1712
+ const data = dataset . slice ( )
1713
+ for ( const doc of data ) {
1714
+ await sleep ( flushInterval )
1715
+ yield doc
1716
+ }
1717
+ }
1718
+
1719
+ const result = await client . helpers . bulk ( {
1720
+ datasource : Readable . from ( generator ( ) ) ,
1721
+ flushBytes : 1 ,
1722
+ flushInterval : flushInterval ,
1723
+ concurrency : 1 ,
1724
+ onDocument ( _ ) {
1725
+ return {
1726
+ index : { _index : 'test' }
1727
+ }
1728
+ } ,
1729
+ onDrop ( _ ) {
1730
+ t . fail ( 'This should never be called' )
1731
+ }
1732
+ } )
1733
+
1734
+ t . type ( result . time , 'number' )
1735
+ t . type ( result . bytes , 'number' )
1736
+ t . match ( result , {
1737
+ total : 3 ,
1738
+ successful : 3 ,
1739
+ retry : 0 ,
1740
+ failed : 0 ,
1741
+ aborted : false
1742
+ } )
1743
+
1744
+ server . stop ( )
1745
+ } )
0 commit comments