Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 323 additions & 10 deletions arrow-select/src/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
//!
//! [`filter`]: crate::filter::filter
//! [`take`]: crate::take::take
use crate::filter::filter_record_batch;
use crate::filter::{
FilterBuilder, FilterPredicate, IndexIterator, IterationStrategy, SlicesIterator,
filter_record_batch,
};
use crate::take::take_record_batch;
use arrow_array::cast::AsArray;
use arrow_array::types::{BinaryViewType, StringViewType};
use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, downcast_primitive};
use arrow_schema::{ArrowError, DataType, SchemaRef};
Expand Down Expand Up @@ -145,6 +149,19 @@ pub struct BatchCoalescer {
completed: VecDeque<RecordBatch>,
/// Biggest coalesce batch size. See [`Self::with_biggest_coalesce_batch_size`]
biggest_coalesce_batch_size: Option<usize>,
/// Cached schema-level fused filter support, if supported.
fused_filter: Option<FusedFilter>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct FusedFilter {
view_columns: Vec<FusedViewColumn>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FusedViewColumn {
Utf8(usize),
Binary(usize),
}

impl BatchCoalescer {
Expand All @@ -156,6 +173,7 @@ impl BatchCoalescer {
/// Typical values are `4096` or `8192` rows.
///
pub fn new(schema: SchemaRef, target_batch_size: usize) -> Self {
let fused_filter = classify_fused_filter(&schema);
let in_progress_arrays = schema
.fields()
.iter()
Expand All @@ -170,6 +188,7 @@ impl BatchCoalescer {
completed: VecDeque::with_capacity(1),
buffered_rows: 0,
biggest_coalesce_batch_size: None,
fused_filter,
}
}

Expand Down Expand Up @@ -238,6 +257,12 @@ impl BatchCoalescer {
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
if let Some(fused_filter) = &self.fused_filter {
if fused_filter.supports_batch(&batch) {
return self.push_batch_with_filter_fused_inline_view(batch, filter);
}
}

// TODO: optimize this to avoid materializing (copying the results
// of filter to a new batch)
let filtered_batch = filter_record_batch(&batch, filter)?;
Expand Down Expand Up @@ -566,6 +591,139 @@ impl BatchCoalescer {
}
}

#[inline]
fn classify_fused_filter(schema: &SchemaRef) -> Option<FusedFilter> {
let mut view_columns = Vec::new();

for (index, field) in schema.fields().iter().enumerate() {
if field.data_type().is_primitive() {
continue;
}

match field.data_type() {
DataType::Utf8View => view_columns.push(FusedViewColumn::Utf8(index)),
DataType::BinaryView => view_columns.push(FusedViewColumn::Binary(index)),
_ => return None,
}
}

(!view_columns.is_empty()).then_some(FusedFilter { view_columns })
}

impl FusedFilter {
#[inline]
fn supports_batch(&self, batch: &RecordBatch) -> bool {
// Only candidate schemas pay the per-batch check that all supported view values
// are inline and therefore eligible for the fused direct-copy path.
for view_column in &self.view_columns {
let is_inline = match *view_column {
FusedViewColumn::Utf8(index) => batch
.columns()
.get(index)
.and_then(|array| array.as_string_view_opt())
.is_some_and(|view| view.data_buffers().is_empty()),
FusedViewColumn::Binary(index) => batch
.columns()
.get(index)
.and_then(|array| array.as_binary_view_opt())
.is_some_and(|view| view.data_buffers().is_empty()),
};

if !is_inline {
return false;
}
}

true
}
}

impl BatchCoalescer {
fn push_batch_with_filter_fused_inline_view(
&mut self,
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError> {
if filter.len() > batch.num_rows() {
return Err(ArrowError::InvalidArgumentError(format!(
"Filter predicate of length {} is larger than target array of length {}",
filter.len(),
batch.num_rows()
)));
}

let mut filter_builder = FilterBuilder::new(filter);
if batch.num_columns() > 1 {
filter_builder = filter_builder.optimize();
}
let predicate = filter_builder.build();
let selected_count = predicate.count();

if selected_count == 0 {
return Ok(());
}

if selected_count == batch.num_rows() && filter.len() == batch.num_rows() {
return self.push_batch(batch);
}

if let Some(limit) = self.biggest_coalesce_batch_size {
if selected_count > limit {
let filtered_batch = predicate.filter_record_batch(&batch)?;
return self.push_batch(filtered_batch);
}
}

// For dense inline filters, the existing filter kernel remains faster.
if selected_count.saturating_mul(4) > filter.len() {
let filtered_batch = predicate.filter_record_batch(&batch)?;
return self.push_batch(filtered_batch);
}

let space_in_batch = self.target_batch_size - self.buffered_rows;
if selected_count > space_in_batch {
let filtered_batch = predicate.filter_record_batch(&batch)?;
return self.push_batch(filtered_batch);
}

let (_schema, arrays, _num_rows) = batch.into_parts();

if arrays.len() != self.in_progress_arrays.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Batch has {} columns but BatchCoalescer expects {}",
arrays.len(),
self.in_progress_arrays.len()
)));
}

self.in_progress_arrays
.iter_mut()
.zip(arrays)
.for_each(|(in_progress, array)| {
in_progress.set_source(Some(array));
});

let result = (|| {
for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.copy_rows_by_filter(&predicate)?;
}

self.buffered_rows += selected_count;
if self.buffered_rows >= self.target_batch_size {
self.finish_buffered_batch()?;
}

Ok(())
})();

for in_progress in self.in_progress_arrays.iter_mut() {
in_progress.set_source(None);
}

result
}
}

/// Return a new `InProgressArray` for the given data type
fn create_in_progress_array(data_type: &DataType, batch_size: usize) -> Box<dyn InProgressArray> {
macro_rules! instantiate_primitive {
Expand Down Expand Up @@ -611,6 +769,39 @@ trait InProgressArray: std::fmt::Debug + Send + Sync {
/// Return an error if the source array is not set
fn copy_rows(&mut self, offset: usize, len: usize) -> Result<(), ArrowError>;

/// Copy rows selected by `filter` from the current source array.
fn copy_rows_by_filter(&mut self, filter: &FilterPredicate) -> Result<(), ArrowError> {
match filter.strategy() {
IterationStrategy::None => Ok(()),
IterationStrategy::All => self.copy_rows(0, filter.count()),
IterationStrategy::Slices(slices) => {
for &(start, end) in slices {
self.copy_rows(start, end - start)?;
}
Ok(())
}
IterationStrategy::SlicesIterator => {
for (start, end) in SlicesIterator::new(filter.filter_array()) {
self.copy_rows(start, end - start)?;
}
Ok(())
}
IterationStrategy::Indices(indices) => self.copy_rows_by_indices(indices),
IterationStrategy::IndexIterator => {
let indices = IndexIterator::new(filter.filter_array(), filter.count()).collect();
self.copy_rows_by_indices(&indices)
}
}
}

/// Copy rows at the specified indices from the current source array.
fn copy_rows_by_indices(&mut self, indices: &[usize]) -> Result<(), ArrowError> {
for &idx in indices {
self.copy_rows(idx, 1)?;
}
Ok(())
}

/// Finish the currently in-progress array and return it as an `ArrayRef`
fn finish(&mut self) -> Result<ArrayRef, ArrowError>;
}
Expand Down Expand Up @@ -1197,6 +1388,126 @@ mod tests {
.run();
}

#[test]
fn test_binary_view_filtered() {
let values: Vec<Option<&[u8]>> = vec![
Some(b"foo"),
None,
Some(b"A longer string that is more than 12 bytes"),
];

let binary_view =
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
let batch =
RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 2 == 0)));

Test::new("coalesce_binary_view_filtered")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(256)
.with_expected_output_sizes(vec![256, 256, 256, 232])
.run();
}

#[test]
fn test_binary_view_filtered_inline() {
let values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];

let binary_view =
BinaryViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
let batch =
RecordBatch::try_from_iter(vec![("c0", Arc::new(binary_view) as ArrayRef)]).unwrap();
let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 3 != 1)));

Test::new("coalesce_binary_view_filtered_inline")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(300)
.with_expected_output_sizes(vec![300, 300, 300, 300, 134])
.run();
}

#[test]
fn test_string_view_filtered_inline() {
let values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];

let string_view =
StringViewArray::from_iter(std::iter::repeat(values.iter()).flatten().take(1000));
let batch =
RecordBatch::try_from_iter(vec![("c0", Arc::new(string_view) as ArrayRef)]).unwrap();
let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 3 != 1)));

Test::new("coalesce_string_view_filtered_inline")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(300)
.with_expected_output_sizes(vec![300, 300, 300, 300, 134])
.run();
}

#[test]
fn test_mixed_inline_binary_view_filtered() {
let int_values = Int32Array::from_iter((0..1000).map(Some));
let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
let binary_values: Vec<Option<&[u8]>> = vec![Some(b"foo"), None, Some(b"barbaz")];
let binary_view = BinaryViewArray::from_iter(
std::iter::repeat(binary_values.iter()).flatten().take(1000),
);

let batch = RecordBatch::try_from_iter(vec![
("i", Arc::new(int_values) as ArrayRef),
("f", Arc::new(float_values) as ArrayRef),
("b", Arc::new(binary_view) as ArrayRef),
])
.unwrap();

let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 3 != 1)));

Test::new("coalesce_mixed_inline_binary_view_filtered")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(300)
.with_expected_output_sizes(vec![300, 300, 300, 300, 134])
.run();
}

#[test]
fn test_mixed_inline_string_view_filtered() {
let int_values = Int32Array::from_iter((0..1000).map(Some));
let float_values = arrow_array::Float64Array::from_iter((0..1000).map(|v| Some(v as f64)));
let string_values: Vec<Option<&str>> = vec![Some("foo"), None, Some("barbaz")];
let string_view = StringViewArray::from_iter(
std::iter::repeat(string_values.iter()).flatten().take(1000),
);

let batch = RecordBatch::try_from_iter(vec![
("i", Arc::new(int_values) as ArrayRef),
("f", Arc::new(float_values) as ArrayRef),
("s", Arc::new(string_view) as ArrayRef),
])
.unwrap();

let filter = BooleanArray::from_iter((0..1000).map(|idx| Some(idx % 3 != 1)));

Test::new("coalesce_mixed_inline_string_view_filtered")
.with_batch(batch.clone())
.with_filter(filter.clone())
.with_batch(batch)
.with_filter(filter)
.with_batch_size(300)
.with_expected_output_sizes(vec![300, 300, 300, 300, 134])
.run();
}

#[derive(Debug, Clone, PartialEq)]
struct ExpectedLayout {
len: usize,
Expand Down Expand Up @@ -1701,18 +2012,20 @@ mod tests {
let (schema, mut columns, row_count) = batch.into_parts();

for column in columns.iter_mut() {
let Some(string_view) = column.as_string_view_opt() else {
if let Some(string_view) = column.as_string_view_opt() {
// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
}
*column = Arc::new(builder.finish());
continue;
};
}

// Re-create the StringViewArray to ensure memory layout is
// consistent
let mut builder = StringViewBuilder::new();
for s in string_view.iter() {
builder.append_option(s);
if let Some(binary_view) = column.as_binary_view_opt() {
*column = Arc::new(BinaryViewArray::from_iter(binary_view.iter()));
}
// Update the column with the new StringViewArray
*column = Arc::new(builder.finish());
}

let options = RecordBatchOptions::new().with_row_count(Some(row_count));
Expand Down
Loading
Loading