Skip to content

Commit 3c326fd

Browse files
authored
feat: new http retry logic (#53)
1 parent 9ef329d commit 3c326fd

File tree

10 files changed

+159
-124
lines changed

10 files changed

+159
-124
lines changed

ci/run_all_tests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def main():
3838
test_line_sender_path = next(iter(
3939
build_dir.glob(f'**/test_line_sender{exe_suffix}')))
4040
system_test_path = pathlib.Path('system_test') / 'test.py'
41-
qdb_v = '7.3.7' # The version of QuestDB we'll test against.
41+
qdb_v = '7.3.9' # The version of QuestDB we'll test against.
4242

4343
run_cmd('cargo', 'test', '--', '--nocapture', cwd='questdb-rs')
4444
run_cmd('cargo', 'test', '--all-features', '--', '--nocapture', cwd='questdb-rs')

cpp_test/test_line_sender.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -720,8 +720,17 @@ TEST_CASE("Empty Buffer") {
720720
TEST_CASE("HTTP basics") {
721721
questdb::ingress::opts opts1{"localhost", 1};
722722
questdb::ingress::opts opts2{"localhost", 1};
723-
opts1.http().transactional().max_retries(5).retry_interval(10).basic_auth("user", "pass");
724-
opts2.http().token_auth("token").min_throughput(1000);
723+
opts1
724+
.http()
725+
.transactional()
726+
.grace_timeout(5000)
727+
.retry_timeout(5)
728+
.basic_auth("user", "pass");
729+
opts2
730+
.http()
731+
.token_auth("token")
732+
.min_throughput(1000)
733+
.retry_timeout(0);
725734
questdb::ingress::line_sender sender1{opts1};
726735
questdb::ingress::line_sender sender2{opts2};
727736

include/questdb/ingress/line_sender.h

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -600,25 +600,13 @@ LINESENDER_API
600600
void line_sender_opts_http(line_sender_opts* opts);
601601

602602
/**
603-
* Maxmimum number of HTTP request retries.
604-
* Defaults to 3.
603+
* Cumulative duration spent in retries (specified in milliseconds).
604+
* Default is 10 seconds.
605605
*/
606606
LINESENDER_API
607-
void line_sender_opts_max_retries(
607+
void line_sender_opts_retry_timeout(
608608
line_sender_opts* opts,
609-
uint32_t max_retries);
610-
611-
/**
612-
* The initial retry interval (specified in milliseconds).
613-
* This the default is 100 milliseconds.
614-
* The retry interval is doubled after each failed attempt,
615-
* up to the maximum number of retries.
616-
* Also see `max_retries`.
617-
*/
618-
LINESENDER_API
619-
void line_sender_opts_retry_interval(
620-
line_sender_opts* opts,
621-
uint64_t retry_interval_millis);
609+
uint64_t millis);
622610

623611
/**
624612
* Minimum expected throughput in bytes per second for HTTP requests.
@@ -631,6 +619,15 @@ void line_sender_opts_min_throughput(
631619
line_sender_opts* opts,
632620
uint64_t bytes_per_sec);
633621

622+
/**
623+
* Grace request timeout (specified in milliseconds) before relying on the minimum throughput logic.
624+
* The default is 5 seconds.
625+
*/
626+
LINESENDER_API
627+
void line_sender_opts_grace_timeout(
628+
line_sender_opts* opts,
629+
uint64_t millis);
630+
634631
/**
635632
* Enable transactional flushes.
636633
* This is only relevant for HTTP.

include/questdb/ingress/line_sender.hpp

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -824,27 +824,12 @@ namespace questdb::ingress
824824
}
825825

826826
/**
827-
* Maxmimum number of HTTP request retries.
828-
* Defaults to 3.
827+
* Cumulative duration spent in retries (specified in milliseconds).
828+
* Default is 10 seconds.
829829
*/
830-
opts& max_retries(uint32_t max_retries) noexcept
830+
opts& retry_timeout(uint64_t millis) noexcept
831831
{
832-
::line_sender_opts_max_retries(_impl, max_retries);
833-
return *this;
834-
}
835-
836-
/**
837-
* The initial retry interval (specified in milliseconds).
838-
* This the default is 100 milliseconds.
839-
* The retry interval is doubled after each failed attempt,
840-
* up to the maximum number of retries.
841-
* Also see `max_retries`.
842-
*/
843-
opts& retry_interval(uint64_t retry_interval_millis) noexcept
844-
{
845-
::line_sender_opts_retry_interval(
846-
_impl,
847-
retry_interval_millis);
832+
::line_sender_opts_retry_timeout(_impl, millis);
848833
return *this;
849834
}
850835

@@ -860,6 +845,16 @@ namespace questdb::ingress
860845
return *this;
861846
}
862847

848+
/**
849+
* Grace request timeout (specified in milliseconds) before relying on the minimum throughput logic.
850+
* The default is 5 seconds.
851+
*/
852+
opts& grace_timeout(uint64_t millis) noexcept
853+
{
854+
::line_sender_opts_grace_timeout(_impl, millis);
855+
return *this;
856+
}
857+
863858
/**
864859
* Enable transactional flushes.
865860
* This is only relevant for HTTP.

questdb-rs-ffi/Cargo.lock

Lines changed: 37 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

questdb-rs-ffi/src/lib.rs

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -514,28 +514,12 @@ pub unsafe extern "C" fn line_sender_opts_http(opts: *mut line_sender_opts) {
514514
upd_opts!(opts, http);
515515
}
516516

517-
/// Maxmimum number of HTTP request retries.
518-
/// Defaults to 3.
517+
/// Cumulative duration spent in retries (specified in milliseconds).
518+
/// Default is 10 seconds.
519519
#[no_mangle]
520-
pub unsafe extern "C" fn line_sender_opts_max_retries(
521-
opts: *mut line_sender_opts,
522-
max_retries: u32,
523-
) {
524-
upd_opts!(opts, max_retries, max_retries);
525-
}
526-
527-
/// The initial retry interval (specified in milliseconds).
528-
/// This the default is 100 milliseconds.
529-
/// The retry interval is doubled after each failed attempt,
530-
/// up to the maximum number of retries.
531-
/// Also see `max_retries`.
532-
#[no_mangle]
533-
pub unsafe extern "C" fn line_sender_opts_retry_interval(
534-
opts: *mut line_sender_opts,
535-
retry_interval_millis: u64,
536-
) {
537-
let retry_interval = std::time::Duration::from_millis(retry_interval_millis);
538-
upd_opts!(opts, retry_interval, retry_interval);
520+
pub unsafe extern "C" fn line_sender_opts_retry_timeout(opts: *mut line_sender_opts, millis: u64) {
521+
let retry_timeout = std::time::Duration::from_millis(millis);
522+
upd_opts!(opts, retry_timeout, retry_timeout);
539523
}
540524

541525
/// Minimum expected throughput in bytes per second for HTTP requests.
@@ -550,6 +534,14 @@ pub unsafe extern "C" fn line_sender_opts_min_throughput(
550534
upd_opts!(opts, min_throughput, bytes_per_sec);
551535
}
552536

537+
/// Grace request timeout (specified in milliseconds) before relying on the minimum throughput logic.
538+
/// The default is 5 seconds.
539+
#[no_mangle]
540+
pub unsafe extern "C" fn line_sender_opts_grace_timeout(opts: *mut line_sender_opts, millis: u64) {
541+
let grace_timeout = std::time::Duration::from_millis(millis);
542+
upd_opts!(opts, grace_timeout, grace_timeout);
543+
}
544+
553545
/// Enable transactional flushes.
554546
/// This is only relevant for HTTP.
555547
/// This works by ensuring that the buffer contains lines for a single table.

questdb-rs/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ webpki-roots = { version = "0.26.0", optional = true }
3030
chrono = { version = "0.4.30", optional = true }
3131
ureq = { version = "2.9.4", optional = true }
3232
serde_json = { version = "1.0.108", optional = true }
33+
rand = { version = "0.8.5", optional = true }
3334

3435
[target.'cfg(windows)'.dependencies]
3536
winapi = { version = "0.3.9", features = ["ws2def"] }
@@ -48,7 +49,7 @@ chrono = "0.4.31"
4849
default = ["tls-webpki-certs"]
4950

5051
# Include support for ILP over HTTP.
51-
ilp-over-http = ["dep:ureq", "dep:serde_json"]
52+
ilp-over-http = ["dep:ureq", "dep:serde_json", "dep:rand"]
5253

5354
# Allow use OS-provided root TLS certificates
5455
tls-native-certs = ["dep:rustls-native-certs"]

questdb-rs/src/ingress/http.rs

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::{error, Error};
22
use base64ct::Base64;
33
use base64ct::Encoding;
4+
use rand::Rng;
45
use std::fmt::Write;
6+
use std::thread::sleep;
57
use std::time::Duration;
68

79
#[derive(Debug, Clone)]
@@ -39,8 +41,8 @@ impl TokenAuthParams {
3941
pub(super) struct HttpConfig {
4042
pub(super) min_throughput: u64,
4143
pub(super) user_agent: Option<String>,
42-
pub(super) max_retries: u32,
43-
pub(super) retry_interval: Duration,
44+
pub(super) retry_timeout: Duration,
45+
pub(super) grace_timeout: Duration,
4446
pub(super) transactional: bool,
4547
}
4648

@@ -55,7 +57,7 @@ pub(super) struct HttpHandlerState {
5557
pub(super) auth: Option<String>,
5658

5759
/// Additional grace period added to the timeout as calculated via `min_throughput`.
58-
pub(super) timeout_grace_period: Duration,
60+
pub(super) grace_timeout: Duration,
5961

6062
/// HTTP params configured via the `SenderBuilder`.
6163
pub(super) config: HttpConfig,
@@ -175,33 +177,50 @@ pub(super) fn is_retriable_error(err: &ureq::Error) -> bool {
175177
}
176178

177179
#[allow(clippy::result_large_err)] // `ureq::Error` is large enough to cause this warning.
178-
pub(super) fn retry_http_send(
180+
fn retry_http_send(
179181
request: ureq::Request,
180182
buf: &[u8],
181-
max_retries: u32,
182-
mut retry_interval: Duration,
183+
retry_timeout: Duration,
184+
mut last_err: ureq::Error,
183185
) -> Result<ureq::Response, ureq::Error> {
184-
let mut counter = 0;
185-
186+
let mut rng = rand::thread_rng();
187+
let retry_end = std::time::Instant::now() + retry_timeout;
188+
let mut retry_interval_ms = 10;
186189
loop {
187-
let response_or_err = request.clone().send_bytes(buf);
188-
let last_err = match response_or_err {
190+
let jitter_ms = rng.gen_range(-5i32..5);
191+
let to_sleep_ms = retry_interval_ms + jitter_ms;
192+
let to_sleep = Duration::from_millis(to_sleep_ms as u64);
193+
if (std::time::Instant::now() + to_sleep) > retry_end {
194+
return Err(last_err);
195+
}
196+
sleep(to_sleep);
197+
last_err = match request.clone().send_bytes(buf) {
189198
Ok(res) => return Ok(res),
190199
Err(err) => {
191-
if is_retriable_error(&err) {
192-
err
193-
} else {
200+
if !is_retriable_error(&err) {
194201
return Err(err);
195202
}
203+
err
196204
}
197205
};
206+
retry_interval_ms = (retry_interval_ms * 2).min(1000);
207+
}
208+
}
198209

199-
counter += 1;
200-
if counter > max_retries {
201-
return Err(last_err);
202-
}
210+
#[allow(clippy::result_large_err)] // `ureq::Error` is large enough to cause this warning.
211+
pub(super) fn http_send_with_retries(
212+
request: ureq::Request,
213+
buf: &[u8],
214+
retry_timeout: Duration,
215+
) -> Result<ureq::Response, ureq::Error> {
216+
let last_err = match request.clone().send_bytes(buf) {
217+
Ok(res) => return Ok(res),
218+
Err(err) => err,
219+
};
203220

204-
std::thread::sleep(retry_interval);
205-
retry_interval *= 2;
221+
if retry_timeout.is_zero() || !is_retriable_error(&last_err) {
222+
return Err(last_err);
206223
}
224+
225+
retry_http_send(request, buf, retry_timeout, last_err)
207226
}

0 commit comments

Comments
 (0)