Skip to content

Commit 9751959

Browse files
committed
New auto_flush flag logic, no tests yet
1 parent ff00e32 commit 9751959

File tree

4 files changed

+111
-33
lines changed

4 files changed

+111
-33
lines changed

examples/random_data.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from questdb.ingress import Sender, TimestampNanos
1+
from questdb.ingress import Sender, TimestampNanos, AutoFlush
22
import random
33
import uuid
44
import time
55

66

77
def example(host: str = 'localhost', port: int = 9009):
88
table_name: str = str(uuid.uuid1())
9-
watermark = 1024 # Flush if the internal buffer exceeds 1KiB
9+
watermark = AutoFlush.ByteCount(1024) # Flush if the internal buffer exceeds 1KiB
1010
with Sender(host=host, port=port, auto_flush=watermark) as sender:
1111
total_rows = 0
1212
last_flush = time.monotonic()

src/questdb/dataframe.pxi

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
cdef struct auto_flush_t:
44
line_sender* sender
5+
auto_flush_mode_t mode
56
size_t watermark
67

78

89
cdef auto_flush_t auto_flush_blank() noexcept nogil:
910
cdef auto_flush_t af
1011
af.sender = NULL
12+
af.mode = auto_flush_disabled
1113
af.watermark = 0
1214
return af
1315

@@ -2094,8 +2096,14 @@ cdef void_int _dataframe_handle_auto_flush(
20942096
cdef line_sender_error* marker_err
20952097
cdef bint flush_ok
20962098
cdef bint marker_ok
2097-
if (af.sender == NULL) or (line_sender_buffer_size(ls_buf) < af.watermark):
2099+
if (af.sender == NULL) or (af.mode == auto_flush_disabled):
20982100
return 0
2101+
elif af.mode == auto_flush_row_count:
2102+
if line_sender_buffer_row_count(ls_buf) < af.watermark:
2103+
return 0
2104+
elif af.mode == auto_flush_byte_count:
2105+
if line_sender_buffer_size(ls_buf) < af.watermark:
2106+
return 0
20992107

21002108
# Always temporarily release GIL during a flush.
21012109
had_gil = _ensure_doesnt_have_gil(gs)

src/questdb/ingress.pyx

Lines changed: 95 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,11 @@ cdef class Buffer
496496

497497

498498
# Called once a row has been completed in `buffer`: flushes the buffer through
# `sender` when the sender's auto-flush watermark has been reached.
#
# * auto_flush_row_count:  flush once the number of buffered rows is at least
#   `sender._auto_flush_watermark`.
# * auto_flush_byte_count: flush once the buffered byte length is at least
#   `sender._auto_flush_watermark`.
# * any other mode (i.e. auto_flush_disabled): do nothing.
#
# Errors raised by `sender.flush(buffer)` propagate to the caller
# (signalled as -1 at the C level via `except -1`).
cdef void_int may_flush_on_row_complete(Buffer buffer, Sender sender) except -1:
    if sender._auto_flush_mode == auto_flush_row_count:
        # Row-based watermark: compare against the buffer's row counter.
        if line_sender_buffer_row_count(buffer._impl) >= sender._auto_flush_watermark:
            sender.flush(buffer)
    elif sender._auto_flush_mode == auto_flush_byte_count:
        # Byte-based watermark: compare against the buffer's current length.
        if len(buffer) >= sender._auto_flush_watermark:
            sender.flush(buffer)
502506

@@ -1208,6 +1212,46 @@ _FLUSH_FMT = ('{} - See https://py-questdb-client.readthedocs.io/en/'
12081212
'/troubleshooting.html#inspecting-and-debugging-errors#flush-failed')
12091213

12101214

1215+
class AutoFlush:
    """
    Auto flush mode.

    Use one of ``AutoFlush.Disabled``, ``AutoFlush.RowCount`` or
    ``AutoFlush.ByteCount``.
    """

    class Disabled:
        """
        Auto flush disabled.
        Call ``flush()`` manually.

        This is a marker: pass the class itself (``AutoFlush.Disabled``),
        not an instance of it.
        """
        pass

    class RowCount:
        """
        Auto flush after a number of rows.

        ``value`` is the row-count watermark. It must not be negative;
        ``0`` is accepted.
        """
        def __init__(self, value: int):
            # The check allows 0, so the message says "non-negative"
            # rather than "positive".
            if value < 0:
                raise ValueError('value must be a non-negative integer.')
            self.value = value

        def __repr__(self) -> str:
            return f'AutoFlush.RowCount({self.value!r})'

    class ByteCount:
        """
        Auto flush after a number of bytes.

        ``value`` is the byte-count watermark. It must not be negative;
        ``0`` is accepted.
        """
        def __init__(self, value: int):
            # The check allows 0, so the message says "non-negative"
            # rather than "positive".
            if value < 0:
                raise ValueError('value must be a non-negative integer.')
            self.value = value

        def __repr__(self) -> str:
            return f'AutoFlush.ByteCount({self.value!r})'
1247+
1248+
1249+
cdef enum auto_flush_mode_t:
1250+
auto_flush_disabled,
1251+
auto_flush_row_count,
1252+
auto_flush_byte_count,
1253+
1254+
12111255
cdef class Sender:
12121256
"""
12131257
A sender is a client that inserts rows into QuestDB via the ILP protocol.
@@ -1265,19 +1309,25 @@ cdef class Sender:
12651309
You can control this behavior by setting the ``auto_flush`` argument.
12661310
12671311
.. code-block:: python
1312+
from questdb.ingress import AutoFlush
12681313
1269-
# Never flushes automatically.
1270-
sender = Sender('localhost', 9009, auto_flush=False)
1271-
sender = Sender('localhost', 9009, auto_flush=None) # Ditto.
1272-
sender = Sender('localhost', 9009, auto_flush=0) # Ditto.
1314+
# Flushes automatically after 600 rows (the default).
1315+
sender = Sender('localhost', 9009)
1316+
sender = Sender('localhost', 9009, auto_flush=AutoFlush.RowCount(600)) # Ditto.
1317+
1318+
# Flushes automatically after each row.
1319+
sender = Sender('localhost', 9009, auto_flush=AutoFlush.RowCount(1))
12731320
12741321
# Flushes automatically when the buffer reaches 1KiB.
1275-
sender = Sender('localhost', 9009, auto_flush=1024)
1322+
sender = Sender('localhost', 9009, auto_flush=AutoFlush.ByteCount(1024))
12761323
1277-
# Flushes automatically after every row.
1278-
sender = Sender('localhost', 9009, auto_flush=True)
1279-
sender = Sender('localhost', 9009, auto_flush=1) # Ditto.
1324+
# Disabled: Never flushes automatically.
1325+
sender = Sender('localhost', 9009, auto_flush=AutoFlush.Disabled)
12801326
1327+
When auto-flushing is disabled, you must call ``sender.flush()`` manually.
1328+
Note that when exiting a ``with sender:`` block, the sender will
1329+
automatically flush the buffer. This is *not* disabled by setting
1330+
``auto_flush=AutoFlush.Disabled``.
12811331
12821332
**Authentication and TLS Encryption**
12831333
@@ -1348,9 +1398,9 @@ cdef class Sender:
13481398
* ``max_name_length`` (``int``): Maximum length of a table or column name.
13491399
*See Buffer's constructor for more details.*
13501400
1351-
* ``auto_flush`` (``bool`` or ``int``): Whether to automatically flush the
1352-
buffer when it reaches a certain byte-size watermark.
1353-
*Default: 64512 (63KiB).*
1401+
* ``auto_flush``: Whether to automatically flush the
1402+
buffer when it reaches a certain row-count or byte-count watermark.
1403+
*Default: 600 rows.*
13541404
*See above for details.*
13551405
13561406
**HTTP-only keyword-only constructor arguments for the Sender(..)**
@@ -1359,9 +1409,9 @@ cdef class Sender:
13591409
the rows in the batch are for the same table.
13601410
Setting ``transactional=True`` will prevent flushing batches of rows
13611411
with mixed table names.
1362-
To fully control transactions, you also need to set ``auto_flush=False``
1363-
or buffered lines may be flushed automatically and thus split across
1364-
multiple transactions.
1412+
To fully control transactions, you also need to set
1413+
``auto_flush=AutoFlush.Disabled`` or buffered lines may be flushed
1414+
automatically and thus split across multiple transactions.
13651415
If ``transactional=False`` (default), the client will send the batch
13661416
as-is to the server even if it contains rows for multiple tables.
13671417
In such case the server may end up committing some rows and not others.
@@ -1390,8 +1440,8 @@ cdef class Sender:
13901440
cdef line_sender_opts* _opts
13911441
cdef line_sender* _impl
13921442
cdef Buffer _buffer
1393-
cdef bint _auto_flush_enabled
1394-
cdef ssize_t _auto_flush_watermark
1443+
cdef auto_flush_mode_t _auto_flush_mode
1444+
cdef size_t _auto_flush_watermark
13951445
cdef size_t _init_capacity
13961446
cdef size_t _max_name_len
13971447

@@ -1407,7 +1457,7 @@ cdef class Sender:
14071457
uint64_t read_timeout=15000,
14081458
uint64_t init_capacity=65536, # 64KiB
14091459
uint64_t max_name_len=127,
1410-
object auto_flush=64512, # 63KiB
1460+
object auto_flush=None, # AutoFlush.RowCount(600)
14111461
bint transactional=False,
14121462
uint32_t max_retries=3,
14131463
uint64_t retry_interval=100, # milliseconds
@@ -1443,6 +1493,8 @@ cdef class Sender:
14431493

14441494
cdef qdb_pystr_buf* b
14451495

1496+
cdef ssize_t auto_flush_watermark = 0
1497+
14461498
self._opts = NULL
14471499
self._impl = NULL
14481500

@@ -1532,13 +1584,30 @@ cdef class Sender:
15321584
line_sender_opts_retry_interval(self._opts, retry_interval)
15331585
line_sender_opts_min_throughput(self._opts, min_throughput)
15341586

1535-
self._auto_flush_enabled = not not auto_flush
1536-
self._auto_flush_watermark = int(auto_flush) \
1537-
if self._auto_flush_enabled else 0
1538-
if self._auto_flush_watermark < 0:
1587+
1588+
# Parse `auto_flush` argument.
1589+
if auto_flush is None:
1590+
self._auto_flush_mode = auto_flush_row_count
1591+
auto_flush_watermark = 600
1592+
elif auto_flush is AutoFlush.Disabled:
1593+
self._auto_flush_mode = auto_flush_disabled
1594+
auto_flush_watermark = 0
1595+
elif isinstance(auto_flush, AutoFlush.RowCount):
1596+
self._auto_flush_mode = auto_flush_row_count
1597+
auto_flush_watermark = auto_flush.value
1598+
elif isinstance(auto_flush, AutoFlush.ByteCount):
1599+
self._auto_flush_mode = auto_flush_byte_count
1600+
auto_flush_watermark = auto_flush.value
1601+
else:
1602+
raise TypeError(
1603+
'auto_flush must be AutoFlush.Disabled, '
1604+
'AutoFlush.RowCount or AutoFlush.ByteCount, '
1605+
f'not {_fqn(type(auto_flush))}')
1606+
if auto_flush_watermark < 0:
15391607
raise ValueError(
1540-
'auto_flush_watermark must be >= 0, '
1541-
f'not {self._auto_flush_watermark}')
1608+
'auto_flush watermark must be >= 0, '
1609+
f'not {auto_flush_watermark}')
1610+
self._auto_flush_watermark = auto_flush_watermark
15421611

15431612
qdb_pystr_buf_clear(b)
15441613

@@ -1678,8 +1747,9 @@ cdef class Sender:
16781747
may have been transmitted to the server already.
16791748
"""
16801749
cdef auto_flush_t af = auto_flush_blank()
1681-
if self._auto_flush_enabled:
1750+
if self._auto_flush_mode:
16821751
af.sender = self._impl
1752+
af.mode = self._auto_flush_mode
16831753
af.watermark = self._auto_flush_watermark
16841754
_dataframe(
16851755
af,

test/test.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ def test_independent_buffer(self):
359359

360360
def test_auto_flush(self):
361361
with Server() as server:
362-
with qi.Sender('localhost', server.port, auto_flush=4) as sender:
362+
with qi.Sender('localhost', server.port, auto_flush=qi.AutoFlush.ByteCount(4)) as sender:
363363
server.accept()
364364
sender.row('tbl1', symbols={'sym1': 'val1'})
365365
self.assertEqual(len(sender), 0) # auto-flushed buffer.
@@ -368,7 +368,7 @@ def test_auto_flush(self):
368368

369369
def test_immediate_auto_flush(self):
370370
with Server() as server:
371-
with qi.Sender('localhost', server.port, auto_flush=True) as sender:
371+
with qi.Sender('localhost', server.port, auto_flush=qi.AutoFlush.RowCount(1)) as sender:
372372
server.accept()
373373
sender.row('tbl1', symbols={'sym1': 'val1'})
374374
self.assertEqual(len(sender), 0) # auto-flushed buffer.
@@ -377,7 +377,7 @@ def test_immediate_auto_flush(self):
377377

378378
def test_auto_flush_on_closed_socket(self):
379379
with Server() as server:
380-
with qi.Sender('localhost', server.port, auto_flush=True) as sender:
380+
with qi.Sender('localhost', server.port, auto_flush=qi.AutoFlush.RowCount(1)) as sender:
381381
server.accept()
382382
server.close()
383383
exp_err = 'Could not flush buffer.* - See https'
@@ -389,7 +389,7 @@ def test_auto_flush_on_closed_socket(self):
389389
def test_dont_auto_flush(self):
390390
msg_counter = 0
391391
with Server() as server:
392-
with qi.Sender('localhost', server.port, auto_flush=0) as sender:
392+
with qi.Sender('localhost', server.port, auto_flush=qi.AutoFlush.Disabled) as sender:
393393
server.accept()
394394
while len(sender) < 32768: # 32KiB
395395
sender.row('tbl1', symbols={'sym1': 'val1'})
@@ -433,7 +433,7 @@ def test_dataframe_auto_flush(self):
433433
with Server() as server:
434434
# An auto-flush size of 20 bytes is enough to auto-flush the first
435435
# row, but not the second.
436-
with qi.Sender('localhost', server.port, auto_flush=20) as sender:
436+
with qi.Sender('localhost', server.port, auto_flush=qi.AutoFlush.ByteCount(20)) as sender:
437437
server.accept()
438438
df = pd.DataFrame({'a': [100000, 2], 'b': [3.0, 4.0]})
439439
sender.dataframe(df, table_name='tbl1')

0 commit comments

Comments
 (0)