Skip to content

Commit 1b12d91

Browse files
committed
Update idempotent insert
1 parent f8cd537 commit 1b12d91

File tree

4 files changed

+136
-2
lines changed

4 files changed

+136
-2
lines changed

example/idempotent/idempotent.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
from proton_driver import connect, Client
2+
from datetime import date
3+
from time import sleep
4+
5+
6+
# Create a test stream
7+
def create_test_stream(operator, table_name, table_columns):
8+
operator.execute(f'DROP STREAM IF EXISTS {table_name};')
9+
operator.execute(f'CREATE STREAM {table_name} ({table_columns})')
10+
11+
12+
# Use dbapi to implement idempotent insertion
13+
def use_dbapi():
14+
with connect('proton://localhost') as conn:
15+
with conn.cursor() as cur:
16+
create_test_stream(
17+
cur,
18+
'test_user',
19+
'id int32, name string, birthday date'
20+
)
21+
22+
# Set idempotent_id.
23+
cur.set_settings(dict(idempotent_id='batch1'))
24+
25+
# Insert data into test_user multiple times with the same idempotent_id. # noqa
26+
# The query result should contain only the first inserted data.
27+
data = [
28+
(123456, 'timeplus', date(2024, 10, 24)),
29+
(789012, 'stream ', date(2023, 10, 24)),
30+
(135790, 'proton ', date(2024, 10, 24)),
31+
(246801, 'database', date(2024, 10, 24)),
32+
]
33+
34+
# Execute multiple insert operations.
35+
for _ in range(10):
36+
cur.execute(
37+
'INSERT INTO test_user (id, name, birthday) VALUES',
38+
data
39+
)
40+
cur.fetchall()
41+
42+
# wait for 3 sec to make sure data available in historical store.
43+
sleep(3)
44+
45+
cur.execute('SELECT count() FROM table(test_user)')
46+
res = cur.fetchall()
47+
48+
# Data is inserted only once,so res == (4,).
49+
print(res)
50+
51+
52+
# Use Client to implement idempotent insertion
53+
def use_client():
54+
cli = Client('localhost', 8463)
55+
create_test_stream(cli, 'test_stream', '`i` int, `v` string')
56+
57+
setting = {
58+
'idempotent_id': 'batch1'
59+
}
60+
61+
data = [
62+
(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'),
63+
(5, 'e'), (6, 'f'), (7, 'g'), (8, 'h')
64+
]
65+
66+
# Execute multiple insert operations.
67+
for _ in range(10):
68+
cli.execute(
69+
'INSERT INTO test_stream (i, v) VALUES',
70+
data,
71+
settings=setting
72+
)
73+
74+
# wait for 3 sec to make sure data available in historical store.
75+
sleep(3)
76+
77+
res = cli.execute('SELECT count() FROM table(test_stream)')
78+
79+
# Data is inserted only once,so res == (8,).
80+
print(res)
81+
82+
83+
if __name__ == "__main__":
84+
use_dbapi() # (4,)
85+
use_client() # (8,)

proton_driver/settings/available.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,4 +402,7 @@
402402
'format_regexp_escaping_rule': SettingString,
403403
'format_regexp_skip_unmatched': SettingBool,
404404
'output_format_enable_streaming': SettingBool,
405+
406+
'idempotent_id': SettingString,
407+
'enable_idempotent_processing': SettingBool,
405408
}

tests/test_dbapi.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from contextlib import contextmanager
44
import socket
55
from unittest.mock import patch
6-
6+
from time import sleep
77
from proton_driver import connect
88
from proton_driver.dbapi import (
99
ProgrammingError, InterfaceError, OperationalError
@@ -159,6 +159,27 @@ def test_execute_insert(self):
159159
cursor.execute('INSERT INTO test VALUES', [[4]])
160160
self.assertEqual(cursor.rowcount, 1)
161161

162+
def test_idempotent_insert(self):
163+
with self.created_cursor() as cursor:
164+
cursor.execute('CREATE STREAM test (i int, v string)')
165+
data = [
166+
(123, 'abc'), (456, 'def'), (789, 'ghi'),
167+
(987, 'ihg'), (654, 'fed'), (321, 'cba'),
168+
]
169+
cursor.set_settings(dict(idempotent_id='batch1'))
170+
for _ in range(10):
171+
cursor.execute(
172+
'INSERT INTO test (i, v) VALUES',
173+
data
174+
)
175+
self.assertEqual(cursor.rowcount, 6)
176+
sleep(3)
177+
rv = cursor.execute('SELECT count(*) FROM table(test)')
178+
rv = cursor.fetchall()
179+
self.assertEqual(rv, [(6,)])
180+
181+
cursor.execute('DROP STREAM test')
182+
162183
def test_description(self):
163184
with self.created_cursor() as cursor:
164185
self.assertIsNone(cursor.description)

tests/test_insert.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from datetime import date
2-
2+
from time import sleep
33
from tests.testcase import BaseTestCase
44
from proton_driver import errors
55
from proton_driver.errors import ServerException
@@ -148,6 +148,31 @@ def test_insert_return(self):
148148
)
149149
self.assertEqual(rv, 5)
150150

151+
def test_idempotent_insert(self):
152+
self.client.execute('CREATE STREAM test (i int, v string)')
153+
154+
data = [
155+
(123, 'abc'), (456, 'def'), (789, 'ghi'),
156+
(987, 'ihg'), (654, 'fed'), (321, 'cba'),
157+
]
158+
159+
setting = {
160+
'idempotent_id': 'batch1'
161+
}
162+
163+
for _ in range(20):
164+
rv = self.client.execute(
165+
'INSERT INTO test (i, v) VALUES',
166+
data,
167+
settings=setting
168+
)
169+
self.assertEqual(rv, 6)
170+
sleep(3)
171+
rv = self.client.execute('SELECT count(*) FROM table(test)')
172+
self.assertEqual(rv, [(6, )])
173+
174+
self.client.execute('DROP STREAM test')
175+
151176

152177
class InsertColumnarTestCase(BaseTestCase):
153178
def test_insert_tuple_ok(self):

0 commit comments

Comments
 (0)