diff --git a/relay-event-normalization/src/eap/mod.rs b/relay-event-normalization/src/eap/mod.rs
index aef5a0568b..f0ccab4c03 100644
--- a/relay-event-normalization/src/eap/mod.rs
+++ b/relay-event-normalization/src/eap/mod.rs
@@ -21,6 +21,8 @@ use crate::{ClientHints, FromUserAgentInfo as _, RawUserAgentInfo};
 mod ai;
 mod size;
+#[allow(unused)]
+mod trimming;
 
 pub use self::ai::normalize_ai;
 pub use self::size::*;
diff --git a/relay-event-normalization/src/eap/trimming.rs b/relay-event-normalization/src/eap/trimming.rs
new file mode 100644
index 0000000000..44ed25ca84
--- /dev/null
+++ b/relay-event-normalization/src/eap/trimming.rs
@@ -0,0 +1,513 @@
+use relay_event_schema::processor::{
+    self, ProcessValue, ProcessingAction, ProcessingResult, ProcessingState, Processor, ValueType,
+};
+use relay_event_schema::protocol::Attributes;
+use relay_protocol::{Array, Empty, Meta, Object};
+
+use crate::eap::size;
+
+/// Per-container budget tracking for size/depth limits.
+#[derive(Clone, Debug)]
+struct SizeState {
+    // NOTE(review): generic parameters were stripped by extraction; restored as
+    // `Option<usize>` to match the `usize` arithmetic below — confirm against upstream.
+    max_depth: Option<usize>,
+    encountered_at_depth: usize,
+    size_remaining: Option<usize>,
+}
+
+/// Processor for trimming EAP items (logs, V2 spans).
+///
+/// This primarily differs from the regular [`TrimmingProcessor`](crate::trimming::TrimmingProcessor)
+/// in the handling of [`Attributes`]. This processor handles attributes as follows:
+/// 1. Sort them by combined key and value size, where the key size is just the string length
+///    and the value size is given by [`size::attribute_size`].
+/// 2. Trim attributes one by one. Key lengths are counted for the attribute's size, but keys
+///    aren't trimmed—if a key is too long, the attribute is simply discarded.
+/// 3. If we run out of space, all subsequent attributes are discarded.
+///
+/// This means that large attributes will be trimmed or discarded before small ones.
+#[derive(Default)]
+pub struct TrimmingProcessor {
+    size_state: Vec<SizeState>,
+    /// Whether we are currently trimming a collection of attributes.
+    /// This case needs to be distinguished for the purpose of accounting
+    /// for string lengths.
+    in_attributes: bool,
+}
+
+impl TrimmingProcessor {
+    /// Creates a new trimming processor.
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    fn should_remove_container<T: Empty>(&self, value: &T, state: &ProcessingState<'_>) -> bool {
+        // Heuristic to avoid trimming a value like `[1, 1, 1, 1, ...]` into `[null, null, null,
+        // null, ...]`, making it take up more space.
+        self.remaining_depth(state) == Some(1) && !value.is_empty()
+    }
+
+    #[inline]
+    fn remaining_size(&self) -> Option<usize> {
+        self.size_state
+            .iter()
+            .filter_map(|x| x.size_remaining)
+            .min()
+    }
+
+    #[inline]
+    fn remaining_depth(&self, state: &ProcessingState<'_>) -> Option<usize> {
+        self.size_state
+            .iter()
+            .filter_map(|size_state| {
+                // The current depth in the entire event payload minus the depth at which we found the
+                // max_depth attribute is the depth where we are at in the property.
+                let current_depth = state.depth() - size_state.encountered_at_depth;
+                size_state
+                    .max_depth
+                    .map(|max_depth| max_depth.saturating_sub(current_depth))
+            })
+            .min()
+    }
+
+    fn consume_size(&mut self, size: usize) {
+        for remaining in self
+            .size_state
+            .iter_mut()
+            .filter_map(|state| state.size_remaining.as_mut())
+        {
+            *remaining = remaining.saturating_sub(size);
+        }
+    }
+}
+
+impl Processor for TrimmingProcessor {
+    fn before_process<T: ProcessValue>(
+        &mut self,
+        _: Option<&T>,
+        _: &mut Meta,
+        state: &ProcessingState<'_>,
+    ) -> ProcessingResult {
+        // If we encounter a max_bytes or max_depth attribute it
+        // resets the size and depth that is permitted below it.
+        if state.attrs().max_bytes.is_some() || state.attrs().max_depth.is_some() {
+            self.size_state.push(SizeState {
+                size_remaining: state.attrs().max_bytes,
+                encountered_at_depth: state.depth(),
+                max_depth: state.attrs().max_depth,
+            });
+        }
+
+        if state.attrs().trim {
+            if self.remaining_size() == Some(0) {
+                return Err(ProcessingAction::DeleteValueHard);
+            }
+            if self.remaining_depth(state) == Some(0) {
+                return Err(ProcessingAction::DeleteValueHard);
+            }
+        }
+        Ok(())
+    }
+
+    fn after_process<T: ProcessValue>(
+        &mut self,
+        _value: Option<&T>,
+        _: &mut Meta,
+        state: &ProcessingState<'_>,
+    ) -> ProcessingResult {
+        // If our current depth is the one where we found a bag_size attribute, this means we
+        // are done processing a databag. Pop the bag size state.
+        self.size_state
+            .pop_if(|size_state| state.depth() == size_state.encountered_at_depth);
+
+        // The general `TrimmingProcessor` counts consumed sizes at this point. We can't do this generically
+        // because we want to count sizes using `size::attribute_size` for attribute values. Therefore, the
+        // size accounting needs to happen in the processing functions themselves.
+
+        Ok(())
+    }
+
+    fn process_string(
+        &mut self,
+        value: &mut String,
+        meta: &mut Meta,
+        state: &ProcessingState<'_>,
+    ) -> ProcessingResult {
+        if let Some(max_chars) = state.attrs().max_chars {
+            crate::trimming::trim_string(value, meta, max_chars, state.attrs().max_chars_allowance);
+        }
+
+        if !state.attrs().trim {
+            return Ok(());
+        }
+
+        if let Some(size_state) = self.size_state.last()
+            && let Some(size_remaining) = size_state.size_remaining
+        {
+            crate::trimming::trim_string(value, meta, size_remaining, 0);
+        }
+
+        // Only count string size here if we're _not_ currently trimming attributes.
+        // In that case, the size accounting is already handled by `process_attributes`.
+        if !self.in_attributes {
+            self.consume_size(value.len());
+        }
+
+        Ok(())
+    }
+
+    fn process_array<T>(
+        &mut self,
+        value: &mut Array<T>,
+        meta: &mut Meta,
+        state: &ProcessingState<'_>,
+    ) -> ProcessingResult
+    where
+        T: ProcessValue,
+    {
+        if !state.attrs().trim {
+            return Ok(());
+        }
+
+        // If we need to check the bag size, then we go down a different path
+        if !self.size_state.is_empty() {
+            let original_length = value.len();
+
+            if self.should_remove_container(value, state) {
+                return Err(ProcessingAction::DeleteValueHard);
+            }
+
+            let mut split_index = None;
+            for (index, item) in value.iter_mut().enumerate() {
+                if self.remaining_size() == Some(0) {
+                    split_index = Some(index);
+                    break;
+                }
+
+                let item_state = state.enter_index(index, None, ValueType::for_field(item));
+                processor::process_value(item, self, &item_state)?;
+            }
+
+            if let Some(split_index) = split_index {
+                let _ = value.split_off(split_index);
+            }
+
+            if value.len() != original_length {
+                meta.set_original_length(Some(original_length));
+            }
+        } else {
+            value.process_child_values(self, state)?;
+        }
+
+        Ok(())
+    }
+
+    fn process_object<T>(
+        &mut self,
+        value: &mut Object<T>,
+        meta: &mut Meta,
+        state: &ProcessingState<'_>,
+    ) -> ProcessingResult
+    where
+        T: ProcessValue,
+    {
+        if !state.attrs().trim {
+            return Ok(());
+        }
+
+        // If we need to check the bag size, then we go down a different path
+        if !self.size_state.is_empty() {
+            let original_length = value.len();
+
+            if self.should_remove_container(value, state) {
+                return Err(ProcessingAction::DeleteValueHard);
+            }
+
+            let mut split_key = None;
+            for (key, item) in value.iter_mut() {
+                if self.remaining_size() == Some(0) {
+                    split_key = Some(key.to_owned());
+                    break;
+                }
+
+                let item_state = state.enter_borrowed(key, None, ValueType::for_field(item));
+                processor::process_value(item, self, &item_state)?;
+            }
+
+            if let Some(split_key) = split_key {
+                let _ = value.split_off(&split_key);
+            }
+
+            if value.len() != original_length {
+                meta.set_original_length(Some(original_length));
+            }
+        } else {
+            value.process_child_values(self, state)?;
+        }
+
+        Ok(())
+    }
+
+    fn process_attributes(
+        &mut self,
+        attributes: &mut Attributes,
+        meta: &mut Meta,
+        state: &ProcessingState,
+    ) -> ProcessingResult {
+        if !state.attrs().trim {
+            return Ok(());
+        }
+
+        // Mark `self.in_attributes` so we don't double-count string sizes
+        self.in_attributes = true;
+
+        let original_length = size::attributes_size(attributes);
+
+        // Sort attributes by key + value size so small attributes are more likely to be preserved
+        let inner = std::mem::take(&mut attributes.0);
+        let mut sorted: Vec<_> = inner.into_iter().collect();
+        sorted.sort_by_cached_key(|(k, v)| k.len() + size::attribute_size(v));
+
+        let mut split_idx = None;
+        for (idx, (key, value)) in sorted.iter_mut().enumerate() {
+            self.consume_size(key.len());
+
+            if self.remaining_size() == Some(0) {
+                split_idx = Some(idx);
+                break;
+            }
+
+            let value_state = state.enter_borrowed(key, None, ValueType::for_field(value));
+            processor::process_value(value, self, &value_state).inspect_err(|_| {
+                self.in_attributes = false;
+            })?;
+            self.consume_size(size::attribute_size(value));
+        }
+
+        if let Some(split_idx) = split_idx {
+            let _ = sorted.split_off(split_idx);
+        }
+
+        attributes.0 = sorted.into_iter().collect();
+
+        let new_size = size::attributes_size(attributes);
+        if new_size != original_length {
+            meta.set_original_length(Some(original_length));
+        }
+
+        self.in_attributes = false;
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use relay_protocol::{Annotated, FromValue, IntoValue, SerializableAnnotated};
+
+    use super::*;
+
+    #[derive(Debug, Clone, Empty, IntoValue, FromValue, ProcessValue)]
+    struct TestObject {
+        #[metastructure(max_bytes = 40, trim = true)]
+        attributes: Annotated<Attributes>,
+        #[metastructure(max_chars = 10, trim = true)]
+        body: Annotated<String>,
+    }
+
+    #[test]
+    fn test_split_on_string() {
+        let mut attributes = Attributes::new();
+
+        attributes.insert("small", 17); // 13B
+        attributes.insert("medium string", "This string should be trimmed"); // 42B
+        attributes.insert("attribute is very large and should be removed", true); // 47B
+
+        let mut value = Annotated::new(TestObject {
+            attributes: Annotated::new(attributes),
+            body: Annotated::new("This is longer than allowed".to_owned()),
+        });
+
+        let mut processor = TrimmingProcessor::new();
+
+        let state = ProcessingState::new_root(Default::default(), []);
+        processor::process_value(&mut value, &mut processor, &state).unwrap();
+
+        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
+        {
+          "attributes": {
+            "medium string": {
+              "type": "string",
+              "value": "This string..."
+            },
+            "small": {
+              "type": "integer",
+              "value": 17
+            }
+          },
+          "body": "This is...",
+          "_meta": {
+            "attributes": {
+              "": {
+                "len": 101
+              },
+              "medium string": {
+                "value": {
+                  "": {
+                    "rem": [
+                      [
+                        "!limit",
+                        "s",
+                        11,
+                        14
+                      ]
+                    ],
+                    "len": 29
+                  }
+                }
+              }
+            },
+            "body": {
+              "": {
+                "rem": [
+                  [
+                    "!limit",
+                    "s",
+                    7,
+                    10
+                  ]
+                ],
+                "len": 27
+              }
+            }
+          }
+        }
+        "###);
+    }
+
+    #[test]
+    fn test_one_byte_left() {
+        let mut attributes = Attributes::new();
+
+        // First attribute + key of second attribute is 39B, leaving exactly one
+        // byte for the second attribute's value.
+        attributes.insert("small attribute", 17); // 23B
+        attributes.insert("medium attribute", "This string should be trimmed"); // 45B
+
+        let mut value = Annotated::new(TestObject {
+            attributes: Annotated::new(attributes),
+            body: Annotated::new("This is longer than allowed".to_owned()),
+        });
+
+        let mut processor = TrimmingProcessor::new();
+
+        let state = ProcessingState::new_root(Default::default(), []);
+        processor::process_value(&mut value, &mut processor, &state).unwrap();
+
+        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
+        {
+          "attributes": {
+            "medium attribute": {
+              "type": "string",
+              "value": "..."
+            },
+            "small attribute": {
+              "type": "integer",
+              "value": 17
+            }
+          },
+          "body": "This is...",
+          "_meta": {
+            "attributes": {
+              "": {
+                "len": 68
+              },
+              "medium attribute": {
+                "value": {
+                  "": {
+                    "rem": [
+                      [
+                        "!limit",
+                        "s",
+                        0,
+                        3
+                      ]
+                    ],
+                    "len": 29
+                  }
+                }
+              }
+            },
+            "body": {
+              "": {
+                "rem": [
+                  [
+                    "!limit",
+                    "s",
+                    7,
+                    10
+                  ]
+                ],
+                "len": 27
+              }
+            }
+          }
+        }
+        "###);
+    }
+
+    #[test]
+    fn test_overaccept_number() {
+        let mut attributes = Attributes::new();
+
+        // The attribute size would get used up by the value of "attribute with long name".
+        // Nevertheless, we accept this attribute, thereby overaccepting 5B.
+        attributes.insert("small", "abcdefgh"); // 5 + 8 = 13B
+        attributes.insert("attribute with long name", 71); // 24 + 8 = 32B
+        attributes.insert("attribute is very large and should be removed", true); // 46 + 1 = 47B
+
+        let mut value = Annotated::new(TestObject {
+            attributes: Annotated::new(attributes),
+            body: Annotated::new("This is longer than allowed".to_owned()),
+        });
+
+        let mut processor = TrimmingProcessor::new();
+
+        let state = ProcessingState::new_root(Default::default(), []);
+        processor::process_value(&mut value, &mut processor, &state).unwrap();
+
+        insta::assert_json_snapshot!(SerializableAnnotated(&value), @r###"
+        {
+          "attributes": {
+            "attribute with long name": {
+              "type": "integer",
+              "value": 71
+            },
+            "small": {
+              "type": "string",
+              "value": "abcdefgh"
+            }
+          },
+          "body": "This is...",
+          "_meta": {
+            "attributes": {
+              "": {
+                "len": 91
+              }
+            },
+            "body": {
+              "": {
+                "rem": [
+                  [
+                    "!limit",
+                    "s",
+                    7,
+                    10
+                  ]
+                ],
+                "len": 27
+              }
+            }
+          }
+        }
+        "###);
+    }
+}
diff --git a/relay-event-normalization/src/trimming.rs b/relay-event-normalization/src/trimming.rs
index fa304133f7..130faa5774 100644
--- a/relay-event-normalization/src/trimming.rs
+++ b/relay-event-normalization/src/trimming.rs
@@ -299,7 +299,12 @@ impl Processor for TrimmingProcessor {
 }
 
 /// Trims the string to the given maximum length and updates meta data.
-fn trim_string(value: &mut String, meta: &mut Meta, max_chars: usize, max_chars_allowance: usize) {
+pub(crate) fn trim_string(
+    value: &mut String,
+    meta: &mut Meta,
+    max_chars: usize,
+    max_chars_allowance: usize,
+) {
     let hard_limit = max_chars + max_chars_allowance;
 
     if bytecount::num_chars(value.as_bytes()) <= hard_limit {
diff --git a/relay-event-schema/src/protocol/attributes.rs b/relay-event-schema/src/protocol/attributes.rs
index a277f315b9..db92a35f0a 100644
--- a/relay-event-schema/src/protocol/attributes.rs
+++ b/relay-event-schema/src/protocol/attributes.rs
@@ -16,7 +16,7 @@ pub struct Attribute {
     pub value: AttributeValue,
 
     /// Additional arbitrary fields for forwards compatibility.
-    #[metastructure(additional_properties)]
+    #[metastructure(additional_properties, trim = false)]
     pub other: Object<Value>,
 }
@@ -92,6 +92,21 @@ impl_from!(i64, AttributeType::Integer);
 impl_from!(f64, AttributeType::Double);
 impl_from!(bool, AttributeType::Boolean);
 
+impl From<Annotated<&str>> for AttributeValue {
+    fn from(value: Annotated<&str>) -> Self {
+        Self {
+            ty: Annotated::new(AttributeType::String),
+            value: value.map_value(Into::into),
+        }
+    }
+}
+
+impl From<&str> for AttributeValue {
+    fn from(value: &str) -> Self {
+        Self::from(Annotated::new(value))
+    }
+}
+
 /// Determines the `Pii` value for an attribute (or, more exactly, the
 /// attribute's `value` field) by looking it up in `relay-conventions`.
 ///
@@ -303,6 +318,9 @@ impl From<[(String, Annotated<Attribute>); N]> for Attributes {
     }
 }
 
+// We need to manually implement `ProcessValue` for `Attributes`.
+// Deriving it, even with `process_func`, causes both `process_value`
+// and `process_child_values` to be called because it's a newtype struct.
 impl ProcessValue for Attributes {
     #[inline]
     fn value_type(&self) -> EnumSet<ValueType> {