Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 74 additions & 12 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,16 +868,47 @@ fn get_dictionary_values(
Ok(dictionary_values)
}

/// Reads the full data block (metadata + body) from the underlying reader.
///
/// The reader is positioned at the block's offset and exactly
/// `metaDataLength + bodyLength` bytes are read.
///
/// Small blocks (under 8 KiB) are read with `read_exact` into a
/// zero-initialized [`MutableBuffer`]. Larger blocks are read into a
/// `Vec<u8>` so the destination does not have to be zero-filled first. When
/// that allocation happens to be 64-byte aligned (intended to match Arrow's
/// `ALIGNMENT` for `MutableBuffer`) it is handed to [`Buffer`] as-is;
/// otherwise the bytes are copied once into a freshly allocated
/// [`MutableBuffer`].
///
/// # Errors
///
/// - `ArrowError::IpcError` if the block's offset or lengths are negative,
///   do not fit the platform's `usize`, or overflow when added together.
/// - `ArrowError::IpcError` if a large block ends before the expected number
///   of bytes has been read.
/// - Any I/O error reported by the reader while seeking or reading. Note that
///   a truncated *small* block surfaces as the I/O error from `read_exact`.
fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
    // Below this size the cost of zero-filling is negligible, so the simple
    // `read_exact` path is used.
    const SMALL_BLOCK_LEN: usize = 8 * 1024;
    // Alignment (in bytes) at which the `Vec` allocation is reused directly.
    const PREFERRED_ALIGNMENT: usize = 64;

    // The offset and lengths come from the file and are signed; reject
    // malformed values with an error instead of panicking or wrapping.
    let offset = block.offset().to_u64().ok_or_else(|| {
        ArrowError::IpcError(format!("Invalid IPC block offset: {}", block.offset()))
    })?;
    let body_len = block.bodyLength().to_usize().ok_or_else(|| {
        ArrowError::IpcError(format!(
            "Invalid IPC block body length: {}",
            block.bodyLength()
        ))
    })?;
    let metadata_len = block.metaDataLength().to_usize().ok_or_else(|| {
        ArrowError::IpcError(format!(
            "Invalid IPC block metadata length: {}",
            block.metaDataLength()
        ))
    })?;
    let total_len = body_len.checked_add(metadata_len).ok_or_else(|| {
        ArrowError::IpcError(format!(
            "IPC block length overflows: metadata {metadata_len} + body {body_len}"
        ))
    })?;

    reader.seek(SeekFrom::Start(offset))?;

    if total_len < SMALL_BLOCK_LEN {
        let mut buf = MutableBuffer::from_len_zeroed(total_len);
        reader.read_exact(&mut buf)?;
        return Ok(buf.into());
    }

    // `read_to_end` appends into spare capacity, so the destination is never
    // zero-initialized; `take` bounds the read to this block.
    let mut vec = Vec::with_capacity(total_len);
    reader
        .by_ref()
        .take(total_len as u64)
        .read_to_end(&mut vec)?;

    if vec.len() != total_len {
        return Err(ArrowError::IpcError(format!(
            "Expected IPC block of length {total_len}, got {}",
            vec.len()
        )));
    }

    if (vec.as_ptr() as usize) % PREFERRED_ALIGNMENT == 0 {
        // Already suitably aligned: reuse the allocation without copying.
        Ok(Buffer::from_vec(vec))
    } else {
        // Copy once into an Arrow-allocated buffer. Reserving capacity and
        // appending avoids zero-filling bytes that are about to be overwritten.
        let mut buf = MutableBuffer::with_capacity(total_len);
        buf.extend_from_slice(vec.as_slice());
        Ok(buf.into())
    }
}

/// Parse an encapsulated message
Expand Down Expand Up @@ -1778,13 +1809,18 @@ impl<R: Read> MessageReader<R> {
/// Reads the entire next message from the underlying reader which includes
/// the metadata length, the metadata, and the body.
///
/// Small message bodies use the zero-initialized buffer path. Larger bodies are
/// read into a temporary `Vec<u8>` and reused directly when the allocation is
/// 64-byte aligned, matching Arrow's `ALIGNMENT` for `MutableBuffer`. Otherwise,
/// the body is copied into an Arrow-aligned buffer.
///
/// This avoids an extra initialization pass on large reads while preserving the
/// alignment expected by Arrow buffers.
///
/// # Returns
/// - `Ok(None)` if the reader signals the end of stream with EOF on
/// the first read
/// - `Err(_)` if the reader returns an error other than on the first
/// read, or if the metadata length is invalid
/// - `Ok(Some(_))` with the Message and buffer containing the
/// body bytes otherwise.
/// - `Ok(None)` if the reader signals end-of-stream on the initial read
/// - `Err(_)` if an error occurs or metadata is invalid
/// - `Ok(Some(_))` containing the parsed message and its body buffer
fn maybe_next(&mut self) -> Result<Option<(Message::Message<'_>, MutableBuffer)>, ArrowError> {
let meta_len = self.read_meta_len()?;
let Some(meta_len) = meta_len else {
Expand All @@ -1798,8 +1834,34 @@ impl<R: Read> MessageReader<R> {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})?;

let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.read_exact(&mut buf)?;
let body_len = message.bodyLength() as usize;

if body_len < 8 * 1024 {
let mut buf = MutableBuffer::from_len_zeroed(body_len);
self.reader.read_exact(&mut buf)?;
return Ok(Some((message, buf)));
}

let mut vec = Vec::with_capacity(body_len);
self.reader
.by_ref()
.take(body_len as u64)
.read_to_end(&mut vec)?;

if vec.len() != body_len {
return Err(ArrowError::IpcError(format!(
"Expected IPC message body of length {body_len}, got {}",
vec.len()
)));
}

let buf = if ((vec.as_ptr() as usize) & 63) == 0 {
MutableBuffer::from(vec)
} else {
let mut buf = MutableBuffer::from_len_zeroed(body_len);
buf.copy_from_slice(&vec);
buf
};

Ok(Some((message, buf)))
}
Expand Down
Loading