diff --git a/arrow-integration-testing/tests/ipc_writer.rs b/arrow-integration-testing/tests/ipc_writer.rs index d780eb2ee0b5..990723338bf8 100644 --- a/arrow-integration-testing/tests/ipc_writer.rs +++ b/arrow-integration-testing/tests/ipc_writer.rs @@ -100,12 +100,12 @@ fn write_2_0_0_compression() { let all_options = [ IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) .unwrap() - .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME)) + .try_with_compression(Some(ipc::writer::IpcCompression::Lz4Frame)) .unwrap(), // write IPC version 5 with zstd IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) .unwrap() - .try_with_compression(Some(ipc::CompressionType::ZSTD)) + .try_with_compression(Some(ipc::writer::IpcCompression::zstd_default())) .unwrap(), ]; diff --git a/arrow-ipc/benches/ipc_reader.rs b/arrow-ipc/benches/ipc_reader.rs index ef1de88d328d..73e2137c2d16 100644 --- a/arrow-ipc/benches/ipc_reader.rs +++ b/arrow-ipc/benches/ipc_reader.rs @@ -20,8 +20,8 @@ use arrow_array::{RecordBatch, builder::StringBuilder}; use arrow_buffer::Buffer; use arrow_ipc::convert::fb_to_schema; use arrow_ipc::reader::{FileDecoder, FileReader, StreamReader, read_footer_length}; -use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter}; -use arrow_ipc::{Block, CompressionType, root_as_footer}; +use arrow_ipc::writer::{FileWriter, IpcCompression, IpcWriteOptions, StreamWriter}; +use arrow_ipc::{Block, root_as_footer}; use arrow_schema::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use std::io::{Cursor, Write}; @@ -62,7 +62,7 @@ fn criterion_benchmark(c: &mut Criterion) { group.bench_function("StreamReader/read_10/zstd", |b| { let buffer = ipc_stream( IpcWriteOptions::default() - .try_with_compression(Some(CompressionType::ZSTD)) + .try_with_compression(Some(IpcCompression::zstd_default())) .unwrap(), ); b.iter(move || { @@ -78,7 +78,7 @@ fn criterion_benchmark(c: &mut Criterion) { 
group.bench_function("StreamReader/no_validation/read_10/zstd", |b| { let buffer = ipc_stream( IpcWriteOptions::default() - .try_with_compression(Some(CompressionType::ZSTD)) + .try_with_compression(Some(IpcCompression::zstd_default())) .unwrap(), ); b.iter(move || { diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs index eda7e3c58fe0..b3ab87b5d624 100644 --- a/arrow-ipc/benches/ipc_writer.rs +++ b/arrow-ipc/benches/ipc_writer.rs @@ -17,8 +17,7 @@ use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; use arrow_array::{RecordBatch, builder::StringBuilder}; -use arrow_ipc::CompressionType; -use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter}; +use arrow_ipc::writer::{FileWriter, IpcCompression, IpcWriteOptions, StreamWriter}; use arrow_schema::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use std::sync::Arc; @@ -45,7 +44,7 @@ fn criterion_benchmark(c: &mut Criterion) { b.iter(move || { buffer.clear(); let options = IpcWriteOptions::default() - .try_with_compression(Some(CompressionType::ZSTD)) + .try_with_compression(Some(IpcCompression::zstd_default())) .unwrap(); let mut writer = StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options) diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs index 1b7c84d9f05e..d867db9878cc 100644 --- a/arrow-ipc/src/compression.rs +++ b/arrow-ipc/src/compression.rs @@ -22,6 +22,44 @@ use arrow_schema::ArrowError; const LENGTH_NO_COMPRESSED_DATA: i64 = -1; const LENGTH_OF_PREFIX_DATA: i64 = 8; +/// Represents a valid zstd compression level. +#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)] +pub struct ZstdLevel(i32); + +impl ZstdLevel { + // zstd binds to C, and hence zstd::compression_level_range() is not const as this calls the + // underlying C library. 
+ const MINIMUM_LEVEL: i32 = 1; + const MAXIMUM_LEVEL: i32 = 22; + + /// Attempts to create a zstd compression level from a given compression level. + /// + /// Compression levels must be valid (i.e. be acceptable for [`zstd::compression_level_range`]). + pub fn try_new(level: i32) -> Result<Self, ArrowError> { + let range = Self::MINIMUM_LEVEL..=Self::MAXIMUM_LEVEL; + if range.contains(&level) { + Ok(Self(level)) + } else { + Err(ArrowError::InvalidArgumentError(format!( + "valid compression range {}..={} exceeded.", + range.start(), + range.end(), + ))) + } + } + + /// Returns the compression level. + pub fn compression_level(&self) -> i32 { + self.0 + } +} + +impl Default for ZstdLevel { + fn default() -> Self { + Self(1) + } +} + /// Additional context that may be needed for compression. /// /// In the case of zstd, this will contain the zstd context, which can be reused between subsequent @@ -46,6 +84,30 @@ impl Default for CompressionContext { } } +impl CompressionContext { + /// Create a [`CompressionContext`] that uses `level` when compressing with + /// [`CompressionCodec::Zstd`]. Other codecs ignore the level. + /// + /// Without the `zstd` feature the level is ignored; attempting to use + /// [`CompressionCodec::Zstd`] still returns the same "feature not + /// enabled" error as [`CompressionContext::default`]. + pub fn with_zstd_level(level: ZstdLevel) -> Self { + #[cfg(feature = "zstd")] + { + Self { + // `ZstdLevel` is pre-validated, so `new` cannot fail here. 
+ compressor: zstd::bulk::Compressor::new(level.compression_level()) + .expect("zstd level was validated by ZstdLevel::try_new"), + } + } + #[cfg(not(feature = "zstd"))] + { + let _ = level; + Self::default() + } + } +} + impl std::fmt::Debug for CompressionContext { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let mut ds = f.debug_struct("CompressionContext"); @@ -115,6 +177,33 @@ impl TryFrom<CompressionType> for CompressionCodec { } } +/// Codec (and any codec-specific parameters) for record-batch bodies in an +/// Arrow IPC stream or file. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum IpcCompression { + /// lz4 frame compression. Requires the `lz4` cargo feature. + Lz4Frame, + /// zstd compression at the given [`ZstdLevel`]. Requires the `zstd` cargo feature. + Zstd(ZstdLevel), +} + +impl IpcCompression { + /// zstd with [`ZstdLevel::default`]. Equivalent to + /// `Zstd(ZstdLevel::default())` and preserves the behaviour previously + /// reached via `try_with_compression(Some(CompressionType::ZSTD))`. + pub fn zstd_default() -> Self { + Self::Zstd(ZstdLevel::default()) + } + + /// The on-wire [`crate::CompressionType`] for this codec selection. + pub fn codec(self) -> CompressionType { + match self { + Self::Lz4Frame => CompressionType::LZ4_FRAME, + Self::Zstd(_) => CompressionType::ZSTD, + } + } +} + impl CompressionCodec { /// Compresses the data in `input` to `output` and appends the /// data using the specified compression mechanism. 
@@ -357,4 +446,5 @@ mod tests { .unwrap(); assert_eq!(input_bytes, result.as_slice()); } + } diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index a05072a2c47c..aa154e97c4ed 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -44,6 +44,7 @@ use arrow_schema::*; use crate::CONTINUATION_MARKER; use crate::compression::CompressionCodec; pub use crate::compression::CompressionContext; +pub use crate::compression::{IpcCompression, ZstdLevel}; use crate::convert::IpcSchemaEncoder; /// IPC write options used to control the behaviour of the [`IpcDataGenerator`] @@ -65,7 +66,7 @@ pub struct IpcWriteOptions { metadata_version: crate::MetadataVersion, /// Compression, if desired. Will result in a runtime error /// if the corresponding feature is not enabled - batch_compression_type: Option<crate::CompressionType>, + batch_compression: Option<IpcCompression>, /// How to handle updating dictionaries in IPC messages dictionary_handling: DictionaryHandling, } @@ -77,19 +78,18 @@ impl IpcWriteOptions { /// is not enabled pub fn try_with_compression( mut self, - batch_compression_type: Option<crate::CompressionType>, + batch_compression: Option<IpcCompression>, ) -> Result<Self, ArrowError> { - self.batch_compression_type = batch_compression_type; + self.batch_compression = batch_compression; - if self.batch_compression_type.is_some() - && self.metadata_version < crate::MetadataVersion::V5 - { + if self.batch_compression.is_some() && self.metadata_version < crate::MetadataVersion::V5 { return Err(ArrowError::InvalidArgumentError( "Compression only supported in metadata v5 and above".to_string(), )); } Ok(self) } + /// Try to create IpcWriteOptions, checking for incompatible settings pub fn try_new( alignment: usize, @@ -115,7 +115,7 @@ impl IpcWriteOptions { alignment, write_legacy_ipc_format, metadata_version, - batch_compression_type: None, + batch_compression: None, dictionary_handling: DictionaryHandling::default(), }), crate::MetadataVersion::V5 => { @@ -128,7 +128,7 @@ impl IpcWriteOptions { alignment, write_legacy_ipc_format, 
metadata_version, - batch_compression_type: None, + batch_compression: None, dictionary_handling: DictionaryHandling::default(), }) } @@ -144,6 +144,21 @@ impl IpcWriteOptions { self.dictionary_handling = dictionary_handling; self } + + /// The on-wire [`crate::CompressionType`] selected for batch bodies, if any. + fn batch_compression_type(&self) -> Option<crate::CompressionType> { + self.batch_compression.map(IpcCompression::codec) + } + + /// Build a [`CompressionContext`] configured according to `self`. Only + /// zstd currently has tunable parameters; other codecs fall through to + /// [`CompressionContext::default`]. + fn compression_context(&self) -> CompressionContext { + match self.batch_compression { + Some(IpcCompression::Zstd(level)) => CompressionContext::with_zstd_level(level), + Some(IpcCompression::Lz4Frame) | None => CompressionContext::default(), + } + } } impl Default for IpcWriteOptions { @@ -152,7 +167,7 @@ impl Default for IpcWriteOptions { alignment: 64, write_legacy_ipc_format: false, metadata_version: crate::MetadataVersion::V5, - batch_compression_type: None, + batch_compression: None, dictionary_handling: DictionaryHandling::default(), } } @@ -531,7 +546,7 @@ impl IpcDataGenerator { let mut offset = 0; // get the type of compression - let batch_compression_type = write_options.batch_compression_type; + let batch_compression_type = write_options.batch_compression_type(); let compression = batch_compression_type.map(|batch_compression_type| { let mut c = crate::BodyCompressionBuilder::new(&mut fbb); @@ -624,7 +639,7 @@ impl IpcDataGenerator { let mut arrow_data: Vec<u8> = vec![]; // get the type of compression - let batch_compression_type = write_options.batch_compression_type; + let batch_compression_type = write_options.batch_compression_type(); let compression = batch_compression_type.map(|batch_compression_type| { let mut c = crate::BodyCompressionBuilder::new(&mut fbb); @@ -1137,6 +1152,7 @@ impl<W: Write> FileWriter<W> { &write_options, ); let (meta, data) = 
write_message(&mut writer, encoded_message, &write_options)?; + let compression_context = write_options.compression_context(); Ok(Self { writer, write_options, @@ -1148,7 +1164,7 @@ impl FileWriter { dictionary_tracker, custom_metadata: HashMap::new(), data_gen, - compression_context: CompressionContext::default(), + compression_context, }) } @@ -1422,13 +1438,14 @@ impl StreamWriter { &write_options, ); write_message(&mut writer, encoded_message, &write_options)?; + let compression_context = write_options.compression_context(); Ok(Self { writer, write_options, finished: false, dictionary_tracker, data_gen, - compression_context: CompressionContext::default(), + compression_context, }) } @@ -2226,7 +2243,7 @@ mod tests { { let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5) .unwrap() - .try_with_compression(Some(crate::CompressionType::LZ4_FRAME)) + .try_with_compression(Some(IpcCompression::Lz4Frame)) .unwrap(); let mut writer = @@ -2266,7 +2283,7 @@ mod tests { { let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5) .unwrap() - .try_with_compression(Some(crate::CompressionType::LZ4_FRAME)) + .try_with_compression(Some(IpcCompression::Lz4Frame)) .unwrap(); let mut writer = @@ -2305,7 +2322,7 @@ mod tests { { let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5) .unwrap() - .try_with_compression(Some(crate::CompressionType::ZSTD)) + .try_with_compression(Some(IpcCompression::zstd_default())) .unwrap(); let mut writer = @@ -2332,6 +2349,54 @@ mod tests { } } + // Round-trips a batch written with a non-default zstd level through a + // stock reader, confirming the level is a writer-side tuning knob only. 
+ #[test] + #[cfg(feature = "zstd")] + fn test_write_file_with_zstd_non_default_level() { + use crate::reader::FileReader; + + let schema = Schema::new(vec![Field::new("field1", DataType::Int32, true)]); + // Use a batch big enough for zstd to actually compress so we see + // different byte counts at different levels. + let values: Vec<Option<i32>> = (0..4096).map(|i| Some(i % 17)).collect(); + let array = Int32Array::from(values); + let record_batch = + RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)]).unwrap(); + + let write_at = |level: ZstdLevel| -> Vec<u8> { + let mut buf: Vec<u8> = Vec::new(); + let write_option = IpcWriteOptions::try_new(8, false, crate::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(IpcCompression::Zstd(level))) + .unwrap(); + let mut writer = + FileWriter::try_new_with_options(&mut buf, &schema, write_option).unwrap(); + writer.write(&record_batch).unwrap(); + writer.finish().unwrap(); + buf + }; + + let lo_level = ZstdLevel::try_new(1).unwrap(); + let hi_level = ZstdLevel::try_new(19).unwrap(); + let lo_bytes = write_at(lo_level); + let hi_bytes = write_at(hi_level); + + // Both outputs must roundtrip to the original record batch via a + // stock reader (which has no knowledge of the writer-side level). + for bytes in [&lo_bytes, &hi_bytes] { + let reader = FileReader::try_new(std::io::Cursor::new(bytes.clone()), None).unwrap(); + for read_batch in reader { + let read_batch = read_batch.unwrap(); + assert_eq!(read_batch.num_rows(), record_batch.num_rows()); + assert_eq!(read_batch.schema(), record_batch.schema()); + for (a, b) in read_batch.columns().iter().zip(record_batch.columns()) { + assert_eq!(a.as_ref(), b.as_ref()); + } + } + } + } + #[test] fn test_write_file() { let schema = Schema::new(vec![Field::new("field1", DataType::UInt32, true)]);