diff --git a/libdd-library-config/src/tracer_metadata.rs b/libdd-library-config/src/tracer_metadata.rs index 130a347d98..321b4a50a9 100644 --- a/libdd-library-config/src/tracer_metadata.rs +++ b/libdd-library-config/src/tracer_metadata.rs @@ -34,15 +34,20 @@ pub struct TracerMetadata { #[serde(skip_serializing_if = "Option::is_none")] pub container_id: Option, /// Ordered list of attribute key names for thread-level context records. Key indices from - /// thread context records index into this table. Keep empty if thread-level context is not - /// used. + /// thread context records index into this table. Set to `None` to omit thread-level related + /// attributes from the process-level context. + /// + /// If set to `Some`, the first key will be automatically set to `datadog.local_root_span_id` + /// in the OTel process context, because the thread context handling elsewhere in libdatadog + /// relies on this key's index being zero. Only set additional keys in + /// `threadlocal_attribute_keys`; the root span id key is always implicitly present first. /// /// This field is specific to OTel process context. It is ignored for (de)serialization, and is /// only used when converting to an OTel process context in /// [TracerMetadata::to_otel_process_ctx].
#[cfg(feature = "otel-thread-ctx")] #[serde(skip)] - pub threadlocal_attribute_keys: Vec<String>, + pub threadlocal_attribute_keys: Option<Vec<String>>, } impl Default for TracerMetadata { @@ -59,7 +64,7 @@ impl Default for TracerMetadata { process_tags: None, container_id: None, #[cfg(feature = "otel-thread-ctx")] - threadlocal_attribute_keys: vec![], + threadlocal_attribute_keys: None, } } } @@ -124,21 +129,25 @@ impl TracerMetadata { ]; #[cfg(feature = "otel-thread-ctx")] - if !threadlocal_attribute_keys.is_empty() { + if let Some(threadlocal_attribute_keys) = threadlocal_attribute_keys.as_ref() { attributes.push(key_value( "threadlocal.schema_version", "tlsdesc_v1_dev".to_owned(), )); + attributes.push(KeyValue { key: "threadlocal.attribute_key_map".to_owned(), value: Some(AnyValue { value: Some(any_value::Value::ArrayValue(ArrayValue { - values: threadlocal_attribute_keys - .iter() - .map(|k| AnyValue { - value: Some(any_value::Value::StringValue(k.clone())), - }) - .collect(), + values: std::iter::once(AnyValue { + value: Some(any_value::Value::StringValue( + "datadog.local_root_span_id".to_owned(), + )), + }) + .chain(threadlocal_attribute_keys.iter().map(|k| AnyValue { + value: Some(any_value::Value::StringValue(k.clone())), + })) + .collect(), })), }), key_ref: 0, @@ -250,11 +259,11 @@ mod tests { #[test] fn threadlocal_attrs_present_with_correct_values() { let ctx = TracerMetadata { - threadlocal_attribute_keys: vec![ + threadlocal_attribute_keys: Some(vec![ "span.id".to_owned(), "trace.id".to_owned(), "custom.key".to_owned(), - ], + ]), ..Default::default() } .to_otel_process_ctx(); @@ -282,6 +291,14 @@ mod tests { other => panic!("expected StringValue, got {:?}", other), }) .collect(); - assert_eq!(keys, ["span.id", "trace.id", "custom.key"]); + assert_eq!( + keys, + [ + "datadog.local_root_span_id", + "span.id", + "trace.id", + "custom.key" + ] + ); } } diff --git a/libdd-profiling/src/otel_thread_ctx.rs b/libdd-profiling/src/otel_thread_ctx.rs index
3ef14e42e2..16e09478b1 100644 --- a/libdd-profiling/src/otel_thread_ctx.rs +++ b/libdd-profiling/src/otel_thread_ctx.rs @@ -107,6 +107,10 @@ pub mod linux { } } + // We maintain the convention in libdatadog that the `local_root_span_id` attribute key is + // always the very first in the string table, so its key index is guaranteed to be zero. + const ROOT_SPAN_KEY_INDEX: u8 = 0; + /// Maximum size in bytes of the `attrs_data` field. /// /// Chosen so that the total record size (`28 + MAX_ATTRS_DATA_SIZE`) stays within the 640-byte @@ -165,8 +169,15 @@ pub mod linux { } impl ThreadContextRecord { - /// Build a record with the given trace id, span id and attributes. - pub fn new(trace_id: [u8; 16], span_id: [u8; 8], attrs: &[(u8, &str)]) -> Self { + /// Build a record with the given trace id, span id and attributes. The + /// `local_root_span_id` is a distinguished attribute with special handling for + /// convenience, but it ends up encoded in `attrs_data` like any other attribute. + fn new( + trace_id: [u8; 16], + span_id: [u8; 8], + local_root_span_id: [u8; 8], + attrs: &[(u8, &str)], + ) -> Self { const { assert!(size_of::() == 640) } let mut record = Self { @@ -174,7 +185,7 @@ pub mod linux { span_id, ..Default::default() }; - record.set_attrs(attrs); + record.set_attrs(local_root_span_id, attrs); record } @@ -199,10 +210,26 @@ pub mod linux { /// recovery would require us to be able to rollback to the previous attributes which would /// hurt the happy path, or leave the record in a inconsistent state. Another possibility /// would be to error out and reset the record in that situation.
- pub fn set_attrs(&mut self, attributes: &[(u8, &str)]) -> bool { - let mut offset = 0; + fn set_attrs(&mut self, local_root_span_id: [u8; 8], attributes: &[(u8, &str)]) -> bool { let mut fully_encoded = true; + const { assert!(MAX_ATTRS_DATA_SIZE >= 18) } + // The local root span id is provided as raw bytes (can be seen as a big-endian u64), + // but readers will expect a hex string representation. We convert it to a fixed + // 16-character string in the usual lowercase hex format. + // + // There's currently no easy way to use Rust format capabilities to write directly in a + // fixed-size array. Since the conversion is simple, we do it manually. + const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef"; + self.attrs_data[0] = ROOT_SPAN_KEY_INDEX; + self.attrs_data[1] = 16; + for (i, &byte) in local_root_span_id.iter().enumerate() { + self.attrs_data[2 + i * 2] = HEX_DIGITS[(byte >> 4) as usize]; + self.attrs_data[2 + i * 2 + 1] = HEX_DIGITS[(byte & 0xF) as usize]; + } + + let mut offset = 18; + for &(key_index, val) in attributes { let val_bytes = val.as_bytes(); let val_len = val_bytes.len(); @@ -261,8 +288,18 @@ pub mod linux { impl ThreadContext { /// Create a new thread context with the given trace/span IDs and encoded attributes. - pub fn new(trace_id: [u8; 16], span_id: [u8; 8], attrs: &[(u8, &str)]) -> Self { - Self::from(ThreadContextRecord::new(trace_id, span_id, attrs)) + pub fn new( + trace_id: [u8; 16], + span_id: [u8; 8], + local_root_span_id: [u8; 8], + attrs: &[(u8, &str)], + ) -> Self { + Self::from(ThreadContextRecord::new( + trace_id, + span_id, + local_root_span_id, + attrs, + )) } /// Turn this thread context into a raw pointer to the underlying [ThreadContextRecord]. @@ -319,7 +356,7 @@ pub mod linux { /// also observes `valid = 1`.
pub fn attach(self) -> Option { // [^tls-slot-ordering]: since we get back the previous context, we should in principle - // use an `Acquire` compiler fence to make sure we don't get back a not-yet-initiliazed + // use an `Acquire` compiler fence to make sure we don't get back a not-yet-initialized // record. // // However, this thread (excluding the reader signal handler) is the only one to ever @@ -335,7 +372,12 @@ pub mod linux { /// /// If there's currently no attached context, `update` will create one, and is in this case /// equivalent to `ThreadContext::new(trace_id, span_id, attrs).attach()`. - pub fn update(trace_id: [u8; 16], span_id: [u8; 8], attrs: &[(u8, &str)]) { + pub fn update( + trace_id: [u8; 16], + span_id: [u8; 8], + local_root_span_id: [u8; 8], + attrs: &[(u8, &str)], + ) { let slot = get_tls_slot(); if let Some(current) = unsafe { slot.load(Ordering::Relaxed).as_mut() } { @@ -344,7 +386,7 @@ pub mod linux { current.trace_id = trace_id; current.span_id = span_id; - current.set_attrs(attrs); + current.set_attrs(local_root_span_id, attrs); compiler_fence(Ordering::SeqCst); current.valid.store(1, Ordering::Relaxed); @@ -354,7 +396,7 @@ pub mod linux { // `ThreadContext::new` already initialises `valid = 1`. let _ = Self::swap( slot, - ThreadContext::new(trace_id, span_id, attrs).into_raw(), + ThreadContext::new(trace_id, span_id, local_root_span_id, attrs).into_raw(), ); } } @@ -370,7 +412,7 @@ pub mod linux { impl Drop for ThreadContext { fn drop(&mut self) { // Safety: `self.0` was obtained from a `Box::new`, and `ThreadContext` represents - // ownership of the underyling memory. + // ownership of the underlying memory. 
unsafe { let _ = Box::from_raw(self.0.as_ptr()); } @@ -394,12 +436,13 @@ pub mod linux { fn tls_lifecycle_basic() { let trace_id = [1u8; 16]; let span_id = [2u8; 8]; + let root_span_id = [3u8; 8]; assert!( read_tls_context_ptr().is_null(), "TLS must be null initially" ); - ThreadContext::new(trace_id, span_id, &[]).attach(); + ThreadContext::new(trace_id, span_id, root_span_id, &[]).attach(); assert!( !read_tls_context_ptr().is_null(), "TLS must not be null after attach" @@ -429,8 +472,9 @@ pub mod linux { fn raw_tls_pointer_read() { let trace_id = [1u8; 16]; let span_id = [2u8; 8]; + let root_span_id = [3u8; 8]; - ThreadContext::new(trace_id, span_id, &[]).attach(); + ThreadContext::new(trace_id, span_id, root_span_id, &[]).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null(), "TLS must be non-null after attach"); @@ -440,7 +484,8 @@ pub mod linux { assert_eq!(record.trace_id, trace_id); assert_eq!(record.span_id, span_id); assert_eq!(record.valid.load(Ordering::Relaxed), 1); - assert_eq!(record.attrs_data_size, 0); + // 1 (key) + 1 (len) + 16 (root_span_id hex chars) = 18 + assert_eq!(record.attrs_data_size, 18); let _ = ThreadContext::detach(); } @@ -448,20 +493,24 @@ pub mod linux { #[test] #[cfg_attr(miri, ignore)] fn attribute_encoding_basic() { - let attrs: &[(u8, &str)] = &[(0, "GET"), (1, "/api/v1")]; - ThreadContext::new([0u8; 16], [0u8; 8], attrs).attach(); + let attrs: &[(u8, &str)] = &[(1, "GET"), (2, "/api/v1")]; + ThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], attrs).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null()); let record = unsafe { &*ptr }; - let expected_size: u16 = (2 + 3 + 2 + 7) as u16; + // 1+1+16 (root_span_id hex) + 1+1+3 (GET) + 1+1+7 (/api/v1) + let expected_size: u16 = (2 + 16 + 2 + 3 + 2 + 7) as u16; assert_eq!(record.attrs_data_size, expected_size); assert_eq!(record.attrs_data[0], 0); - assert_eq!(record.attrs_data[1], 3); - assert_eq!(&record.attrs_data[2..5], b"GET"); - 
assert_eq!(record.attrs_data[5], 1); - assert_eq!(record.attrs_data[6], 7); - assert_eq!(&record.attrs_data[7..14], b"/api/v1"); + assert_eq!(record.attrs_data[1], 16); + assert_eq!(&record.attrs_data[2..18], b"0000000000000000"); + assert_eq!(record.attrs_data[18], 1); + assert_eq!(record.attrs_data[19], 3); + assert_eq!(&record.attrs_data[20..23], b"GET"); + assert_eq!(record.attrs_data[23], 2); + assert_eq!(record.attrs_data[24], 7); + assert_eq!(&record.attrs_data[25..32], b"/api/v1"); let _ = ThreadContext::detach(); } @@ -471,29 +520,31 @@ pub mod linux { fn attribute_truncation_on_overflow() { // Build attributes whose combined encoded size exceeds MAX_ATTRS_DATA_SIZE. // Each max entry: 1 (key) + 1 (len) + 255 (val) = 257 bytes. - // Two such entries: 514 bytes. A third entry of 100 chars would need 102 bytes, - // bringing the total to 616 > 612, so the third entry must be dropped. + // root_span_id: 1 (key) + 1 (len) + 16 (hex val) = 18 bytes. + // Two such entries: 514 bytes, plus root_span_id: 532. + // A third entry of 100 chars would need 102 bytes, bringing the total to 634 > 612, so + // the third entry must be dropped. let val_a = "a".repeat(255); // 257 bytes encoded let val_b = "b".repeat(255); // 257 bytes encoded → 514 total - let val_c = "c".repeat(100); // 102 bytes encoded → 616 total: must be dropped + let val_c = "c".repeat(100); // 102 bytes encoded → 634 total: must be dropped let attrs: &[(u8, &str)] = &[ - (0, val_a.as_str()), - (1, val_b.as_str()), - (2, val_c.as_str()), + (1, val_a.as_str()), + (2, val_b.as_str()), + (3, val_c.as_str()), ]; - ThreadContext::new([0u8; 16], [0u8; 8], attrs).attach(); + ThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], attrs).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null()); let record = unsafe { &*ptr }; - // Only the first two entries fit (514 bytes).
- assert_eq!(record.attrs_data_size, 514); - assert_eq!(record.attrs_data[0], 0); - assert_eq!(record.attrs_data[1], 255); - assert_eq!(record.attrs_data[257], 1); - assert_eq!(record.attrs_data[258], 255); + // Only the first two entries fit (514 bytes + 18 bytes for root_span_id). + assert_eq!(record.attrs_data_size, 532); + assert_eq!(record.attrs_data[18], 1); + assert_eq!(record.attrs_data[19], 255); + assert_eq!(record.attrs_data[275], 2); + assert_eq!(record.attrs_data[276], 255); let _ = ThreadContext::detach(); } @@ -501,25 +552,30 @@ pub mod linux { #[test] #[cfg_attr(miri, ignore)] fn update_record_in_place() { - let trace_id_1 = [1u8; 16]; - let span_id_1 = [1u8; 8]; - let trace_id_2 = [2u8; 16]; - let span_id_2 = [2u8; 8]; + let trace_id1 = [1u8; 16]; + let span_id1 = [0x01, 0x12, 0x23, 0x34, 0x45, 0x56, 0x67, 0x78]; + let root_span_id1 = [0x78, 0x79, 0x7A, 0x7B, 0x7C, 0x7D, 0x7E, 0x7F]; + let trace_id2 = [2u8; 16]; + let span_id2 = [0x0A, 0x1B, 0x2C, 0x3D, 0x4E, 0x5F, 0x6A, 0x7B]; + let root_span_id2 = [0x79, 0x7A, 0x7B, 0x7C, 0x7D, 0x7E, 0x7F, 0x80]; // Updating before any context is attached should be equivalent to `attach()` - ThreadContext::update(trace_id_1, span_id_1, &[(0, "v1")]); + ThreadContext::update(trace_id1, span_id1, root_span_id1, &[(0, "v1")]); let ptr_before = read_tls_context_ptr(); assert!(!ptr_before.is_null()); let record = unsafe { &*ptr_before }; - assert_eq!(record.trace_id, trace_id_1); - assert_eq!(record.span_id, span_id_1); + assert_eq!(record.trace_id, trace_id1); + assert_eq!(record.span_id, span_id1); assert_eq!(record.valid.load(Ordering::Relaxed), 1); assert_eq!(record.attrs_data[0], 0); - assert_eq!(record.attrs_data[1], 2); - assert_eq!(&record.attrs_data[2..4], b"v1"); + assert_eq!(record.attrs_data[1], 16); + assert_eq!(&record.attrs_data[2..18], b"78797a7b7c7d7e7f"); + assert_eq!(record.attrs_data[18], 0); + assert_eq!(record.attrs_data[19], 2); + assert_eq!(&record.attrs_data[20..22], b"v1"); - 
ThreadContext::update(trace_id_2, span_id_2, &[(0, "v2")]); + ThreadContext::update(trace_id2, span_id2, root_span_id2, &[(0, "v2")]); let ptr_after = read_tls_context_ptr(); assert_eq!( @@ -528,12 +584,15 @@ pub mod linux { ); let record = unsafe { &*ptr_after }; - assert_eq!(record.trace_id, trace_id_2); - assert_eq!(record.span_id, span_id_2); + assert_eq!(record.trace_id, trace_id2); + assert_eq!(record.span_id, span_id2); assert_eq!(record.valid.load(Ordering::Relaxed), 1); assert_eq!(record.attrs_data[0], 0); - assert_eq!(record.attrs_data[1], 2); - assert_eq!(&record.attrs_data[2..4], b"v2"); + assert_eq!(record.attrs_data[1], 16); + assert_eq!(&record.attrs_data[2..18], b"797a7b7c7d7e7f80"); + assert_eq!(record.attrs_data[18], 0); + assert_eq!(record.attrs_data[19], 2); + assert_eq!(&record.attrs_data[20..22], b"v2"); let _ = ThreadContext::detach(); assert!(read_tls_context_ptr().is_null()); @@ -542,7 +601,7 @@ pub mod linux { #[test] #[cfg_attr(miri, ignore)] fn explicit_detach_nulls_tls() { - ThreadContext::new([3u8; 16], [3u8; 8], &[]).attach(); + ThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], &[]).attach(); assert!(!read_tls_context_ptr().is_null()); let _ = ThreadContext::detach(); @@ -557,14 +616,16 @@ pub mod linux { #[cfg_attr(miri, ignore)] fn long_value_capped_at_255_bytes() { let long_val = "a".repeat(300); - ThreadContext::new([0u8; 16], [0u8; 8], &[(0, long_val.as_str())]).attach(); + ThreadContext::new([0u8; 16], [0u8; 8], [0u8; 8], &[(0, long_val.as_str())]).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null()); let record = unsafe { &*ptr }; - let val_len = record.attrs_data[1] as usize; + // root_span_id occupies offset 0..18, then the attr entry starts at 18: key at [18], + // len at [19] + let val_len = record.attrs_data[2 + 16 + 1]; assert_eq!(val_len, 255, "value must be capped at 255 bytes"); - assert_eq!(record.attrs_data_size as usize, 2 + 255); + assert_eq!(record.attrs_data_size, 2 + 16 + 2 + 255); let _ = 
ThreadContext::detach(); } @@ -579,12 +640,15 @@ pub mod linux { let b = barrier.clone(); let spawned_trace_id = [0xABu8; 16]; - let spawned_span_id = [0xCDu8; 8]; + let spawned_span_id = [0xCD, 0xBC, 0xAB, 0x9A, 0x89, 0x78, 0x67, 0x56]; + let spawned_root_span_id = [0xEF, 0xDE, 0xCD, 0xBC, 0xAB, 0x9A, 0x89, 0x78]; let main_trace_id = [0x11u8; 16]; - let main_span_id = [0x22u8; 8]; + let main_span_id = [0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99]; + let main_root_span_id = [0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xAA]; let handle = std::thread::spawn(move || { - ThreadContext::new(spawned_trace_id, spawned_span_id, &[]).attach(); + ThreadContext::new(spawned_trace_id, spawned_span_id, spawned_root_span_id, &[]) + .attach(); // Let the main thread attach its own record and verify its slot. b.wait(); @@ -597,6 +661,7 @@ pub mod linux { let record = unsafe { &*ptr }; assert_eq!(record.trace_id, spawned_trace_id); assert_eq!(record.span_id, spawned_span_id); + assert_eq!(&record.attrs_data[2..18], b"efdecdbcab9a8978"); let _ = ThreadContext::detach(); assert!(read_tls_context_ptr().is_null()); @@ -610,13 +675,14 @@ pub mod linux { "main thread should see a null pointer and not another thread's context" ); - ThreadContext::new(main_trace_id, main_span_id, &[]).attach(); + ThreadContext::new(main_trace_id, main_span_id, main_root_span_id, &[]).attach(); let ptr = read_tls_context_ptr(); assert!(!ptr.is_null(), "main thread TLS must be set"); let record = unsafe { &*ptr }; assert_eq!(record.trace_id, main_trace_id); assert_eq!(record.span_id, main_span_id); + assert_eq!(&record.attrs_data[2..18], b"33445566778899aa"); barrier.wait();