File tree Expand file tree Collapse file tree 1 file changed +10
-1
lines changed Expand file tree Collapse file tree 1 file changed +10
-1
lines changed Original file line number Diff line number Diff line change 2
2
import io
3
3
import re
4
4
import threading
5
+ import random
5
6
import time
6
7
import zlib
7
8
from functools import wraps , partial
@@ -303,6 +304,14 @@ def __init__(
303
304
self ._flush_event = Event ()
304
305
self ._force_flush = False
305
306
307
+ # The aggregator shifts it's flushing by up to an entire rollup window to
308
+ # avoid multiple clients trampling on end of a 10 second window as all the
309
+ # buckets are anchored to multiples of ROLLUP seconds. We randomize this
310
+ # number once per aggregator boot to achieve some level of offsetting
311
+ # across a fleet of deployed SDKs. Relay itself will also apply independent
312
+ # jittering.
313
+ self ._flush_shift = random .random () * self .ROLLUP_IN_SECONDS
314
+
306
315
self ._flusher = None # type: Optional[Thread]
307
316
self ._flusher_pid = None # type: Optional[int]
308
317
self ._ensure_thread ()
@@ -339,7 +348,7 @@ def _flushable_buckets(self):
339
348
# type: (...) -> (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
340
349
with self ._lock :
341
350
force_flush = self ._force_flush
342
- cutoff = time .time () - self .ROLLUP_IN_SECONDS
351
+ cutoff = time .time () - self .ROLLUP_IN_SECONDS - self . _flush_shift
343
352
flushable_buckets = () # type: Iterable[Tuple[int, Dict[BucketKey, Metric]]]
344
353
weight_to_remove = 0
345
354
You can’t perform that action at this time.
0 commit comments