diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 411c1f14c26..ef3223aa466 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -868,16 +868,47 @@ fn get_dictionary_values( Ok(dictionary_values) } -/// Read the data for a given block +/// Reads the full data block (metadata + body) from the underlying reader. +/// +/// Uses a zero-initialized buffer for small blocks. For larger blocks, reads +/// into a temporary `Vec` and reuses the allocation when it is 64-byte +/// aligned, matching Arrow's `ALIGNMENT` for `MutableBuffer`. Otherwise, it +/// falls back to copying into an Arrow-aligned buffer. +/// +/// This reduces redundant zero-initialization on large reads while preserving +/// the alignment expected by Arrow buffers. fn read_block(mut reader: R, block: &Block) -> Result { reader.seek(SeekFrom::Start(block.offset() as u64))?; let body_len = block.bodyLength().to_usize().unwrap(); let metadata_len = block.metaDataLength().to_usize().unwrap(); let total_len = body_len.checked_add(metadata_len).unwrap(); - let mut buf = MutableBuffer::from_len_zeroed(total_len); - reader.read_exact(&mut buf)?; - Ok(buf.into()) + if total_len < 8 * 1024 { + let mut buf = MutableBuffer::from_len_zeroed(total_len); + reader.read_exact(&mut buf)?; + return Ok(buf.into()); + } + + let mut vec = Vec::with_capacity(total_len); + reader + .by_ref() + .take(total_len as u64) + .read_to_end(&mut vec)?; + + if vec.len() != total_len { + return Err(ArrowError::IpcError(format!( + "Expected IPC block of length {total_len}, got {}", + vec.len() + ))); + } + + if ((vec.as_ptr() as usize) & 63) == 0 { + Ok(Buffer::from_vec(vec)) + } else { + let mut buf = MutableBuffer::from_len_zeroed(total_len); + buf.copy_from_slice(&vec); + Ok(buf.into()) + } } /// Parse an encapsulated message @@ -1778,13 +1809,18 @@ impl MessageReader { /// Reads the entire next message from the underlying reader which includes /// the metadata length, the metadata, and the body. /// + /// Small message bodies use the zero-initialized buffer path. Larger bodies are + /// read into a temporary `Vec` and reused directly when the allocation is + /// 64-byte aligned, matching Arrow's `ALIGNMENT` for `MutableBuffer`. Otherwise, + /// the body is copied into an Arrow-aligned buffer. + /// + /// This avoids an extra initialization pass on large reads while preserving the + /// alignment expected by Arrow buffers. + /// /// # Returns - /// - `Ok(None)` if the the reader signals the end of stream with EOF on - /// the first read - /// - `Err(_)` if the reader returns an error other than on the first - /// read, or if the metadata length is invalid - /// - `Ok(Some(_))` with the Message and buffer containiner the - /// body bytes otherwise. + /// - `Ok(None)` if the reader signals end-of-stream on the initial read + /// - `Err(_)` if an error occurs or metadata is invalid + /// - `Ok(Some(_))` containing the parsed message and its body buffer fn maybe_next(&mut self) -> Result, MutableBuffer)>, ArrowError> { let meta_len = self.read_meta_len()?; let Some(meta_len) = meta_len else { @@ -1798,8 +1834,34 @@ impl MessageReader { ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) })?; - let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize); - self.reader.read_exact(&mut buf)?; + let body_len = message.bodyLength() as usize; + + if body_len < 8 * 1024 { + let mut buf = MutableBuffer::from_len_zeroed(body_len); + self.reader.read_exact(&mut buf)?; + return Ok(Some((message, buf))); + } + + let mut vec = Vec::with_capacity(body_len); + self.reader + .by_ref() + .take(body_len as u64) + .read_to_end(&mut vec)?; + + if vec.len() != body_len { + return Err(ArrowError::IpcError(format!( + "Expected IPC message body of length {body_len}, got {}", + vec.len() + ))); + } + + let buf = if ((vec.as_ptr() as usize) & 63) == 0 { + MutableBuffer::from(vec) + } else { + let mut buf = MutableBuffer::from_len_zeroed(body_len); + buf.copy_from_slice(&vec); + buf + }; Ok(Some((message, buf))) }