Skip to content

Commit 338905e

Browse files
Guillermo Calderon Lopez and Copilot committed
perf: avoid unnecessary copy of OTLP bytes in OtlpBytesAdapter::new()
Replace BinaryArray::from_vec(), which deep-copies the entire OTLP payload, with a zero-copy construction using Buffer::from(bytes::Bytes). The clone_bytes() call is just an Arc refcount bump, and Buffer::from(Bytes) wraps the data without copying, eliminating a full memcpy on the ingest hot path.

Fixes #2703

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent dd34485 commit 338905e

1 file changed

Lines changed: 69 additions & 3 deletions

File tree

  • rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor

rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/bundle_adapter.rs

Lines changed: 69 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,8 @@ use std::collections::HashMap;
4444
use std::sync::{Arc, LazyLock};
4545
use std::time::SystemTime;
4646

47-
use arrow::array::{BinaryArray, RecordBatch};
47+
use arrow::array::{ArrayData, BinaryArray, RecordBatch};
48+
use arrow::buffer::Buffer;
4849
use arrow::datatypes::{DataType, Field, Schema};
4950
use quiver::record_bundle::{
5051
BundleDescriptor, PayloadRef, RecordBundle, SchemaFingerprint, SlotDescriptor, SlotId,
@@ -348,8 +349,38 @@ impl OtlpBytesAdapter {
348349
OtlpProtoBytes::ExportTracesRequest(_) => SignalType::Traces,
349350
};
350351

351-
// Create a record batch with a single binary column containing the OTLP bytes
352-
let binary_array = BinaryArray::from_vec(vec![bytes.as_bytes()]);
352+
// Create a record batch with a single binary column containing the OTLP bytes.
353+
// Use zero-copy wrapping: clone_bytes() is just an Arc refcount bump,
354+
// and Buffer::from(Bytes) wraps without copying the payload data.
355+
let data_bytes = bytes.clone_bytes();
356+
let len = i32::try_from(data_bytes.len()).map_err(|_| {
357+
(
358+
BundleConversionError::RecordBatchCreationError(format!(
359+
"OTLP payload too large for BinaryArray: {} bytes exceeds i32::MAX",
360+
data_bytes.len()
361+
)),
362+
bytes.clone(),
363+
)
364+
})?;
365+
let data_buffer = Buffer::from(data_bytes);
366+
let offsets = Buffer::from_slice_ref([0i32, len]);
367+
368+
let array_data = match ArrayData::builder(DataType::Binary)
369+
.len(1)
370+
.add_buffer(offsets)
371+
.add_buffer(data_buffer)
372+
.build()
373+
{
374+
Ok(data) => data,
375+
Err(e) => {
376+
return Err((
377+
BundleConversionError::RecordBatchCreationError(e.to_string()),
378+
bytes,
379+
));
380+
}
381+
};
382+
383+
let binary_array = BinaryArray::from(array_data);
353384
let batch = match RecordBatch::try_new(otlp_binary_schema(), vec![Arc::new(binary_array)]) {
354385
Ok(batch) => batch,
355386
Err(e) => {
@@ -736,6 +767,41 @@ mod tests {
736767
assert!(adapter.payload(wrong_slot).is_none());
737768
}
738769

770+
#[test]
771+
fn test_otlp_bytes_adapter_zero_copy() {
772+
let test_bytes = b"zero-copy verification payload".to_vec();
773+
let otlp = OtlpProtoBytes::new_from_bytes(SignalType::Logs, test_bytes);
774+
775+
let adapter = OtlpBytesAdapter::new(otlp).map_err(|(e, _)| e).unwrap();
776+
777+
let slot = to_otlp_slot_id(SignalType::Logs);
778+
let payload = adapter.payload(slot).unwrap();
779+
let column = payload.batch.column(0);
780+
let binary_array = column.as_any().downcast_ref::<BinaryArray>().unwrap();
781+
782+
// The Arrow buffer should alias the original bytes (zero-copy).
783+
// Compare the pointer of the stored value with the original OtlpProtoBytes.
784+
let arrow_value_ptr = binary_array.value(0).as_ptr();
785+
let original_ptr = adapter.bytes.as_bytes().as_ptr();
786+
assert_eq!(
787+
arrow_value_ptr, original_ptr,
788+
"BinaryArray value should point to the same memory as OtlpProtoBytes (zero-copy)"
789+
);
790+
}
791+
792+
#[test]
793+
fn test_otlp_bytes_adapter_empty_payload() {
794+
let otlp = OtlpProtoBytes::new_from_bytes(SignalType::Metrics, vec![]);
795+
796+
let adapter = OtlpBytesAdapter::new(otlp).map_err(|(e, _)| e).unwrap();
797+
798+
let slot = to_otlp_slot_id(SignalType::Metrics);
799+
let payload = adapter.payload(slot).unwrap();
800+
let column = payload.batch.column(0);
801+
let binary_array = column.as_any().downcast_ref::<BinaryArray>().unwrap();
802+
assert_eq!(binary_array.value(0), b"");
803+
}
804+
739805
#[test]
740806
fn test_extract_otlp_bytes() {
741807
let original_bytes = b"original OTLP data".to_vec();

0 commit comments

Comments
 (0)