Skip to content

Commit 1ed8d6c

Browse files
committed
add line_sender_c_example_array_elem_strides api
1 parent 5da905e commit 1ed8d6c

10 files changed

+509
-53
lines changed

CMakeLists.txt

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,13 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
100100
examples/concat.c
101101
examples/line_sender_c_example.c)
102102
compile_example(
103-
line_sender_c_example_array
103+
line_sender_c_example_array_byte_strides
104104
examples/concat.c
105-
examples/line_sender_c_example_array.c)
105+
examples/line_sender_c_example_array_byte_strides.c)
106+
compile_example(
107+
line_sender_c_example_array_elem_strides
108+
examples/concat.c
109+
examples/line_sender_c_example_array_elem_strides.c)
106110
compile_example(
107111
line_sender_c_example_auth
108112
examples/concat.c
@@ -129,8 +133,11 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
129133
line_sender_cpp_example
130134
examples/line_sender_cpp_example.cpp)
131135
compile_example(
132-
line_sender_cpp_example_array
133-
examples/line_sender_cpp_example_array.cpp)
136+
line_sender_cpp_example_array_byte_strides
137+
examples/line_sender_cpp_example_array_byte_strides.cpp)
138+
compile_example(
139+
line_sender_cpp_example_array_elem_strides
140+
examples/line_sender_cpp_example_array_elem_strides.cpp)
134141
compile_example(
135142
line_sender_cpp_example_auth
136143
examples/line_sender_cpp_example_auth.cpp)

cpp_test/test_line_sender.cpp

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -178,23 +178,38 @@ TEST_CASE("line_sender c api basics")
178178
2.7,
179179
48121.5,
180180
4.3};
181-
CHECK(::line_sender_buffer_column_f64_arr(
182-
buffer,
183-
arr_name,
184-
rank,
185-
shape,
186-
strides,
187-
reinterpret_cast<uint8_t*>(arr_data.data()),
188-
sizeof(arr_data),
189-
&err));
181+
CHECK(
182+
::line_sender_buffer_column_f64_arr_byte_strides(
183+
buffer,
184+
arr_name,
185+
rank,
186+
shape,
187+
strides,
188+
reinterpret_cast<uint8_t*>(arr_data.data()),
189+
sizeof(arr_data),
190+
&err));
191+
192+
line_sender_column_name arr_name2 = QDB_COLUMN_NAME_LITERAL("a2");
193+
intptr_t elem_strides[] = {6, 2, 1};
194+
CHECK(
195+
::line_sender_buffer_column_f64_arr_elem_strides(
196+
buffer,
197+
arr_name2,
198+
rank,
199+
shape,
200+
elem_strides,
201+
reinterpret_cast<uint8_t*>(arr_data.data()),
202+
sizeof(arr_data),
203+
&err));
190204
CHECK(::line_sender_buffer_at_nanos(buffer, 10000000, &err));
191205
CHECK(server.recv() == 0);
192-
CHECK(::line_sender_buffer_size(buffer) == 150);
206+
CHECK(::line_sender_buffer_size(buffer) == 266);
193207
CHECK(::line_sender_flush(sender, buffer, &err));
194208
::line_sender_buffer_free(buffer);
195209
CHECK(server.recv() == 1);
196210
std::string expect{"test,t1=v1 f1=="};
197211
push_double_to_buffer(expect, 0.5).append(",a1==");
212+
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(",a2==");
198213
push_double_arr_to_buffer(expect, arr_data, 3, shape).append(" 10000000\n");
199214
CHECK(server.msgs(0) == expect);
200215
}
@@ -264,19 +279,23 @@ TEST_CASE("line_sender c++ api basics")
264279
2.7,
265280
48121.5,
266281
4.3};
282+
std::vector<intptr_t> elem_strides{6, 2, 1};
267283
buffer.table("test")
268284
.symbol("t1", "v1")
269285
.symbol("t2", "")
270286
.column("f1", 0.5)
271-
.column("a1", rank, shape, strides, arr_data)
287+
.column<true>("a1", rank, shape, strides, arr_data)
288+
.column<false>("a2", rank, shape, elem_strides, arr_data)
272289
.at(questdb::ingress::timestamp_nanos{10000000});
273290

274291
CHECK(server.recv() == 0);
275-
CHECK(buffer.size() == 154);
292+
CHECK(buffer.size() == 270);
276293
sender.flush(buffer);
277294
CHECK(server.recv() == 1);
278295
std::string expect{"test,t1=v1,t2= f1=="};
279296
push_double_to_buffer(expect, 0.5).append(",a1==");
297+
push_double_arr_to_buffer(expect, arr_data, 3, shape.data())
298+
.append(",a2==");
280299
push_double_arr_to_buffer(expect, arr_data, 3, shape.data())
281300
.append(" 10000000\n");
282301
CHECK(server.msgs(0) == expect);

examples/line_sender_c_example_array.c renamed to examples/line_sender_c_example_array_byte_strides.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ static bool example(const char* host, const char* port)
3131
buffer = line_sender_buffer_new_for_sender(sender);
3232
line_sender_buffer_reserve(buffer, 64 * 1024);
3333

34-
line_sender_table_name table_name = QDB_TABLE_NAME_LITERAL("market_orders");
34+
line_sender_table_name table_name = QDB_TABLE_NAME_LITERAL("market_orders_byte_strides");
3535
line_sender_column_name symbol_col = QDB_COLUMN_NAME_LITERAL("symbol");
3636
line_sender_column_name book_col = QDB_COLUMN_NAME_LITERAL("order_book");
3737

@@ -60,7 +60,7 @@ static bool example(const char* host, const char* port)
6060
48121.5,
6161
4.3};
6262

63-
if (!line_sender_buffer_column_f64_arr(
63+
if (!line_sender_buffer_column_f64_arr_byte_strides(
6464
buffer,
6565
book_col,
6666
array_rank,
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#include <questdb/ingress/line_sender.h>
2+
#include <stdio.h>
3+
#include <stdbool.h>
4+
#include <string.h>
5+
#include "concat.h"
6+
7+
static bool example(const char* host, const char* port)
8+
{
9+
line_sender_error* err = NULL;
10+
line_sender* sender = NULL;
11+
line_sender_buffer* buffer = NULL;
12+
char* conf_str = concat("tcp::addr=", host, ":", port, ";protocol_version=2;");
13+
if (!conf_str)
14+
{
15+
fprintf(stderr, "Could not concatenate configuration string.\n");
16+
return false;
17+
}
18+
19+
line_sender_utf8 conf_str_utf8 = {0, NULL};
20+
if (!line_sender_utf8_init(
21+
&conf_str_utf8, strlen(conf_str), conf_str, &err))
22+
goto on_error;
23+
24+
sender = line_sender_from_conf(conf_str_utf8, &err);
25+
if (!sender)
26+
goto on_error;
27+
28+
free(conf_str);
29+
conf_str = NULL;
30+
31+
buffer = line_sender_buffer_new_for_sender(sender);
32+
line_sender_buffer_reserve(buffer, 64 * 1024);
33+
34+
line_sender_table_name table_name = QDB_TABLE_NAME_LITERAL("market_orders_elem_strides");
35+
line_sender_column_name symbol_col = QDB_COLUMN_NAME_LITERAL("symbol");
36+
line_sender_column_name book_col = QDB_COLUMN_NAME_LITERAL("order_book");
37+
38+
if (!line_sender_buffer_table(buffer, table_name, &err))
39+
goto on_error;
40+
41+
line_sender_utf8 symbol_val = QDB_UTF8_LITERAL("BTC-USD");
42+
if (!line_sender_buffer_symbol(buffer, symbol_col, symbol_val, &err))
43+
goto on_error;
44+
45+
size_t array_rank = 3;
46+
uintptr_t array_shape[] = {2, 3, 2};
47+
intptr_t array_strides[] = {6, 2, 1};
48+
49+
double array_data[] = {
50+
48123.5,
51+
2.4,
52+
48124.0,
53+
1.8,
54+
48124.5,
55+
0.9,
56+
48122.5,
57+
3.1,
58+
48122.0,
59+
2.7,
60+
48121.5,
61+
4.3};
62+
63+
if (!line_sender_buffer_column_f64_arr_elem_strides(
64+
buffer,
65+
book_col,
66+
array_rank,
67+
array_shape,
68+
array_strides,
69+
(const uint8_t*)array_data,
70+
sizeof(array_data),
71+
&err))
72+
goto on_error;
73+
74+
if (!line_sender_buffer_at_nanos(buffer, line_sender_now_nanos(), &err))
75+
goto on_error;
76+
77+
if (!line_sender_flush(sender, buffer, &err))
78+
goto on_error;
79+
80+
line_sender_close(sender);
81+
return true;
82+
83+
on_error:;
84+
size_t err_len = 0;
85+
const char* err_msg = line_sender_error_msg(err, &err_len);
86+
fprintf(stderr, "Error: %.*s\n", (int)err_len, err_msg);
87+
free(conf_str);
88+
line_sender_error_free(err);
89+
line_sender_buffer_free(buffer);
90+
line_sender_close(sender);
91+
return false;
92+
}
93+
94+
int main(int argc, const char* argv[])
95+
{
96+
const char* host = (argc >= 2) ? argv[1] : "localhost";
97+
const char* port = (argc >= 3) ? argv[2] : "9009";
98+
return !example(host, port);
99+
}

examples/line_sender_cpp_example_array.cpp renamed to examples/line_sender_cpp_example_array_byte_strides.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ static bool array_example(std::string_view host, std::string_view port)
1313
"tcp::addr=" + std::string{host} + ":" + std::string{port} +
1414
";protocol_version=2;");
1515

16-
const auto table_name = "cpp_market_orders"_tn;
16+
const auto table_name = "cpp_market_orders_byte_strides"_tn;
1717
const auto symbol_col = "symbol"_cn;
1818
const auto book_col = "order_book"_cn;
1919
size_t rank = 3;
@@ -36,7 +36,7 @@ static bool array_example(std::string_view host, std::string_view port)
3636
questdb::ingress::line_sender_buffer buffer = sender.new_buffer();
3737
buffer.table(table_name)
3838
.symbol(symbol_col, "BTC-USD"_utf8)
39-
.column(book_col, 3, shape, strides, arr_data)
39+
.column<true>(book_col, 3, shape, strides, arr_data)
4040
.at(questdb::ingress::timestamp_nanos::now());
4141
sender.flush(buffer);
4242
return true;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#include <questdb/ingress/line_sender.hpp>
2+
#include <iostream>
3+
#include <vector>
4+
5+
using namespace std::literals::string_view_literals;
6+
using namespace questdb::ingress::literals;
7+
8+
static bool array_example(std::string_view host, std::string_view port)
9+
{
10+
try
11+
{
12+
auto sender = questdb::ingress::line_sender::from_conf(
13+
"tcp::addr=" + std::string{host} + ":" + std::string{port} +
14+
";protocol_version=2;");
15+
16+
const auto table_name = "cpp_market_orders_elem_strides"_tn;
17+
const auto symbol_col = "symbol"_cn;
18+
const auto book_col = "order_book"_cn;
19+
size_t rank = 3;
20+
std::vector<uintptr_t> shape{2, 3, 2};
21+
std::vector<intptr_t> strides{6, 2, 8};
22+
std::array<double, 12> arr_data = {
23+
48123.5,
24+
2.4,
25+
48124.0,
26+
1.8,
27+
48124.5,
28+
0.9,
29+
48122.5,
30+
3.1,
31+
48122.0,
32+
2.7,
33+
48121.5,
34+
4.3};
35+
36+
questdb::ingress::line_sender_buffer buffer = sender.new_buffer();
37+
buffer.table(table_name)
38+
.symbol(symbol_col, "BTC-USD"_utf8)
39+
.column<false>(book_col, 3, shape, strides, arr_data)
40+
.at(questdb::ingress::timestamp_nanos::now());
41+
sender.flush(buffer);
42+
return true;
43+
}
44+
catch (const questdb::ingress::line_sender_error& err)
45+
{
46+
std::cerr << "[ERROR] " << err.what() << std::endl;
47+
return false;
48+
}
49+
}
50+
51+
int main(int argc, const char* argv[])
52+
{
53+
auto host = "localhost"sv;
54+
if (argc >= 2)
55+
host = std::string_view{argv[1]};
56+
57+
auto port = "9009"sv;
58+
if (argc >= 3)
59+
port = std::string_view{argv[2]};
60+
61+
return !array_example(host, port);
62+
}

include/questdb/ingress/line_sender.h

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,37 @@ bool line_sender_buffer_column_str(
499499

500500
/**
501501
* Record a multidimensional array of double for the given column.
502-
* The array data must be stored in row-major order (C-style contiguous layout).
502+
*
503+
* This API uses BYTE-LEVEL STRIDES where the stride values represent the
504+
* number of bytes between consecutive elements along each dimension.
505+
*
506+
* @param[in] buffer Line buffer object.
507+
* @param[in] name Column name.
508+
* @param[in] rank Number of dimensions of the array.
509+
* @param[in] shape Array of dimension sizes (length = `rank`).
510+
* Each element must be a positive integer.
511+
* @param[in] strides Array strides.
512+
* @param[in] data_buffer First array element data.
513+
* @param[in] data_buffer_len Bytes length of the array data.
514+
* @param[out] err_out Set to an error object on failure (if non-NULL).
515+
* @return true on success, false on error.
516+
*/
517+
LINESENDER_API
518+
bool line_sender_buffer_column_f64_arr_byte_strides(
519+
line_sender_buffer* buffer,
520+
line_sender_column_name name,
521+
size_t rank,
522+
const uintptr_t* shape,
523+
const intptr_t* strides,
524+
const uint8_t* data_buffer,
525+
size_t data_buffer_len,
526+
line_sender_error** err_out);
527+
528+
/**
529+
* Record a multidimensional array of double for the given column.
530+
*
531+
* This function uses ELEMENT-LEVEL STRIDES where the stride values represent
532+
* the number of elements between consecutive elements along each dimension.
503533
*
504534
* @param[in] buffer Line buffer object.
505535
* @param[in] name Column name.
@@ -513,7 +543,7 @@ bool line_sender_buffer_column_str(
513543
* @return true on success, false on error.
514544
*/
515545
LINESENDER_API
516-
bool line_sender_buffer_column_f64_arr(
546+
bool line_sender_buffer_column_f64_arr_elem_strides(
517547
line_sender_buffer* buffer,
518548
line_sender_column_name name,
519549
size_t rank,

include/questdb/ingress/line_sender.hpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -643,12 +643,18 @@ class line_sender_buffer
643643
/**
644644
* Record a multidimensional double-precision array for the given column.
645645
*
646+
* @tparam B Strides mode selector:
647+
* - `true` for byte-level strides
648+
* - `false` for element-level strides
649+
* @tparam T Element type (current only `double` is supported).
650+
* @tparam N Number of elements in the flat data array
651+
*
646652
* @param name Column name.
647653
* @param shape Array dimensions (e.g., [2,3] for a 2x3 matrix).
648654
* @param data Array first element data. Size must match product of
649655
* dimensions.
650656
*/
651-
template <typename T, size_t N>
657+
template <bool B, typename T, size_t N>
652658
line_sender_buffer& column(
653659
column_name_view name,
654660
const size_t rank,
@@ -661,7 +667,8 @@ class line_sender_buffer
661667
"Only double types are supported for arrays");
662668
may_init();
663669
line_sender_error::wrapped_call(
664-
::line_sender_buffer_column_f64_arr,
670+
B ? ::line_sender_buffer_column_f64_arr_byte_strides
671+
: ::line_sender_buffer_column_f64_arr_elem_strides,
665672
_impl,
666673
name._impl,
667674
rank,

0 commit comments

Comments
 (0)