Commit 8a6b8da

feat(server): Send metrics via the global endpoint (#2902)
Introduces an option `http.global_metrics`. When enabled and not in processing mode, Relay sends metrics to the global batch endpoint at `/api/0/relays/metrics/` instead of sending them in envelopes. This endpoint allows for batched submission of metrics from multiple projects, which should reduce the overall number of requests.

# Bug Fixes

This change contains additional bug fixes that were discovered during implementation:

- The batch metrics endpoint no longer requires the `emit_outcomes` flag to be set. This was an invalid copy & paste from the outcomes endpoint.
- Requests with HTTP encoding received a signature computed from the compressed body. However, Relay requires the signature on the uncompressed body.

# Details

Building the request occurs in the `EnvelopeProcessor` in place of building envelopes, in the following steps (a rough sketch follows below):

1. Iterate buckets from all projects and stream them into partitions.
2. While collecting, check the size of each partition. As soon as a partition reaches the batch size limit, flush it eagerly. Buckets at the border may be split.
3. At the end, flush all remaining partitions that hold data.
4. To flush a partition, JSON-encode the list of buckets and optionally apply HTTP encoding (compression).
5. Submit the `SendMetricsRequest` with the payload and outcome metadata directly to the upstream.
6. In the response callback, log outcomes directly. Therefore, the request does not have to be awaited.

In processing mode, Relay still produces to Kafka; the configuration option has no effect there.

A note on stability: this endpoint and functionality are meant for operation at scale within a distributed Sentry installation. At this moment, it is not recommended to enable this option for external Relays.
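The following is a minimal, self-contained sketch of the partition-and-flush flow described in the steps above. It is an illustration only, not the Relay implementation: the `Bucket` struct, `estimated_size`, and the `flush` callback are simplified stand-ins, and `DefaultHasher` is used in place of Relay's FNV-based hashing to keep the example dependency-free.

```rust
use std::collections::hash_map::DefaultHasher; // Relay uses FnvHasher; DefaultHasher keeps this dependency-free
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Simplified stand-in for a metric bucket (tags omitted for brevity).
#[derive(Hash)]
struct Bucket {
    project_key: String,
    timestamp: u64,
    metric_name: String,
}

impl Bucket {
    /// Rough serialized-size estimate, analogous to `BucketView::estimated_size`.
    fn estimated_size(&self) -> usize {
        50 + self.metric_name.len() + 8
    }

    /// 64-bit hash of the bucket key, used to pick a partition.
    fn hash64(&self) -> u64 {
        let mut hasher = DefaultHasher::new();
        self.hash(&mut hasher);
        hasher.finish()
    }
}

/// Streams buckets into partitions and flushes a partition eagerly once its
/// estimated size reaches `batch_size_bytes`. Remaining partitions are
/// flushed at the end.
fn partition_and_flush(
    buckets: impl IntoIterator<Item = Bucket>,
    flush_partitions: u64,
    batch_size_bytes: usize,
    mut flush: impl FnMut(u64, Vec<Bucket>),
) {
    let mut partitions: BTreeMap<u64, (usize, Vec<Bucket>)> = BTreeMap::new();

    for bucket in buckets {
        let partition_key = bucket.hash64() % flush_partitions.max(1);
        let (size, entries) = partitions.entry(partition_key).or_default();
        *size += bucket.estimated_size();
        entries.push(bucket);

        // Eager flush: submit the partition as soon as it hits the size limit.
        if *size >= batch_size_bytes {
            if let Some((_, full)) = partitions.remove(&partition_key) {
                flush(partition_key, full);
            }
        }
    }

    // Flush all remaining partitions that still hold data.
    for (partition_key, (_, entries)) in partitions {
        if !entries.is_empty() {
            flush(partition_key, entries);
        }
    }
}
```

Flushing eagerly while iterating keeps memory bounded and corresponds to step 2 above; in the actual change, the encoded payload is then sent as a `SendMetricsRequest` and outcomes are logged from the response callback.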
1 parent 917ed49 commit 8a6b8da

File tree

13 files changed, +581 -173 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@
 - Use a Lua script and in-memory cache for the cardinality limiting to reduce load on Redis. ([#2849](https://github.com/getsentry/relay/pull/2849))
 - Extract metrics for file spans. ([#2874](https://github.com/getsentry/relay/pull/2874))
 - Add an internal endpoint that allows Relays to submit metrics from multiple projects in a single request. ([#2869](https://github.com/getsentry/relay/pull/2869))
+- Introduce the configuration option `http.global_metrics`. When enabled, Relay submits metric buckets not through regular project-scoped Envelopes, but instead through the global endpoint. When this Relay serves a high number of projects, this can reduce the overall request volume. ([#2902](https://github.com/getsentry/relay/pull/2902))
 - Emit a `processor.message.duration` metric to assess the throughput of the internal CPU pool. ([#2877](https://github.com/getsentry/relay/pull/2877))

 ## 23.12.0

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

relay-metrics/src/aggregator.rs

Lines changed: 3 additions & 60 deletions
@@ -71,13 +71,9 @@ impl BucketKey {
     ///
     /// This is used for partition key computation and statsd logging.
     fn hash64(&self) -> u64 {
-        BucketKeyRef {
-            project_key: self.project_key,
-            timestamp: self.timestamp,
-            metric_name: &self.metric_name,
-            tags: &self.tags,
-        }
-        .hash64()
+        let mut hasher = FnvHasher::default();
+        std::hash::Hash::hash(self, &mut hasher);
+        hasher.finish()
     }

     /// Estimates the number of bytes needed to encode the bucket key.
@@ -97,29 +93,6 @@ impl BucketKey {
     }
 }

-/// Pendant to [`BucketKey`] for referenced data, not owned data.
-///
-/// This makes it possible to compute a hash for a [`Bucket`]
-/// without destructing the bucket into a [`BucketKey`].
-#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
-struct BucketKeyRef<'a> {
-    project_key: ProjectKey,
-    timestamp: UnixTimestamp,
-    metric_name: &'a str,
-    tags: &'a BTreeMap<String, String>,
-}
-
-impl<'a> BucketKeyRef<'a> {
-    /// Creates a 64-bit hash of the bucket key using FnvHasher.
-    ///
-    /// This is used for partition key computation and statsd logging.
-    fn hash64(&self) -> u64 {
-        let mut hasher = FnvHasher::default();
-        std::hash::Hash::hash(self, &mut hasher);
-        hasher.finish()
-    }
-}
-
 /// Estimates the number of bytes needed to encode the tags.
 ///
 /// Note that this does not necessarily match the exact memory footprint of the tags,
@@ -888,36 +861,6 @@ impl fmt::Debug for Aggregator {
     }
 }

-/// Splits buckets into N logical partitions, determined by the bucket key.
-pub fn partition_buckets(
-    project_key: ProjectKey,
-    buckets: impl IntoIterator<Item = Bucket>,
-    flush_partitions: Option<u64>,
-) -> BTreeMap<Option<u64>, Vec<Bucket>> {
-    let flush_partitions = match flush_partitions {
-        None => return BTreeMap::from([(None, buckets.into_iter().collect())]),
-        Some(x) => x.max(1), // handle 0,
-    };
-    let mut partitions = BTreeMap::<_, Vec<Bucket>>::new();
-    for bucket in buckets {
-        let key = BucketKeyRef {
-            project_key,
-            timestamp: bucket.timestamp,
-            metric_name: &bucket.name,
-            tags: &bucket.tags,
-        };
-
-        let partition_key = key.hash64() % flush_partitions;
-        partitions
-            .entry(Some(partition_key))
-            .or_default()
-            .push(bucket);
-
-        relay_statsd::metric!(histogram(MetricHistograms::PartitionKeys) = partition_key);
-    }
-    partitions
-}
-
 #[cfg(test)]
 mod tests {
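For reference, the change above makes `BucketKey` hash itself directly with `FnvHasher` rather than going through the removed `BucketKeyRef`. Below is a hedged, standalone illustration of that hashing scheme and of how the removed `partition_buckets` derived a partition key from the hash; `MyKey` and the helper functions are made-up names for illustration, not Relay APIs, and the `fnv` crate is assumed as a dependency.

```rust
use std::hash::{Hash, Hasher};

use fnv::FnvHasher;

/// Illustrative key type; any type deriving `Hash` works the same way.
#[derive(Hash)]
struct MyKey {
    project_key: String,
    timestamp: u64,
    metric_name: String,
}

/// Creates a 64-bit FNV hash of any hashable value, mirroring `BucketKey::hash64`.
fn hash64<T: Hash>(value: &T) -> u64 {
    let mut hasher = FnvHasher::default();
    value.hash(&mut hasher);
    hasher.finish()
}

/// Derives a partition key the same way the removed `partition_buckets` did:
/// the 64-bit hash modulo the number of flush partitions.
fn partition_key(key: &MyKey, flush_partitions: u64) -> u64 {
    hash64(key) % flush_partitions.max(1)
}
```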

relay-metrics/src/statsd.rs

Lines changed: 0 additions & 8 deletions
@@ -124,13 +124,6 @@ pub enum MetricHistograms {
     /// time period (`false`) or after the initial delay has expired (`true`).
     BucketsDelay,

-    ///
-    /// Distribution of flush buckets over partition keys.
-    ///
-    /// The distribution of buckets should be even.
-    /// If it is not, this metric should expose it.
-    PartitionKeys,
-
     /// Distribution of invalid bucket timestamps observed, relative to the time of observation.
     ///
     /// This is a temporary metric to better understand why we see so many invalid timestamp errors.
@@ -143,7 +136,6 @@ impl HistogramMetric for MetricHistograms {
             Self::BucketsFlushed => "metrics.buckets.flushed",
             Self::BucketsFlushedPerProject => "metrics.buckets.flushed_per_project",
             Self::BucketsDelay => "metrics.buckets.delay",
-            Self::PartitionKeys => "metrics.buckets.partition_keys",
             Self::InvalidBucketTimestamp => "metrics.buckets.invalid_timestamp",
         }
     }

relay-metrics/src/view.rs

Lines changed: 59 additions & 31 deletions
@@ -15,6 +15,11 @@ use crate::BucketValue;
 /// and buckets larger will be split up.
 const BUCKET_SPLIT_FACTOR: usize = 32;

+/// The base size of a serialized bucket in bytes.
+///
+/// This is the size of a bucket's fixed fields in JSON format, excluding the value and tags.
+const BUCKET_SIZE: usize = 50;
+
 /// The average size of values when serialized.
 const AVG_VALUE_SIZE: usize = 8;

@@ -276,14 +281,10 @@ impl<'a> Iterator for BucketsViewBySizeIter<'a> {
                 }
                 SplitDecision::MoveToNextBatch => break,
                 SplitDecision::Split(at) => {
-                    // Only certain buckets can be split, if the bucket can't be split,
-                    // move it to the next batch.
-                    if bucket.can_split() {
-                        self.current = Index {
-                            slice: self.current.slice,
-                            bucket: self.current.bucket + at,
-                        };
-                    }
+                    self.current = Index {
+                        slice: self.current.slice,
+                        bucket: self.current.bucket + at,
+                    };
                     break;
                 }
             }
@@ -332,6 +333,7 @@ impl<'a> Serialize for BucketsView<'a> {
 /// ```
 ///
 /// A view can be split again into multiple smaller views.
+#[derive(Clone)]
 pub struct BucketView<'a> {
     /// The source bucket.
     inner: &'a Bucket,
@@ -427,6 +429,46 @@ impl<'a> BucketView<'a> {
         Some(self)
     }

+    /// Estimates the number of bytes needed to serialize the bucket without value.
+    ///
+    /// Note that this does not match the exact size of the serialized payload. Instead, the size is
+    /// approximated through tags and a static overhead.
+    fn estimated_base_size(&self) -> usize {
+        BUCKET_SIZE + self.name().len() + aggregator::tags_cost(self.tags())
+    }
+
+    /// Estimates the number of bytes needed to serialize the bucket.
+    ///
+    /// Note that this does not match the exact size of the serialized payload. Instead, the size is
+    /// approximated through the number of contained values, assuming an average size of serialized
+    /// values.
+    pub fn estimated_size(&self) -> usize {
+        self.estimated_base_size() + self.len() * AVG_VALUE_SIZE
+    }
+
+    /// Calculates a split for this bucket if its estimated serialization size exceeds a threshold.
+    ///
+    /// There are three possible return values:
+    ///  - `(Some, None)` if the bucket fits entirely into the size budget. There is no split.
+    ///  - `(None, Some)` if the size budget cannot even hold the bucket name and tags. There is no
+    ///    split, the entire bucket is moved.
+    ///  - `(Some, Some)` if the bucket fits partially. Remaining values are moved into a new bucket
+    ///    with all other information cloned.
+    ///
+    /// This is an approximate function. The bucket is not actually serialized, but rather its
+    /// footprint is estimated through the number of data points contained. See
+    /// [`estimated_size`](Self::estimated_size) for more information.
+    pub fn split(self, size: usize, max_size: Option<usize>) -> (Option<Self>, Option<Self>) {
+        match split_at(&self, size, max_size.unwrap_or(0) / BUCKET_SPLIT_FACTOR) {
+            SplitDecision::BucketFits(_) => (Some(self), None),
+            SplitDecision::MoveToNextBatch => (None, Some(self)),
+            SplitDecision::Split(at) => {
+                let Range { start, end } = self.range.clone();
+                (self.clone().select(start..at), self.select(at..end))
+            }
+        }
+    }
+
     /// Whether the bucket can be split into multiple.
     ///
     /// Only set and distribution buckets can be split.
@@ -624,14 +666,18 @@ enum SplitDecision {
 /// `estimate_size` for more information.
 fn split_at(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) -> SplitDecision {
     // If there's enough space for the entire bucket, do not perform a split.
-    let bucket_size = estimate_size(bucket);
+    let bucket_size = bucket.estimated_size();
     if max_size >= bucket_size {
         return SplitDecision::BucketFits(bucket_size);
     }

+    if !bucket.can_split() {
+        return SplitDecision::MoveToNextBatch;
+    }
+
     // If the bucket key can't even fit into the remaining length, move the entire bucket into
     // the right-hand side.
-    let own_size = estimate_base_size(bucket);
+    let own_size = bucket.estimated_base_size();
     if max_size < (own_size + AVG_VALUE_SIZE) {
         // split_at must not be zero
         return SplitDecision::MoveToNextBatch;
@@ -644,27 +690,9 @@ fn split_at(bucket: &BucketView<'_>, max_size: usize, min_split_size: usize) ->
     // Perform a split with the remaining space after adding the key. We assume an average
     // length of 8 bytes per value and compute the number of items fitting into the left side.
     let split_at = (max_size - own_size) / AVG_VALUE_SIZE;
-
     SplitDecision::Split(split_at)
 }

-/// Estimates the number of bytes needed to serialize the bucket without value.
-///
-/// Note that this does not match the exact size of the serialized payload. Instead, the size is
-/// approximated through tags and a static overhead.
-fn estimate_base_size(bucket: &BucketView<'_>) -> usize {
-    50 + bucket.name().len() + aggregator::tags_cost(bucket.tags())
-}
-
-/// Estimates the number of bytes needed to serialize the bucket.
-///
-/// Note that this does not match the exact size of the serialized payload. Instead, the size is
-/// approximated through the number of contained values, assuming an average size of serialized
-/// values.
-fn estimate_size(bucket: &BucketView<'_>) -> usize {
-    estimate_base_size(bucket) + bucket.len() * AVG_VALUE_SIZE
-}
-
 #[cfg(test)]
 mod tests {
     use insta::assert_json_snapshot;
@@ -919,7 +947,7 @@ b3:42:75|s"#;
         .by_size(100)
         .map(|bv| {
            let len: usize = bv.iter().map(|b| b.len()).sum();
-           let size: usize = bv.iter().map(|b| estimate_size(&b)).sum();
+           let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
            (len, size)
        })
        .collect::<Vec<_>>();
@@ -945,7 +973,7 @@ b3:42:75|s"#;
        .by_size(250)
        .map(|bv| {
            let len: usize = bv.iter().map(|b| b.len()).sum();
-           let size: usize = bv.iter().map(|b| estimate_size(&b)).sum();
+           let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
            (len, size)
        })
        .collect::<Vec<_>>();
@@ -971,7 +999,7 @@ b3:42:75|s"#;
        .by_size(500)
        .map(|bv| {
            let len: usize = bv.iter().map(|b| b.len()).sum();
-           let size: usize = bv.iter().map(|b| estimate_size(&b)).sum();
+           let size: usize = bv.iter().map(|b| b.estimated_size()).sum();
            (len, size)
        })
        .collect::<Vec<_>>();
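To make the new estimation and split logic above easier to follow, here is a simplified, standalone model of the same arithmetic. It assumes the constants from the diff (`BUCKET_SIZE = 50`, `AVG_VALUE_SIZE = 8`), omits the `min_split_size` parameter and the real `BucketView` type, and should be read as a sketch of the calculation rather than the actual implementation.

```rust
/// Fixed JSON overhead per bucket, matching `BUCKET_SIZE` in `view.rs`.
const BUCKET_SIZE: usize = 50;
/// Assumed average size of one serialized value, matching `AVG_VALUE_SIZE`.
const AVG_VALUE_SIZE: usize = 8;

/// Simplified version of the decision returned by `split_at`.
enum SplitDecision {
    /// The bucket fits entirely; carries the estimated size.
    BucketFits(usize),
    /// Not even the name and tags fit; move the whole bucket to the next batch.
    MoveToNextBatch,
    /// Split after this many values.
    Split(usize),
}

/// Models the decision: fits entirely, moves entirely, or splits after N values.
fn split_decision(
    name_len: usize,
    tags_cost: usize,
    num_values: usize,
    can_split: bool,
    max_size: usize,
) -> SplitDecision {
    // Same formula as `estimated_base_size` / `estimated_size` above.
    let base_size = BUCKET_SIZE + name_len + tags_cost;
    let estimated_size = base_size + num_values * AVG_VALUE_SIZE;

    if max_size >= estimated_size {
        return SplitDecision::BucketFits(estimated_size);
    }
    // Only set and distribution buckets can be split; others move as a whole.
    if !can_split {
        return SplitDecision::MoveToNextBatch;
    }
    // If not even the name, tags, and one value fit, the split point would be zero.
    if max_size < base_size + AVG_VALUE_SIZE {
        return SplitDecision::MoveToNextBatch;
    }
    // Number of values that still fit into the remaining budget.
    SplitDecision::Split((max_size - base_size) / AVG_VALUE_SIZE)
}
```

For example, a bucket with a 20-byte name, 30 bytes of tags, and 100 values estimates to 50 + 20 + 30 + 100 × 8 = 900 bytes; with a 500-byte budget it would split after (500 − 100) / 8 = 50 values.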

relay-server/Cargo.toml

Lines changed: 2 additions & 1 deletion
@@ -50,6 +50,7 @@ bytes = { version = "1.4.0" }
 chrono = { workspace = true, features = ["clock"] }
 data-encoding = "2.3.3"
 flate2 = "1.0.19"
+fnv = "1.0.7"
 futures = { workspace = true }
 hash32 = { workspace = true }
 hashbrown = { workspace = true }
@@ -95,7 +96,7 @@ rmp-serde = "1.1.1"
 rust-embed = { version = "8.0.0", optional = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
-smallvec = { workspace = true, features = ["drain_filter"] }
+smallvec = { workspace = true, features = ["drain_filter"] }
 sqlx = { version = "0.7.0", features = [
     "macros",
     "migrate",
