Skip to content

Commit ada3256

Browse files
ref(normalization): Remove StoreProcessor (#3097)
This PR removes the `StoreProcessor`, bringing us one step closer to removing the store normalizer entirely. The biggest difference is that normalization of default attributes now lives in `event::normalize` and runs in all Relays, not just processing Relays.

### Design choices

- There's a `StoreConfig` struct in the C-ABI. Setting the configuration from librelay should happen with a single object, whereas Relay uses three (two for validation and one for normalization), so we can't get rid of it in the current setting.
- The normalization config has an `is_last_normalize` flag. Relay needs to know whether it's the last normalization step, e.g. to remove unknown fields without breaking future compatibility. This flag is currently enabled in processing Relays and librelay calls, and it's disabled by default.

### Future work

- Move the contents of `normalize::mod` to their appropriate places. Currently, it contains a few types, a couple of functions used in other places, and tests validating functionality in other modules.
- Make running normalization configurable, and only run it in PoP Relays.
1 parent 79fd7ca commit ada3256

File tree

21 files changed

+804
-924
lines changed

21 files changed

+804
-924
lines changed

relay-cabi/src/processing.rs

Lines changed: 140 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,144 @@ use std::os::raw::c_char;
99
use std::slice;
1010
use std::sync::OnceLock;
1111

12+
use chrono::{DateTime, Utc};
1213
use relay_common::glob::{glob_match_bytes, GlobOptions};
1314
use relay_dynamic_config::{normalize_json, validate_json, GlobalConfig, ProjectConfig};
1415
use relay_event_normalization::{
15-
normalize_event, validate_event_timestamps, validate_transaction, EventValidationConfig,
16-
GeoIpLookup, NormalizationConfig, RawUserAgentInfo, StoreConfig, StoreProcessor,
16+
normalize_event, validate_event_timestamps, validate_transaction, BreakdownsConfig,
17+
ClientHints, EventValidationConfig, GeoIpLookup, NormalizationConfig, RawUserAgentInfo,
1718
TransactionValidationConfig,
1819
};
1920
use relay_event_schema::processor::{process_value, split_chunks, ProcessingState};
20-
use relay_event_schema::protocol::{Event, VALID_PLATFORMS};
21+
use relay_event_schema::protocol::{Event, IpAddr, VALID_PLATFORMS};
2122
use relay_pii::{
2223
selector_suggestions_from_value, DataScrubbingConfig, InvalidSelectorError, PiiConfig,
2324
PiiConfigError, PiiProcessor, SelectorSpec,
2425
};
2526
use relay_protocol::{Annotated, Remark, RuleCondition};
2627
use relay_sampling::SamplingConfig;
28+
use serde::{Deserialize, Serialize};
29+
use uuid::Uuid;
2730

2831
use crate::core::{RelayBuf, RelayStr};
2932

33+
/// Configuration for the store step -- validation and normalization.
34+
#[derive(Serialize, Deserialize, Debug, Default)]
35+
#[serde(default)]
36+
pub struct StoreNormalizer {
37+
/// The identifier of the target project, which gets added to the payload.
38+
pub project_id: Option<u64>,
39+
40+
/// The IP address of the SDK that sent the event.
41+
///
42+
/// When `{{auto}}` is specified and there is no other IP address in the payload, such as in the
43+
/// `request` context, this IP address gets added to the `user` context.
44+
pub client_ip: Option<IpAddr>,
45+
46+
/// The name and version of the SDK that sent the event.
47+
pub client: Option<String>,
48+
49+
/// The internal identifier of the DSN, which gets added to the payload.
50+
///
51+
/// Note that this is different from the DSN's public key. The ID is usually numeric.
52+
pub key_id: Option<String>,
53+
54+
/// The version of the protocol.
55+
///
56+
/// This is a deprecated field, as there is no more versioning of Relay event payloads.
57+
pub protocol_version: Option<String>,
58+
59+
/// Configuration for issue grouping.
60+
///
61+
/// This configuration is persisted into the event payload to achieve idempotency in the
62+
/// processing pipeline and for reprocessing.
63+
pub grouping_config: Option<serde_json::Value>,
64+
65+
/// The raw user-agent string obtained from the submission request headers.
66+
///
67+
/// The user agent is used to infer device, operating system, and browser information should the
68+
/// event payload contain no such data.
69+
///
70+
/// Newer browsers have frozen their user agents and send [`client_hints`](Self::client_hints)
71+
/// instead. If both a user agent and client hints are present, normalization uses client hints.
72+
pub user_agent: Option<String>,
73+
74+
/// A collection of headers sent by newer browsers about the device and environment.
75+
///
76+
/// Client hints are the preferred way to infer device, operating system, and browser
77+
/// information should the event payload contain no such data. If no client hints are present,
78+
/// normalization falls back to the user agent.
79+
pub client_hints: ClientHints<String>,
80+
81+
/// The time at which the event was received in this Relay.
82+
///
83+
/// This timestamp is persisted into the event payload.
84+
pub received_at: Option<DateTime<Utc>>,
85+
86+
/// The time at which the event was sent by the client.
87+
///
88+
/// The difference between this and the `received_at` timestamps is used for clock drift
89+
/// correction, should a significant difference be detected.
90+
pub sent_at: Option<DateTime<Utc>>,
91+
92+
/// The maximum number of seconds an event can be dated in the future.
93+
///
94+
/// If the event's timestamp lies further into the future, the received timestamp is assumed.
95+
pub max_secs_in_future: Option<i64>,
96+
97+
/// The maximum number of seconds an event can be dated in the past.
98+
///
99+
/// If the event's timestamp is older, the received timestamp is assumed.
100+
pub max_secs_in_past: Option<i64>,
101+
102+
/// When `Some(true)`, individual parts of the event payload are trimmed to a maximum size.
103+
///
104+
/// See the event schema for size declarations.
105+
pub enable_trimming: Option<bool>,
106+
107+
/// When `Some(true)`, it is assumed that the event has been normalized before.
108+
///
109+
/// This disables certain normalizations, especially all that are not idempotent. The
110+
/// renormalize mode is intended for use in the processing pipeline, so an event modified
111+
/// during ingestion can be validated against the schema and large data can be trimmed. However,
112+
/// advanced normalizations such as inferring contexts or clock drift correction are disabled.
113+
///
114+
/// `None` is equivalent to `false`.
115+
pub is_renormalize: Option<bool>,
116+
117+
/// Overrides the default `remove_other` flag; when `None`, it defaults to the negation of `is_renormalize`.
118+
pub remove_other: Option<bool>,
119+
120+
/// When `Some(true)`, context information is extracted from the user agent.
121+
pub normalize_user_agent: Option<bool>,
122+
123+
/// Emit breakdowns based on given configuration.
124+
pub breakdowns: Option<BreakdownsConfig>,
125+
126+
/// The SDK's sample rate as communicated via envelope headers.
127+
///
128+
/// It is persisted into the event payload.
129+
pub client_sample_rate: Option<f64>,
130+
131+
/// The identifier of the Replay running while this event was created.
132+
///
133+
/// It is persisted into the event payload for correlation.
134+
pub replay_id: Option<Uuid>,
135+
136+
/// Controls whether spans should be normalized (e.g. normalizing the exclusive time).
137+
///
138+
/// To normalize spans in [`normalize_event`], `is_renormalize` must
139+
/// be disabled _and_ `normalize_spans` enabled.
140+
pub normalize_spans: bool,
141+
}
142+
143+
impl StoreNormalizer {
144+
/// Helper method to turn a dereferenced `*mut StoreNormalizer` into a `&StoreNormalizer`.
145+
fn this(&self) -> &Self {
146+
self
147+
}
148+
}
149+
30150
/// A geo ip lookup helper based on maxmind db files.
31151
pub struct RelayGeoIpLookup;
32152

@@ -80,15 +200,14 @@ pub unsafe extern "C" fn relay_valid_platforms(size_out: *mut usize) -> *const R
80200
platforms.as_ptr()
81201
}
82202

83-
/// Creates a new normalization processor.
203+
/// Creates a new normalization config.
84204
#[no_mangle]
85205
#[relay_ffi::catch_unwind]
86206
pub unsafe extern "C" fn relay_store_normalizer_new(
87207
config: *const RelayStr,
88208
_geoip_lookup: *const RelayGeoIpLookup,
89209
) -> *mut RelayStoreNormalizer {
90-
let config: StoreConfig = serde_json::from_str((*config).as_str())?;
91-
let normalizer = StoreProcessor::new(config);
210+
let normalizer: StoreNormalizer = serde_json::from_str((*config).as_str())?;
92211
Box::into_raw(Box::new(normalizer)) as *mut RelayStoreNormalizer
93212
}
94213

@@ -97,7 +216,7 @@ pub unsafe extern "C" fn relay_store_normalizer_new(
97216
#[relay_ffi::catch_unwind]
98217
pub unsafe extern "C" fn relay_store_normalizer_free(normalizer: *mut RelayStoreNormalizer) {
99218
if !normalizer.is_null() {
100-
let normalizer = normalizer as *mut StoreProcessor;
219+
let normalizer = normalizer as *mut StoreNormalizer;
101220
let _dropped = Box::from_raw(normalizer);
102221
}
103222
}
@@ -109,9 +228,9 @@ pub unsafe extern "C" fn relay_store_normalizer_normalize_event(
109228
normalizer: *mut RelayStoreNormalizer,
110229
event: *const RelayStr,
111230
) -> RelayStr {
112-
let processor = normalizer as *mut StoreProcessor;
231+
let normalizer = normalizer as *mut StoreNormalizer;
232+
let config = (*normalizer).this();
113233
let mut event = Annotated::<Event>::from_json((*event).as_str())?;
114-
let config = (*processor).config();
115234

116235
let event_validation_config = EventValidationConfig {
117236
received_at: config.received_at,
@@ -127,8 +246,16 @@ pub unsafe extern "C" fn relay_store_normalizer_normalize_event(
127246
};
128247
validate_transaction(&mut event, &tx_validation_config)?;
129248

249+
let is_renormalize = config.is_renormalize.unwrap_or(false);
250+
130251
let normalization_config = NormalizationConfig {
252+
project_id: config.project_id,
253+
client: config.client.clone(),
254+
protocol_version: config.protocol_version.clone(),
255+
key_id: config.key_id.clone(),
256+
grouping_config: config.grouping_config.clone(),
131257
client_ip: config.client_ip.as_ref(),
258+
client_sample_rate: config.client_sample_rate,
132259
user_agent: RawUserAgentInfo {
133260
user_agent: config.user_agent.as_deref(),
134261
client_hints: config.client_hints.as_deref(),
@@ -137,7 +264,9 @@ pub unsafe extern "C" fn relay_store_normalizer_normalize_event(
137264
breakdowns_config: None, // only supported in relay
138265
normalize_user_agent: config.normalize_user_agent,
139266
transaction_name_config: Default::default(), // only supported in relay
140-
is_renormalize: config.is_renormalize.unwrap_or(false),
267+
is_renormalize,
268+
remove_other: config.remove_other.unwrap_or(!is_renormalize),
269+
emit_event_errors: !is_renormalize,
141270
device_class_synthesis_config: false, // only supported in relay
142271
enrich_spans: false,
143272
max_tag_value_length: usize::MAX,
@@ -147,10 +276,10 @@ pub unsafe extern "C" fn relay_store_normalizer_normalize_event(
147276
enable_trimming: config.enable_trimming.unwrap_or_default(),
148277
measurements: None,
149278
normalize_spans: config.normalize_spans,
279+
replay_id: config.replay_id,
150280
};
151281
normalize_event(&mut event, &normalization_config);
152282

153-
process_value(&mut event, &mut *processor, ProcessingState::root())?;
154283
RelayStr::from_string(event.to_json()?)
155284
}
156285

0 commit comments

Comments
 (0)