diff --git a/arrow-arith/src/arity.rs b/arrow-arith/src/arity.rs index b9f7a82963c7..66f64954a2d4 100644 --- a/arrow-arith/src/arity.rs +++ b/arrow-arith/src/arity.rs @@ -121,17 +121,7 @@ where if a.is_empty() { return Ok(PrimitiveArray::from(ArrayData::new_empty(&O::DATA_TYPE))); } - - let nulls = NullBuffer::union(a.logical_nulls().as_ref(), b.logical_nulls().as_ref()); - - let values = a - .values() - .into_iter() - .zip(b.values()) - .map(|(l, r)| op(*l, *r)); - - let buffer: Vec<_> = values.collect(); - Ok(PrimitiveArray::new(buffer.into(), nulls)) + Ok(a.binary(b, op)) } /// Applies a binary and infallible function to values in two arrays, replacing @@ -268,11 +258,10 @@ where if a.is_empty() { return Ok(PrimitiveArray::from(ArrayData::new_empty(&O::DATA_TYPE))); } - let len = a.len(); - if a.null_count() == 0 && b.null_count() == 0 { - try_binary_no_nulls(len, a, b, op) + try_binary_no_nulls(a.len(), a, b, op) } else { + let len = a.len(); let nulls = NullBuffer::union(a.logical_nulls().as_ref(), b.logical_nulls().as_ref()).unwrap(); diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index b51f5f518668..d0e0e3c80462 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -672,6 +672,24 @@ impl PrimitiveArray { }) } + /// Create a new [`PrimitiveArray`] without validating the provided parts. + /// + /// # Safety + /// + /// Callers must ensure `data_type` is compatible with `T` and that `nulls`, + /// if present, has the same length as `values`. + unsafe fn new_unchecked( + data_type: DataType, + values: ScalarBuffer, + nulls: Option, + ) -> Self { + Self { + data_type, + values, + nulls, + } + } + /// Create a new [`Scalar`] from `value` pub fn new_scalar(value: T::Native) -> Scalar { Scalar::new(Self { @@ -871,6 +889,7 @@ impl PrimitiveArray { /// /// See also /// * [`Self::unary_mut`] for in place modification. + /// * [`Self::unary_mut_or_clone`] for in place modification with an allocating fallback. /// * [`Self::try_unary`] for fallible operations. /// * [`arrow::compute::binary`] for binary operations /// @@ -950,12 +969,52 @@ impl PrimitiveArray { where F: Fn(T::Native) -> T::Native, { - let mut builder = self.into_builder()?; - builder - .values_slice_mut() - .iter_mut() - .for_each(|v| *v = op(*v)); - Ok(builder.finish()) + self.try_unary_mut_impl(op).map_err(|(array, _op)| array) + } + + /// Applies a unary and infallible function to the array in place if + /// possible, or clones and applies the function if the array is shared. + /// + /// # Buffer Reuse + /// + /// If the underlying buffers are not shared with other arrays, mutates the + /// underlying buffer in place, without allocating. + /// + /// If the underlying buffer is shared, allocates a new values buffer and + /// applies the operation to the cloned values. + /// + /// # Null Handling + /// + /// See [`Self::unary`] for more information on null handling. + /// + /// # Example + /// + /// ```rust + /// # use arrow_array::Int32Array; + /// let array = Int32Array::from(vec![Some(5), Some(7), None]); + /// let c = array.unary_mut_or_clone(|x| x * 2 + 1); + /// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None])); + /// ``` + /// + /// # Example: modify [`ArrayRef`] in place when possible + /// + /// ```rust + /// # use std::sync::Arc; + /// # use arrow_array::{Array, ArrayRef, Int32Array, cast::AsArray, types::Int32Type}; + /// # let array: ArrayRef = Arc::new(Int32Array::from(vec![Some(5), Some(7), None])); + /// let a = array.as_primitive::().clone(); + /// drop(array); + /// let c = a.unary_mut_or_clone(|x| x * 2 + 1); + /// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None])); + /// ``` + pub fn unary_mut_or_clone(self, op: F) -> PrimitiveArray + where + F: Fn(T::Native) -> T::Native, + { + match self.try_unary_mut_impl(op) { + Ok(array) => array, + Err((array, op)) => array.unary(op), + } } /// Applies a unary fallible function to all valid values in a primitive @@ -1016,22 +1075,383 @@ impl PrimitiveArray { where F: Fn(T::Native) -> Result, { + self.try_unary_mut_with_fallback(op) + .map_err(|(array, _op)| array) + } + + /// Applies a unary fallible function to all valid values in a mutable + /// primitive array if possible, or clones and applies the function if the + /// array is shared. + /// + /// # Null Handling + /// + /// See [`Self::try_unary`] for more information on null handling. + /// + /// # Buffer Reuse + /// + /// If the underlying buffers are not shared with other arrays, mutates the + /// underlying buffer in place, without allocating. + /// + /// If the underlying buffer is shared, allocates a new values buffer and + /// applies the operation to the cloned values. + /// + /// # Example + /// + /// ```rust + /// # use arrow_array::Int32Array; + /// # fn main() -> Result<(), &'static str> { + /// let array = Int32Array::from(vec![Some(5), Some(7), None]); + /// let c = array.try_unary_mut_or_clone(|x| Ok(x * 2 + 1))?; + /// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None])); + /// # Ok(()) + /// # } + /// ``` + pub fn try_unary_mut_or_clone(self, op: F) -> Result, E> + where + F: Fn(T::Native) -> Result, + { + match self.try_unary_mut_with_fallback(op) { + Ok(result) => result, + Err((array, op)) => array.try_unary(op), + } + } + + /// Applies a binary infallible function to this array and `rhs`, + /// producing a new array. + /// + /// # Null Handling + /// + /// The result null buffer is the union of `self` and `rhs`: if either side + /// is null, the corresponding output slot is null. + /// + /// As with [`Self::unary`], the function is applied to all values, + /// including those on null slots. It therefore must be infallible for any + /// value of the corresponding type or this function may panic. + /// + /// # Panics + /// + /// Panics if `self` and `rhs` have different lengths. + pub fn binary(&self, rhs: &PrimitiveArray, op: F) -> PrimitiveArray + where + U: ArrowPrimitiveType, + O: ArrowPrimitiveType, + F: Fn(T::Native, U::Native) -> O::Native, + { + assert_eq!(self.len(), rhs.len()); + let nulls = NullBuffer::union(self.nulls(), rhs.nulls()); + let values = self + .values() + .iter() + .zip(rhs.values().iter()) + .map(|(a, b)| op(*a, *b)) + .collect(); + PrimitiveArray::new(values, nulls) + } + + /// Applies a binary fallible function to this array and `rhs`, + /// producing a new array. + /// + /// # Null Handling + /// + /// The result null buffer is the union of `self` and `rhs`: if either side + /// is null, the corresponding output slot is null. + /// + /// Unlike [`Self::binary`], the function is applied only to valid rows. + /// + /// # Panics + /// + /// Panics if `self` and `rhs` have different lengths. + pub fn try_binary( + &self, + rhs: &PrimitiveArray, + op: F, + ) -> Result, E> + where + U: ArrowPrimitiveType, + O: ArrowPrimitiveType, + F: Fn(T::Native, U::Native) -> Result, + { + assert_eq!(self.len(), rhs.len()); let len = self.len(); - let null_count = self.null_count(); - let mut builder = self.into_builder()?; + let nulls = NullBuffer::union(self.nulls(), rhs.nulls()); + let null_count = nulls.as_ref().map(|n| n.null_count()).unwrap_or(0); + let null_buffer = nulls.as_ref().map(|n| n.validity()); - let (slice, null_buffer) = builder.slices_mut(); + let mut buffer = BufferBuilder::::new(len); + buffer.append_n_zeroed(len); + let slice = buffer.as_slice_mut(); - let r = try_for_each_valid_idx(len, 0, null_count, null_buffer.as_deref(), |idx| { - unsafe { *slice.get_unchecked_mut(idx) = op(*slice.get_unchecked(idx))? }; + let f = |idx| { + unsafe { + *slice.get_unchecked_mut(idx) = + op(self.value_unchecked(idx), rhs.value_unchecked(idx))?; + } Ok::<_, E>(()) - }); + }; + + try_for_each_valid_idx(len, 0, null_count, null_buffer, f)?; + + Ok(PrimitiveArray::new(buffer.finish().into(), nulls)) + } + + /// Applies a binary infallible function to this array and `rhs` in place if + /// possible, or clones and applies the function if the array is shared. + /// + /// # Buffer Reuse + /// + /// If this array's underlying buffers are not shared with other arrays, + /// mutates the underlying values buffer in place, without allocating. + /// + /// If the underlying buffer is shared, allocates a new values buffer and + /// applies the operation to the cloned values. + /// + /// # Null Handling + /// + /// The result null buffer is the union of `self` and `rhs`: if either side + /// is null, the corresponding output slot is null. + /// + /// As with [`Self::unary`], the function is applied to all values, + /// including those on null slots. It therefore must be infallible for any + /// value of the corresponding type or this function may panic. + /// + /// # Panics + /// + /// Panics if `self` and `rhs` have different lengths. + /// + /// # Example + /// + /// ```rust + /// # use arrow_array::Int32Array; + /// let a = Int32Array::from(vec![Some(1), Some(2), None]); + /// let b = Int32Array::from(vec![Some(10), Some(20), Some(30)]); + /// let c = a.binary_mut_or_clone(&b, |a, b| a + b); + /// assert_eq!(c, Int32Array::from(vec![Some(11), Some(22), None])); + /// ``` + pub fn binary_mut_or_clone(self, rhs: &PrimitiveArray, op: F) -> PrimitiveArray + where + F: Fn(T::Native, T::Native) -> T::Native, + { + match self.try_binary_mut_impl(rhs, op) { + Ok(array) => array, + Err((array, op)) => array.binary(rhs, op), + } + } + + /// Applies a binary fallible function to this array and `rhs` in place if + /// possible, or clones and applies the function if the array is shared. + /// + /// # Null Handling + /// + /// The result null buffer is the union of `self` and `rhs`: if either side + /// is null, the corresponding output slot is null. + /// + /// Unlike [`Self::binary_mut_or_clone`], the function is applied only to + /// valid rows. + /// + /// # Panics + /// + /// Panics if `self` and `rhs` have different lengths. + /// + /// # Example + /// + /// ```rust + /// # use arrow_array::Int32Array; + /// # fn main() -> Result<(), &'static str> { + /// let a = Int32Array::from(vec![Some(10), Some(21), None]); + /// let b = Int32Array::from(vec![Some(2), Some(3), Some(0)]); + /// let c = a.try_binary_mut_or_clone(&b, |a, b| { + /// if b == 0 { + /// Err("division by zero") + /// } else { + /// Ok(a / b) + /// } + /// })?; + /// assert_eq!(c, Int32Array::from(vec![Some(5), Some(7), None])); + /// # Ok(()) + /// # } + /// ``` + /// + /// # Example: modify [`ArrayRef`] in place when possible + /// + /// ```rust + /// # use std::sync::Arc; + /// # use arrow_array::{Array, ArrayRef, Int32Array, cast::AsArray, types::Int32Type}; + /// # fn main() -> Result<(), &'static str> { + /// # let array_a: ArrayRef = Arc::new(Int32Array::from(vec![Some(10), Some(21), None])); + /// # let array_b: ArrayRef = Arc::new(Int32Array::from(vec![Some(2), Some(3), Some(0)])); + /// let a = array_a.as_primitive::().clone(); + /// let b = array_b.as_primitive::(); + /// drop(array_a); + /// let c = a.try_binary_mut_or_clone(b, |lhs, rhs| Ok(lhs / rhs))?; + /// assert_eq!(c, Int32Array::from(vec![Some(5), Some(7), None])); + /// # Ok(()) + /// # } + /// ``` + pub fn try_binary_mut_or_clone( + self, + rhs: &PrimitiveArray, + op: F, + ) -> Result, E> + where + F: Fn(T::Native, T::Native) -> Result, + { + match self.try_binary_mut_with_fallback(rhs, op) { + Ok(result) => result, + Err((array, op)) => array.try_binary(rhs, op), + } + } + + // Tries to apply an infallible unary operation in place and returns the + // original closure on failure so callers can fall back to an allocating + // path without requiring `F: Clone`. This helper underpins + // `Self::unary_mut` and `Self::unary_mut_or_clone`. + fn try_unary_mut_impl(self, op: F) -> Result, (PrimitiveArray, F)> + where + F: Fn(T::Native) -> T::Native, + { + let (data_type, values, nulls) = self.into_parts(); + let mut values = match values.into_inner().into_vec() { + Ok(values) => values, + Err(buffer) => { + // SAFETY: `buffer` came from the values `ScalarBuffer` of a + // previously valid `PrimitiveArray`, so it is already aligned + // for `T::Native` and has the correct logical length. + let values = unsafe { ScalarBuffer::new_unchecked(buffer) }; + // SAFETY: `data_type`, `values`, and `nulls` all came from a + // previously valid `PrimitiveArray` via `into_parts`, and are + // reconstructed unchanged on this path. + let array = unsafe { Self::new_unchecked(data_type, values, nulls) }; + return Err((array, op)); + } + }; + values.iter_mut().for_each(|v| *v = op(*v)); + Ok(PrimitiveArray::new(values.into(), nulls).with_data_type(data_type)) + } + + // Tries to apply a fallible unary operation in place while preserving the + // closure on reuse failure, allowing callers to distinguish allocation + // fallback from user error. This helper underpins + // `Self::try_unary_mut` and `Self::try_unary_mut_or_clone`. + fn try_unary_mut_with_fallback( + self, + op: F, + ) -> Result, E>, (PrimitiveArray, F)> + where + F: Fn(T::Native) -> Result, + { + let (data_type, values, nulls) = self.into_parts(); + let len = values.len(); + let null_count = nulls.as_ref().map(|n| n.null_count()).unwrap_or(0); + let mut values = match values.into_inner().into_vec() { + Ok(values) => values, + Err(buffer) => { + // SAFETY: `buffer` came from the values `ScalarBuffer` of a + // previously valid `PrimitiveArray`, so it is already aligned + // for `T::Native` and has the correct logical length. + let values = unsafe { ScalarBuffer::new_unchecked(buffer) }; + // SAFETY: `data_type`, `values`, and `nulls` all came from a + // previously valid `PrimitiveArray` via `into_parts`, and are + // reconstructed unchanged on this path. + let array = unsafe { Self::new_unchecked(data_type, values, nulls) }; + return Err((array, op)); + } + }; + + let r = try_for_each_valid_idx( + len, + 0, + null_count, + nulls.as_ref().map(|n| n.validity()), + |idx| { + unsafe { *values.get_unchecked_mut(idx) = op(*values.get_unchecked(idx))? }; + Ok::<_, E>(()) + }, + ); if let Err(err) = r { return Ok(Err(err)); } - Ok(Ok(builder.finish())) + Ok(Ok( + PrimitiveArray::new(values.into(), nulls).with_data_type(data_type) + )) + } + + fn try_binary_mut_impl( + self, + rhs: &PrimitiveArray, + op: F, + ) -> Result, (PrimitiveArray, F)> + where + F: Fn(T::Native, T::Native) -> T::Native, + { + assert_eq!(self.len(), rhs.len()); + let (data_type, values, nulls) = self.into_parts(); + let len = values.len(); + let nulls = NullBuffer::union(nulls.as_ref(), rhs.nulls()); + let mut values = match values.into_inner().into_vec() { + Ok(values) => values, + Err(buffer) => { + let values = ScalarBuffer::new(buffer, 0, len); + return Err(( + PrimitiveArray::new(values, nulls).with_data_type(data_type), + op, + )); + } + }; + values + .iter_mut() + .zip(rhs.values().iter()) + .for_each(|(a, b)| *a = op(*a, *b)); + + Ok(PrimitiveArray::new(values.into(), nulls).with_data_type(data_type)) + } + + fn try_binary_mut_with_fallback( + self, + rhs: &PrimitiveArray, + op: F, + ) -> Result, E>, (PrimitiveArray, F)> + where + F: Fn(T::Native, T::Native) -> Result, + { + assert_eq!(self.len(), rhs.len()); + let (data_type, values, nulls) = self.into_parts(); + let len = values.len(); + let nulls = NullBuffer::union(nulls.as_ref(), rhs.nulls()); + let null_count = nulls.as_ref().map(|n| n.null_count()).unwrap_or(0); + let mut values = match values.into_inner().into_vec() { + Ok(values) => values, + Err(buffer) => { + let values = ScalarBuffer::new(buffer, 0, len); + return Err(( + PrimitiveArray::new(values, nulls).with_data_type(data_type), + op, + )); + } + }; + + let r = try_for_each_valid_idx( + len, + 0, + null_count, + nulls.as_ref().map(|n| n.validity()), + |idx| { + unsafe { + *values.get_unchecked_mut(idx) = + op(*values.get_unchecked(idx), rhs.value_unchecked(idx))?; + } + Ok::<_, E>(()) + }, + ); + + if let Err(err) = r { + return Ok(Err(err)); + } + + Ok(Ok( + PrimitiveArray::new(values.into(), nulls).with_data_type(data_type) + )) } /// Applies a unary and nullable function to all valid values in a primitive array @@ -1756,6 +2176,36 @@ mod tests { use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano}; use arrow_schema::TimeUnit; + // Captures the values-buffer identity for a PrimitiveArray so tests can + // assert whether an operation reused the original allocation or produced a + // new one. + struct PointerInfo { + ptr: *const u8, + len: usize, + } + + impl PointerInfo { + // Record the current values buffer pointer and view. + fn new(array: &PrimitiveArray) -> Self { + Self { + ptr: array.values().inner().as_ptr(), + len: array.values().len(), + } + } + + // Assert that the array still points at the exact same values buffer. + fn assert_same(&self, array: &PrimitiveArray) { + assert_eq!(array.values().inner().as_ptr(), self.ptr); + assert_eq!(array.values().len(), self.len); + } + + // Assert that the array now points at a different values allocation, + // indicating the operation fell back to an allocating path. + fn assert_different(&self, array: &PrimitiveArray) { + assert_ne!(array.values().inner().as_ptr(), self.ptr); + } + } + #[test] fn test_primitive_array_from_vec() { let buf = Buffer::from_slice_ref([0, 1, 2, 3, 4]); @@ -2824,6 +3274,107 @@ mod tests { assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None])); } + #[test] + fn test_unary_mut_or_clone_unshared() { + let array = Int32Array::from(vec![Some(5), Some(7), None]); + let info = PointerInfo::new(&array); + let result = array.unary_mut_or_clone(|x| x * 2 + 1); + assert_eq!(result, Int32Array::from(vec![Some(11), Some(15), None])); + info.assert_same(&result); + } + + #[test] + fn test_unary_mut_or_clone_shared() { + let array = Int32Array::from(vec![Some(5), Some(7), None]); + let info = PointerInfo::new(&array); + let _shared = array.clone(); + let result = array.unary_mut_or_clone(|x| x * 2 + 1); + assert_eq!(result, Int32Array::from(vec![Some(11), Some(15), None])); + info.assert_different(&result); + } + + #[test] + fn test_try_unary_mut_or_clone_unshared() { + let array = Int32Array::from(vec![Some(5), Some(7), None]); + let info = PointerInfo::new(&array); + let result = array.try_unary_mut_or_clone(|x| Ok::<_, &'static str>(x * 2 + 1)); + let result = result.unwrap(); + assert_eq!(result, Int32Array::from(vec![Some(11), Some(15), None])); + info.assert_same(&result); + } + + #[test] + fn test_try_unary_mut_or_clone_shared() { + let array = Int32Array::from(vec![Some(5), Some(7), None]); + let info = PointerInfo::new(&array); + let _shared = array.clone(); + let result = array.try_unary_mut_or_clone(|x| Ok::<_, &'static str>(x * 2 + 1)); + let result = result.unwrap(); + assert_eq!(result, Int32Array::from(vec![Some(11), Some(15), None])); + info.assert_different(&result); + } + + #[test] + fn test_binary_mut_or_clone_unshared() { + let a = Int32Array::from(vec![Some(1), Some(2), None]); + let info = PointerInfo::new(&a); + let b = Int32Array::from(vec![Some(10), Some(20), Some(30)]); + let result = a.binary_mut_or_clone(&b, |a, b| a + b); + assert_eq!(result, Int32Array::from(vec![Some(11), Some(22), None])); + info.assert_same(&result); + } + + #[test] + fn test_binary_mut_or_clone_shared() { + let a = Int32Array::from(vec![Some(1), Some(2), None]); + let info = PointerInfo::new(&a); + let _shared = a.clone(); + let b = Int32Array::from(vec![Some(10), Some(20), Some(30)]); + let result = a.binary_mut_or_clone(&b, |a, b| a + b); + assert_eq!(result, Int32Array::from(vec![Some(11), Some(22), None])); + info.assert_different(&result); + } + + #[test] + fn test_try_binary_mut_or_clone_unshared() { + let a = Int32Array::from(vec![Some(10), Some(21), None]); + let info = PointerInfo::new(&a); + let b = Int32Array::from(vec![Some(2), Some(3), Some(4)]); + let result = a.try_binary_mut_or_clone(&b, |a, b| Ok::<_, &'static str>(a / b)); + let result = result.unwrap(); + assert_eq!(result, Int32Array::from(vec![Some(5), Some(7), None])); + info.assert_same(&result); + } + + #[test] + fn test_try_binary_mut_or_clone_shared() { + let a = Int32Array::from(vec![Some(10), Some(21), None]); + let info = PointerInfo::new(&a); + let _shared = a.clone(); + let b = Int32Array::from(vec![Some(2), Some(3), Some(4)]); + let result = a.try_binary_mut_or_clone(&b, |a, b| Ok::<_, &'static str>(a / b)); + let result = result.unwrap(); + assert_eq!(result, Int32Array::from(vec![Some(5), Some(7), None])); + info.assert_different(&result); + } + + #[test] + fn test_try_binary_mut_or_clone_skips_nulls() { + let a = Int32Array::from(vec![Some(10), None, Some(30)]); + let b = Int32Array::from(vec![Some(2), Some(0), Some(5)]); + let result = a.try_binary_mut_or_clone(&b, |a, b| { + if b == 0 { + Err("division by zero") + } else { + Ok(a / b) + } + }); + assert_eq!( + result.unwrap(), + Int32Array::from(vec![Some(5), None, Some(6)]) + ); + } + #[test] #[should_panic( expected = "PrimitiveArray expected data type Interval(MonthDayNano) got Interval(DayTime)"