Skip to content

Commit aac4413

Browse files
authored
feat: ILP over HTTP (InfluxDB compatible) (#50)
1 parent ea926d5 commit aac4413

28 files changed

+3255
-577
lines changed

CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
100100
compile_example(
101101
line_sender_c_example_auth_tls
102102
examples/line_sender_c_example_auth_tls.c)
103+
compile_example(
104+
line_sender_c_example_http
105+
examples/line_sender_c_example_http.c)
103106
compile_example(
104107
line_sender_cpp_example
105108
examples/line_sender_cpp_example.cpp)
@@ -112,6 +115,9 @@ if (QUESTDB_TESTS_AND_EXAMPLES)
112115
compile_example(
113116
line_sender_cpp_example_auth_tls
114117
examples/line_sender_cpp_example_auth_tls.cpp)
118+
compile_example(
119+
line_sender_cpp_example_http
120+
examples/line_sender_cpp_example_http.cpp)
115121

116122
# Include Rust tests as part of the tests run
117123
add_test(

ci/compile.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
steps:
2+
- script: |
3+
rustup update $(toolchain)
4+
rustup default $(toolchain)
5+
condition: ne(variables['toolchain'], '')
6+
displayName: "Update and set Rust toolchain"
7+
- script: cmake -S . -B build -DCMAKE_BUILD_TYPE=Release -DQUESTDB_TESTS_AND_EXAMPLES=ON
8+
env:
9+
JAVA_HOME: $(JAVA_HOME_11_X64)
10+
displayName: "Build Makefile with CMake"
11+
- script: cmake --build build
12+
env:
13+
JAVA_HOME: $(JAVA_HOME_11_X64)
14+
displayName: "Make"

ci/run_all_tests.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,9 @@ def main():
3838
test_line_sender_path = next(iter(
3939
build_dir.glob(f'**/test_line_sender{exe_suffix}')))
4040
system_test_path = pathlib.Path('system_test') / 'test.py'
41-
qdb_v = '7.3.2' # The version of QuestDB we'll test against.
41+
qdb_v = '7.3.7' # The version of QuestDB we'll test against.
4242

43+
run_cmd('cargo', 'test', '--', '--nocapture', cwd='questdb-rs')
4344
run_cmd('cargo', 'test', '--all-features', '--', '--nocapture', cwd='questdb-rs')
4445
run_cmd(str(test_line_sender_path))
4546
run_cmd('python3', str(system_test_path), 'run', '--versions', qdb_v, '-v')

ci/run_tests_pipeline.yaml

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,27 @@ stages:
1919
linux:
2020
imageName: "ubuntu-latest"
2121
poolName: "Azure Pipelines"
22-
BUILD_WITH_MINGW: "0"
2322
linux-stable:
2423
imageName: "ubuntu-latest"
2524
poolName: "Azure Pipelines"
26-
BUILD_WITH_MINGW: "0"
2725
toolchain: "stable"
2826
linux-beta:
2927
imageName: "ubuntu-latest"
3028
poolName: "Azure Pipelines"
31-
BUILD_WITH_MINGW: "0"
3229
toolchain: "beta"
3330
linux-nightly:
3431
imageName: "ubuntu-latest"
3532
poolName: "Azure Pipelines"
36-
BUILD_WITH_MINGW: "0"
3733
toolchain: "nightly"
3834
mac:
3935
imageName: "macos-latest"
4036
poolName: "Azure Pipelines"
41-
BUILD_WITH_MINGW: "0"
4237
windows-msvc-2022:
4338
imageName: "windows-2022"
4439
poolName: "Azure Pipelines"
45-
BUILD_WITH_MINGW: "0"
4640
windows-msvc-2019:
4741
imageName: "windows-2019"
4842
poolName: "Azure Pipelines"
49-
BUILD_WITH_MINGW: "0"
50-
# windows-mingw:
51-
# imageName: "windows-2022"
52-
# poolName: "Azure Pipelines"
53-
# BUILD_WITH_MINGW: "1"
5443
pool:
5544
name: $(poolName)
5645
vmImage: $(imageName)
@@ -60,34 +49,11 @@ stages:
6049
fetchDepth: 1
6150
lfs: false
6251
submodules: false
63-
- script: |
64-
rustup update $(toolchain)
65-
rustup default $(toolchain)
66-
condition: ne(variables['toolchain'], '')
67-
displayName: "Update and set Rust toolchain"
52+
- template: compile.yaml
6853
- script: |
6954
cd questdb-rs
70-
cargo build --example basic --features=chrono_timestamp
71-
cargo build --example auth --features=chrono_timestamp
72-
cargo build --example auth_tls --features=chrono_timestamp
73-
displayName: Build Rust examples.
74-
- script: cmake -S . -B build -DCMAKE_BUILD_TYPE=Release -DQUESTDB_TESTS_AND_EXAMPLES=ON
75-
env:
76-
JAVA_HOME: $(JAVA_HOME_11_X64)
77-
displayName: "Build Makefile with CMake"
78-
condition: eq(variables['BUILD_WITH_MINGW'], '0')
79-
# - script: |
80-
# choco upgrade mingw -y
81-
# rustup target add x86_64-pc-windows-gnu
82-
# cmake -S . -B build -DCMAKE_BUILD_TYPE=Release -G "MinGW Makefiles"
83-
# env:
84-
# JAVA_HOME: $(JAVA_HOME_11_X64)
85-
# displayName: "Build Makefile with CMake"
86-
# condition: eq(variables['BUILD_WITH_MINGW'], '1')
87-
- script: cmake --build build
88-
env:
89-
JAVA_HOME: $(JAVA_HOME_11_X64)
90-
displayName: "Make"
55+
cargo build --examples --all-features
56+
displayName: "Build Rust examples"
9157
- script: python3 ci/run_all_tests.py
9258
env:
9359
JAVA_HOME: $(JAVA_HOME_11_X64)
@@ -96,7 +62,6 @@ stages:
9662
inputs:
9763
pathToPublish: ./build
9864
displayName: "Publish build directory"
99-
condition: eq(variables['BUILD_WITH_MINGW'], '1')
10065
- job: CargoFmtAndClippy
10166
displayName: "cargo fmt and clippy"
10267
pool:
@@ -134,3 +99,26 @@ stages:
13499
cd tls_proxy
135100
cargo clippy --all-targets --all-features -- -D warnings
136101
displayName: "tls_proxy: clippy"
102+
- job: TestVsQuestDBMaster
103+
displayName: "Vs QuestDB 'master'"
104+
pool:
105+
vmImage: 'ubuntu-latest'
106+
timeoutInMinutes: 60
107+
steps:
108+
- checkout: self
109+
fetchDepth: 1
110+
lfs: false
111+
submodules: false
112+
- template: compile.yaml
113+
- script: |
114+
git clone --depth 1 https://github.com/questdb/questdb.git
115+
displayName: git clone questdb
116+
- task: Maven@3
117+
displayName: "Compile QuestDB"
118+
inputs:
119+
mavenPOMFile: 'questdb/pom.xml'
120+
jdkVersionOption: '1.11'
121+
options: "install -DskipTests -Pbuild-web-console"
122+
- script: |
123+
python3 system_test/test.py run --repo ./questdb -v
124+
displayName: "integration test"

cpp_test/test_line_sender.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,4 +715,19 @@ TEST_CASE("Empty Buffer") {
715715
sender.flush_and_keep(b1),
716716
"State error: Bad call to `flush`, should have called `table` instead.",
717717
questdb::ingress::line_sender_error);
718+
}
719+
720+
TEST_CASE("HTTP basics") {
721+
questdb::ingress::opts opts1{"localhost", 1};
722+
questdb::ingress::opts opts2{"localhost", 1};
723+
opts1.http().transactional().max_retries(5).retry_interval(10).basic_auth("user", "pass");
724+
opts2.http().token_auth("token").min_throughput(1000);
725+
questdb::ingress::line_sender sender1{opts1};
726+
questdb::ingress::line_sender sender2{opts2};
727+
728+
questdb::ingress::line_sender_buffer b1;
729+
b1.table("test").symbol("a", "b").at_now();
730+
731+
CHECK_THROWS_AS(sender1.flush(b1), questdb::ingress::line_sender_error);
732+
CHECK_THROWS_AS(sender2.flush(b1), questdb::ingress::line_sender_error);
718733
}

examples/line_sender_c_example_http.c

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
#include <questdb/ingress/line_sender.h>
2+
#include <stdio.h>
3+
#include <stdbool.h>
4+
#include <string.h>
5+
6+
static bool example(const char* host, const char* port)
7+
{
8+
line_sender_error* err = NULL;
9+
line_sender_opts* opts = NULL;
10+
line_sender* sender = NULL;
11+
line_sender_buffer* buffer = NULL;
12+
13+
line_sender_utf8 host_utf8 = { 0, NULL };
14+
if (!line_sender_utf8_init(&host_utf8, strlen(host), host, &err))
15+
goto on_error;
16+
17+
line_sender_utf8 port_utf8 = { 0, NULL };
18+
if (!line_sender_utf8_init(&port_utf8, strlen(port), port, &err))
19+
goto on_error;
20+
21+
// Call `line_sender_opts_new` if instead you have an integer port.
22+
opts = line_sender_opts_new_service(host_utf8, port_utf8);
23+
24+
// Use ILP/HTTP instead of ILP/TCP for better error messages.
25+
line_sender_opts_http(opts);
26+
27+
// Ensure that each buffer contains lines for a single table on each flush.
28+
line_sender_opts_transactional(opts);
29+
30+
sender = line_sender_connect(opts, &err);
31+
line_sender_opts_free(opts);
32+
opts = NULL;
33+
if (!sender)
34+
goto on_error;
35+
36+
buffer = line_sender_buffer_new();
37+
line_sender_buffer_reserve(buffer, 64 * 1024); // 64KB buffer initial size.
38+
39+
// We prepare all our table names and column names in advance.
40+
// If we're inserting multiple rows, this allows us to avoid
41+
// re-validating the same strings over and over again.
42+
line_sender_table_name table_name = QDB_TABLE_NAME_LITERAL("c_cars_http");
43+
line_sender_column_name id_name = QDB_COLUMN_NAME_LITERAL("id");
44+
line_sender_column_name x_name = QDB_COLUMN_NAME_LITERAL("x");
45+
line_sender_column_name y_name = QDB_COLUMN_NAME_LITERAL("y");
46+
line_sender_column_name booked_name = QDB_COLUMN_NAME_LITERAL("booked");
47+
line_sender_column_name passengers_name = QDB_COLUMN_NAME_LITERAL(
48+
"passengers");
49+
line_sender_column_name driver_name = QDB_COLUMN_NAME_LITERAL("driver");
50+
51+
if (!line_sender_buffer_table(buffer, table_name, &err))
52+
goto on_error;
53+
54+
line_sender_utf8 id_value = QDB_UTF8_LITERAL(
55+
"d6e5fe92-d19f-482a-a97a-c105f547f721");
56+
if (!line_sender_buffer_symbol(buffer, id_name, id_value, &err))
57+
goto on_error;
58+
59+
if (!line_sender_buffer_column_f64(buffer, x_name, 30.5, &err))
60+
goto on_error;
61+
62+
if (!line_sender_buffer_column_f64(buffer, y_name, -150.25, &err))
63+
goto on_error;
64+
65+
if (!line_sender_buffer_column_bool(buffer, booked_name, true, &err))
66+
goto on_error;
67+
68+
if (!line_sender_buffer_column_i64(buffer, passengers_name, 3, &err))
69+
goto on_error;
70+
71+
line_sender_utf8 driver_value = QDB_UTF8_LITERAL("John Doe");
72+
if (!line_sender_buffer_column_str(buffer, driver_name, driver_value, &err))
73+
goto on_error;
74+
75+
// 1997-07-04 04:56:55 UTC
76+
int64_t designated_timestamp = 867992215000000000;
77+
if (!line_sender_buffer_at_nanos(buffer, designated_timestamp, &err))
78+
goto on_error;
79+
80+
//// If you want to get the current system timestamp as nanos, call:
81+
// if (!line_sender_buffer_at_nanos(buffer, line_sender_now_nanos(), &err))
82+
// goto on_error;
83+
84+
// To insert more records, call `line_sender_buffer_table(..)...` again.
85+
// It's recommended to keep a timer and/or maximum buffer size to flush
86+
// the buffer periodically with any accumulated records.
87+
if (!line_sender_flush(sender, buffer, &err))
88+
goto on_error;
89+
90+
line_sender_close(sender);
91+
92+
return true;
93+
94+
on_error: ;
95+
line_sender_opts_free(opts);
96+
size_t err_len = 0;
97+
const char* err_msg = line_sender_error_msg(err, &err_len);
98+
fprintf(stderr, "Error running example: %.*s\n", (int)err_len, err_msg);
99+
line_sender_error_free(err);
100+
line_sender_buffer_free(buffer);
101+
line_sender_close(sender);
102+
return false;
103+
}
104+
105+
static bool displayed_help(int argc, const char* argv[])
106+
{
107+
for (int index = 1; index < argc; ++index)
108+
{
109+
const char* arg = argv[index];
110+
if ((strncmp(arg, "-h", 2) == 0) || (strncmp(arg, "--help", 6) == 0))
111+
{
112+
fprintf(stderr, "Usage:\n");
113+
fprintf(stderr, "line_sender_c_example: [HOST [PORT]]\n");
114+
fprintf(stderr, " HOST: ILP host (defaults to \"localhost\").\n");
115+
fprintf(stderr, " PORT: ILP port (defaults to \"9009\").\n");
116+
return true;
117+
}
118+
}
119+
return false;
120+
}
121+
122+
int main(int argc, const char* argv[])
123+
{
124+
if (displayed_help(argc, argv))
125+
return 0;
126+
127+
const char* host = "localhost";
128+
if (argc >= 2)
129+
host = argv[1];
130+
const char* port = "9009";
131+
if (argc >= 3)
132+
port = argv[2];
133+
134+
return !example(host, port);
135+
}

0 commit comments

Comments
 (0)