Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 170 additions & 2 deletions rust/lance-index/src/scalar/zonemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,56 @@ impl ZoneMapIndex {
Self::zone_has_finite_min(zone) && !(zone.max.is_null() || Self::scalar_is_nan(&zone.max))
}

/// Global `[min, max]` folded from this index's per-zone summaries, no scan.
/// Thin wrapper over [`value_range_over`](Self::value_range_over) for a
/// single segment; see it for the full contract.
pub fn value_range(&self) -> Option<(ScalarValue, ScalarValue)> {
Self::value_range_over([self])
}

/// Global `[min, max]` folded across one or more ZoneMap segments (the
/// disjoint per-column segments of a multi-segment index), without a scan.
///
/// `None` when no zone has a finite bound, or when any zone's `max` is NaN:
/// `ScalarValue`'s total order ranks NaN above every finite value, so a
/// NaN-bearing zone hides its true finite max and no sound upper bound exists
/// without a scan — folding only the finite maxes would yield a *subset* that
/// prunes live rows. Folding raw zones (not each segment's `value_range`)
/// keeps this exact across segments.
///
/// Otherwise the range is a superset of the segments' live values,
/// conservative under deletion vectors: safe to prune with, not guaranteed
/// tight. The caller must ensure the segments jointly cover every live
/// fragment.
pub fn value_range_over<'a>(
segments: impl IntoIterator<Item = &'a Self>,
) -> Option<(ScalarValue, ScalarValue)> {
let mut min: Option<&ScalarValue> = None;
let mut max: Option<&ScalarValue> = None;
for zone in segments.into_iter().flat_map(|seg| seg.zones.iter()) {
if Self::scalar_is_nan(&zone.max) {
return None;
}
if Self::scalar_is_finite_bound(&zone.min)
&& min.is_none_or(|cur| zone.min.partial_cmp(cur).is_some_and(|o| o.is_lt()))
{
min = Some(&zone.min);
}
if Self::scalar_is_finite_bound(&zone.max)
&& max.is_none_or(|cur| zone.max.partial_cmp(cur).is_some_and(|o| o.is_gt()))
{
max = Some(&zone.max);
}
}
Some((min?.clone(), max?.clone()))
}

/// A scalar bound usable for the global range: present (not all-null) and,
/// for floats, not NaN.
fn scalar_is_finite_bound(v: &ScalarValue) -> bool {
!v.is_null() && !Self::scalar_is_nan(v)
}

/// Evaluates whether a zone could potentially contain values matching the query.
///
/// NaN query values use the explicit `nan_count`. For finite query values,
Expand Down Expand Up @@ -1045,8 +1095,8 @@ mod tests {

use crate::scalar::zoned::ZoneBound;
use crate::scalar::zonemap::{ZoneMapIndexPlugin, ZoneMapStatistics};
use arrow::datatypes::Float32Type;
use arrow_array::{Array, RecordBatch, UInt64Array, record_batch};
use arrow::datatypes::{ArrowPrimitiveType, Float32Type, Int64Type};
use arrow_array::{Array, PrimitiveArray, RecordBatch, UInt64Array, record_batch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -1098,6 +1148,124 @@ mod tests {
Box::pin(RecordBatchStreamAdapter::new(schema, stream))
}

/// Build a single-column ZoneMap of primitive type `T` from `fragments`
/// (one batch -> one fragment), with small zones, then load it back.
async fn train_and_load<T: ArrowPrimitiveType>(
fragments: Vec<Vec<Option<T::Native>>>,
) -> Arc<ZoneMapIndex>
where
PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
{
let tmpdir = TempObjDir::default();
let test_store = Arc::new(LanceIndexStore::new(
Arc::new(ObjectStore::local()),
tmpdir.clone(),
Arc::new(LanceCache::no_cache()),
));
let schema = Arc::new(Schema::new(vec![Field::new(
VALUE_COLUMN_NAME,
T::DATA_TYPE,
true,
)]));
let batches: Vec<RecordBatch> = fragments
.into_iter()
.map(|vals| {
RecordBatch::try_new(
schema.clone(),
vec![Arc::new(PrimitiveArray::<T>::from(vals))],
)
.unwrap()
})
.collect();
let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
schema.clone(),
stream::iter(batches.into_iter().map(Ok)),
));
let stream = add_row_addr(stream);

ZoneMapIndexPlugin::train_zonemap_index(
stream,
test_store.as_ref(),
Some(ZoneMapIndexBuilderParams::new(2)),
)
.await
.unwrap();

ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
.await
.expect("Failed to load ZoneMapIndex")
}

#[tokio::test]
async fn test_value_range_spans_fragments() {
// Two fragments, multiple zones each; global min/max straddle both.
let index = train_and_load::<Int64Type>(vec![
vec![Some(10), Some(50), Some(30)],
vec![Some(5), Some(99), Some(42)],
])
.await;
assert_eq!(
index.value_range(),
Some((ScalarValue::Int64(Some(5)), ScalarValue::Int64(Some(99))))
);
}

#[tokio::test]
async fn test_value_range_all_null_is_none() {
let index = train_and_load::<Int64Type>(vec![vec![None, None, None]]).await;
assert_eq!(index.value_range(), None);
}

#[tokio::test]
async fn test_value_range_nan_max_is_none() {
// Zones of size 2: [1.0, 2.0] then [100.0, NaN]. The NaN-bearing zone hides
// its finite max (100.0), so the only sound answer is None.
let index = train_and_load::<Float32Type>(vec![vec![
Some(1.0),
Some(2.0),
Some(100.0),
Some(f32::NAN),
]])
.await;
assert_eq!(index.value_range(), None);
}

#[tokio::test]
async fn test_value_range_over_folds_segments() {
// Two disjoint segments of one logical index; the global range straddles
// both (min and max from `b`), proving the fold spans segments.
let a = train_and_load::<Int64Type>(vec![vec![Some(5), Some(9)]]).await;
let b = train_and_load::<Int64Type>(vec![vec![Some(1), Some(20)]]).await;
assert_eq!(
ZoneMapIndex::value_range_over([a.as_ref(), b.as_ref()]),
Some((ScalarValue::Int64(Some(1)), ScalarValue::Int64(Some(20))))
);
}

#[tokio::test]
async fn test_value_range_over_nan_in_any_segment_is_none() {
// NaN in one segment hides that segment's finite max; the cross-segment
// fold must bail to None just as the single-segment path does.
let a = train_and_load::<Float32Type>(vec![vec![Some(1.0), Some(2.0)]]).await;
let b = train_and_load::<Float32Type>(vec![vec![Some(100.0), Some(f32::NAN)]]).await;
assert_eq!(
ZoneMapIndex::value_range_over([a.as_ref(), b.as_ref()]),
None
);
}

#[tokio::test]
async fn test_value_range_over_skips_all_null_segment() {
// An all-null segment yields no finite zone; folding it with a finite
// segment returns the finite segment's range (null contributes nothing).
let a = train_and_load::<Int64Type>(vec![vec![None, None]]).await;
let b = train_and_load::<Int64Type>(vec![vec![Some(3), Some(7)]]).await;
assert_eq!(
ZoneMapIndex::value_range_over([a.as_ref(), b.as_ref()]),
Some((ScalarValue::Int64(Some(3)), ScalarValue::Int64(Some(7))))
);
}

#[tokio::test]
async fn test_empty_zonemap_index() {
let tmpdir = TempObjDir::default();
Expand Down
64 changes: 64 additions & 0 deletions rust/lance/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::{Arc, OnceLock};
use arrow_schema::DataType;
use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::scalar::ScalarValue;
use futures::FutureExt;
use itertools::Itertools;
use lance_core::cache::CacheKey;
Expand All @@ -33,6 +34,7 @@ use lance_index::scalar::expression::{IndexInformationProvider, MultiQueryParser
use lance_index::scalar::inverted::{InvertedIndex, InvertedIndexPlugin};
use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::scalar::registry::{TrainingCriteria, TrainingOrdering};
use lance_index::scalar::zonemap::ZoneMapIndex;
use lance_index::scalar::{CreatedIndex, ScalarIndex};
use lance_index::vector::bq::builder::RabitQuantizer;
use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex, FlatQuantizer};
Expand Down Expand Up @@ -1453,6 +1455,68 @@ impl DatasetIndexExt for Dataset {
.await
}

async fn zonemap_value_range(
&self,
column: &str,
) -> Result<Option<(ScalarValue, ScalarValue)>> {
let Some(field) = self.schema().field(column) else {
return Err(Error::invalid_input(format!(
"zonemap_value_range: column '{column}' not found in dataset schema"
)));
};
let field_id = field.id;
let field_path = self.schema().field_path(field_id)?;

// A multi-segment ZoneMap is several index entries over the same column,
// each covering a disjoint fragment subset. Match the field, then the
// details type (the column may also carry e.g. a BTree).
let indices = self.load_indices().await?;
let segments: Vec<_> = indices
.iter()
.filter(|idx| matches!(idx.fields.as_slice(), [only] if *only == field_id))
.filter(|idx| {
idx.index_details
.as_ref()
.is_some_and(|d| d.type_url.ends_with("ZoneMapIndexDetails"))
})
.collect();
if segments.is_empty() {
return Ok(None);
}

// Soundness: the segments must *jointly* cover every live fragment, else
// the fold sees only a subset and could prune live rows (e.g. fragments
// appended after the index was built). Extra dead fragments are harmless.
let mut covered = RoaringBitmap::new();
for idx in &segments {
let Some(bitmap) = idx.fragment_bitmap.as_ref() else {
return Ok(None);
};
covered |= bitmap.clone();
}
if !self.fragment_bitmap.as_ref().is_subset(&covered) {
return Ok(None);
}

// Keep the opened indices alive so the `ZoneMapIndex` refs we fold over
// stay borrowed.
let mut opened = Vec::with_capacity(segments.len());
for idx in &segments {
opened.push(
self.open_generic_index(&field_path, &idx.uuid, &NoOpMetricsCollector)
.await?,
);
}
let Some(zonemaps) = opened
.iter()
.map(|index| index.as_any().downcast_ref::<ZoneMapIndex>())
.collect::<Option<Vec<_>>>()
else {
return Ok(None);
};
Ok(ZoneMapIndex::value_range_over(zonemaps))
}

async fn read_index_partition(
&self,
index_name: &str,
Expand Down
15 changes: 15 additions & 0 deletions rust/lance/src/index/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use datafusion::scalar::ScalarValue;
use lance_index::{IndexParams, IndexType, PrewarmOptions, optimize::OptimizeOptions};
use lance_table::format::IndexMetadata;
use roaring::RoaringBitmap;
Expand Down Expand Up @@ -233,6 +234,20 @@ pub trait DatasetIndexExt {
/// Find an index with the given name and return its serialized statistics.
async fn index_statistics(&self, index_name: &str) -> Result<String>;

/// Global `[min, max]` for `column` from its ZoneMap scalar index, no scan.
///
/// `None` unless the column's ZoneMap segments *jointly* cover every live
/// fragment and the column is NaN-free — fragments appended after the index
/// was built, or a NaN-bearing column, yield `None`. The disjoint segments
/// of a multi-segment index are folded together.
///
/// When `Some`, the range is a superset of live values, conservative under
/// deletion vectors: safe to prune with. See [`ZoneMapIndex::value_range`].
///
/// [`ZoneMapIndex::value_range`]: lance_index::scalar::zonemap::ZoneMapIndex::value_range
async fn zonemap_value_range(&self, column: &str)
-> Result<Option<(ScalarValue, ScalarValue)>>;

/// Merge one or more existing uncommitted index segments into a single uncommitted segment.
async fn merge_existing_index_segments(
&self,
Expand Down
Loading
Loading