From 43b6afb738b3d63344a00d8a199322cf0c29e2a2 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 23 Jun 2026 17:13:03 +0000 Subject: [PATCH 01/10] feat(index): write zone map seeds into data file footers during append MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces "index seeds" — compact per-fragment zone map statistics embedded as global buffers in data file footers at write time. During index update the seeds are harvested directly from the file footer, skipping the full column scan that would otherwise be required. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 1 + rust/lance-index/Cargo.toml | 1 + rust/lance-index/src/scalar.rs | 12 + rust/lance-index/src/scalar/seed.rs | 49 +++ rust/lance-index/src/scalar/zonemap.rs | 398 ++++++++++++++++++++++++- rust/lance/src/dataset/write.rs | 170 ++++++++++- rust/lance/src/dataset/write/insert.rs | 74 +++++ rust/lance/src/index/append.rs | 127 +++++++- 8 files changed, 828 insertions(+), 4 deletions(-) create mode 100644 rust/lance-index/src/scalar/seed.rs diff --git a/Cargo.lock b/Cargo.lock index 11d2cbe555e..0ce087a00d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4772,6 +4772,7 @@ dependencies = [ "arrow", "arrow-arith", "arrow-array", + "arrow-ipc", "arrow-ord", "arrow-schema", "arrow-select", diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index 85de43c0f9b..13d67f3449f 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -15,6 +15,7 @@ rust-version.workspace = true arc-swap.workspace = true arrow.workspace = true arrow-array.workspace = true +arrow-ipc.workspace = true arrow-ord.workspace = true arrow-schema.workspace = true arrow-select.workspace = true diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 448123a1024..99e51741a71 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -48,6 +48,7 @@ pub mod ngram; pub mod registry; #[cfg(feature = 
"geo")] pub mod rtree; +pub mod seed; pub mod zoned; pub mod zonemap; @@ -1132,6 +1133,17 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf { /// This returns a ScalarIndexParams that can be used to recreate an index /// with the same configuration on another dataset. fn derive_index_params(&self) -> Result; + + /// Try to update the index using pre-harvested seed buffers instead of + /// re-scanning column data. Returns `None` if seeds are not supported or + /// the provided seeds cannot be used (e.g., wrong parameters). + async fn try_update_with_seeds( + &self, + _seeds: &[crate::scalar::seed::FragmentSeed], + _dest_store: &dyn IndexStore, + ) -> Result> { + Ok(None) + } } #[cfg(test)] diff --git a/rust/lance-index/src/scalar/seed.rs b/rust/lance-index/src/scalar/seed.rs new file mode 100644 index 00000000000..f76556ff43e --- /dev/null +++ b/rust/lance-index/src/scalar/seed.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Index seed writers — compact per-fragment summaries embedded in data files. +//! +//! A seed writer observes column values as they are written to a data file, +//! accumulates compact statistics in memory, and serializes them to a byte +//! buffer that is embedded in the data file footer as a global buffer. +//! +//! The buffer can later be read back during index updates to reconstruct index +//! statistics without re-scanning the column data. + +use arrow_array::ArrayRef; +use bytes::Bytes; +use lance_core::Result; + +/// A hook registered during data file writes that observes column values batch +/// by batch, accumulates compact statistics in memory, and serializes them to +/// a byte buffer that is embedded in the data file footer as a global buffer. +/// +/// The buffer can later be read back during index updates to reconstruct index +/// statistics without re-scanning the column data. 
+pub trait IndexSeedWriter: Send + Sync + std::fmt::Debug { + /// The column this writer is interested in. + fn column_name(&self) -> &str; + + /// Observe a slice of column values as they are written to the current fragment. + /// Called once per batch. Uses interior mutability. + fn observe_batch(&self, values: &ArrayRef) -> Result<()>; + + /// Serialize accumulated state to bytes and reset for the next fragment. + /// Returns `None` if no data was observed (empty fragment). + fn finish(&self) -> Result>; + + /// Schema metadata key used to record that a seed buffer was written. + /// Convention: `"lance.seed."`. + fn schema_metadata_key(&self) -> String; + + /// Schema metadata value given the 1-based buffer index assigned by the writer. + /// Should encode all information needed to re-read the seed (e.g. rows_per_zone). + fn schema_metadata_value(&self, buf_index: u32) -> String; +} + +/// A pre-harvested seed buffer from a single fragment's data file. +#[derive(Debug, Clone)] +pub struct FragmentSeed { + pub fragment_id: u64, + pub bytes: Bytes, +} diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index af5380cce30..26a97dc5504 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -18,6 +18,7 @@ use crate::scalar::expression::{SargableQueryParser, ScalarQueryParser}; use crate::scalar::registry::{ ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, }; +use crate::scalar::seed::IndexSeedWriter; use crate::scalar::{ BuiltinIndexType, CreatedIndex, IndexFile, SargableQuery, ScalarIndexParams, UpdateCriteria, compute_next_prefix, @@ -25,7 +26,7 @@ use crate::scalar::{ use lance_arrow_stats::StatisticsAccumulator; use lance_core::cache::{LanceCache, WeakLanceCache}; use serde::{Deserialize, Serialize}; -use std::sync::LazyLock; +use std::sync::{LazyLock, Mutex}; use arrow_array::{ ArrayRef, RecordBatch, UInt32Array, UInt64Array, new_empty_array, 
new_null_array, @@ -53,7 +54,7 @@ const ZONEMAP_INDEX_VERSION: u32 = 0; /// Basic stats about zonemap index #[derive(Debug, PartialEq, Clone)] -struct ZoneMapStatistics { +pub(crate) struct ZoneMapStatistics { min: ScalarValue, max: ScalarValue, null_count: u32, @@ -131,6 +132,11 @@ impl DeepSizeOf for ZoneMapIndex { } impl ZoneMapIndex { + /// Returns the rows-per-zone parameter for this index. + pub fn rows_per_zone(&self) -> u64 { + self.rows_per_zone + } + fn scalar_is_nan(value: &ScalarValue) -> bool { match value { ScalarValue::Float16(Some(value)) => value.is_nan(), @@ -643,6 +649,37 @@ impl ScalarIndex for ZoneMapIndex { let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(self.rows_per_zone))?; Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(&params)) } + + async fn try_update_with_seeds( + &self, + seeds: &[crate::scalar::seed::FragmentSeed], + dest_store: &dyn IndexStore, + ) -> Result> { + let mut new_zones = self.zones.clone(); + for seed in seeds { + let mut zones = ZoneMapSeedWriter::deserialize_seed( + seed.fragment_id, + &seed.bytes, + self.rows_per_zone, + )?; + new_zones.append(&mut zones); + } + new_zones.sort_by_key(|z| (z.bound.fragment_id, z.bound.start)); + + let mut builder = ZoneMapIndexBuilder::try_new( + ZoneMapIndexBuilderParams::new(self.rows_per_zone), + self.data_type.clone(), + )?; + builder.maps = new_zones; + let file = builder.write_index(dest_store).await?; + + Ok(Some(CreatedIndex { + index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()) + .unwrap(), + index_version: ZONEMAP_INDEX_VERSION, + files: vec![file], + })) + } } /// Merge caller-selected ZoneMap segments into one self-contained segment. @@ -1037,6 +1074,267 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { } } +/// Metadata key prefix for zone map seed buffers. 
+pub const ZONEMAP_SEED_META_KEY_PREFIX: &str = "lance.seed."; + +struct ZoneMapSeedWriterInner { + completed_zones: Vec, + processor: ZoneMapProcessor, + rows_in_current_zone: u64, + next_zone_start: u64, +} + +/// A seed writer that observes column values during data file writes and +/// accumulates zone map statistics for later harvest during index updates. +/// +/// Zone statistics are serialized as Arrow IPC bytes and embedded in the data +/// file footer as a global buffer, keyed by `"lance.seed."`. +#[derive(Debug)] +pub struct ZoneMapSeedWriter { + column_name: String, + rows_per_zone: u64, + data_type: DataType, + inner: Mutex, +} + +impl ZoneMapSeedWriter { + /// Create a new `ZoneMapSeedWriter` for the given column. + pub fn new( + column_name: impl Into, + rows_per_zone: u64, + data_type: DataType, + ) -> Result { + if rows_per_zone == 0 { + return Err(lance_core::Error::invalid_input( + "rows_per_zone must be greater than zero", + )); + } + let processor = ZoneMapProcessor::new(data_type.clone())?; + Ok(Self { + column_name: column_name.into(), + rows_per_zone, + data_type, + inner: Mutex::new(ZoneMapSeedWriterInner { + completed_zones: Vec::new(), + processor, + rows_in_current_zone: 0, + next_zone_start: 0, + }), + }) + } + + fn seed_batch_from_zones( + zones: &[ZoneMapStatistics], + data_type: &DataType, + ) -> Result { + let mins = if zones.is_empty() { + arrow_array::new_empty_array(data_type) + } else { + datafusion_common::ScalarValue::iter_to_array(zones.iter().map(|s| s.min.clone()))? + }; + let maxs = if zones.is_empty() { + arrow_array::new_empty_array(data_type) + } else { + datafusion_common::ScalarValue::iter_to_array(zones.iter().map(|s| s.max.clone()))? 
+ }; + let null_counts = + arrow_array::UInt32Array::from_iter_values(zones.iter().map(|s| s.null_count)); + let nan_counts = + arrow_array::UInt32Array::from_iter_values(zones.iter().map(|s| s.nan_count)); + + let schema = Arc::new(arrow_schema::Schema::new(vec![ + Field::new("min", data_type.clone(), true), + Field::new("max", data_type.clone(), true), + Field::new("null_count", DataType::UInt32, false), + Field::new("nan_count", DataType::UInt32, false), + ])); + + let columns: Vec = vec![ + mins, + maxs, + Arc::new(null_counts) as ArrayRef, + Arc::new(nan_counts) as ArrayRef, + ]; + Ok(arrow_array::RecordBatch::try_new(schema, columns)?) + } + + /// Deserialize zone map seed bytes (Arrow IPC) into zone statistics. + /// + /// Returns a list of `ZoneMapStatistics` with bounds reconstructed from + /// the sequential layout (zone `i` starts at `i * rows_per_zone`). + pub(crate) fn deserialize_seed( + fragment_id: u64, + bytes: &bytes::Bytes, + rows_per_zone: u64, + ) -> Result> { + use arrow_ipc::reader::FileReader; + use std::io::Cursor; + + let cursor = Cursor::new(bytes.as_ref()); + let mut reader = FileReader::try_new(cursor, None).map_err(|e| { + lance_core::Error::invalid_input(format!("failed to read zone map seed IPC: {}", e)) + })?; + + let batch = match reader.next() { + Some(Ok(batch)) => batch, + Some(Err(e)) => { + return Err(lance_core::Error::invalid_input(format!( + "failed to read zone map seed batch: {}", + e + ))); + } + None => return Ok(Vec::new()), + }; + + let min_col = batch + .column_by_name("min") + .ok_or_else(|| lance_core::Error::invalid_input("seed batch missing 'min' column"))?; + let max_col = batch + .column_by_name("max") + .ok_or_else(|| lance_core::Error::invalid_input("seed batch missing 'max' column"))?; + let null_count_col = batch + .column_by_name("null_count") + .ok_or_else(|| { + lance_core::Error::invalid_input("seed batch missing 'null_count' column") + })? 
+ .as_any() + .downcast_ref::() + .ok_or_else(|| lance_core::Error::invalid_input("seed 'null_count' is not UInt32"))?; + let nan_count_col = batch + .column_by_name("nan_count") + .ok_or_else(|| { + lance_core::Error::invalid_input("seed batch missing 'nan_count' column") + })? + .as_any() + .downcast_ref::() + .ok_or_else(|| lance_core::Error::invalid_input("seed 'nan_count' is not UInt32"))?; + + let num_zones = batch.num_rows(); + let mut zones = Vec::with_capacity(num_zones); + for i in 0..num_zones { + let zone_start = i as u64 * rows_per_zone; + // Last zone may be shorter; but for reconstruction we don't know the + // exact row count here so we use rows_per_zone as a sentinel. The + // actual length is only needed for zone-map lookups, not for appending. + let zone_length = rows_per_zone as usize; + zones.push(ZoneMapStatistics { + min: datafusion_common::ScalarValue::try_from_array(min_col, i)?, + max: datafusion_common::ScalarValue::try_from_array(max_col, i)?, + null_count: null_count_col.value(i), + nan_count: nan_count_col.value(i), + bound: ZoneBound { + fragment_id, + start: zone_start, + length: zone_length, + }, + }); + } + Ok(zones) + } +} + +impl std::fmt::Debug for ZoneMapSeedWriterInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ZoneMapSeedWriterInner") + .field("completed_zones", &self.completed_zones.len()) + .field("rows_in_current_zone", &self.rows_in_current_zone) + .field("next_zone_start", &self.next_zone_start) + .finish() + } +} + +impl IndexSeedWriter for ZoneMapSeedWriter { + fn column_name(&self) -> &str { + &self.column_name + } + + fn observe_batch(&self, values: &ArrayRef) -> lance_core::Result<()> { + let mut inner = self.inner.lock().unwrap(); + let mut offset = 0usize; + let total_rows = values.len() as u64; + + while offset < values.len() { + let remaining_in_zone = self.rows_per_zone - inner.rows_in_current_zone; + let chunk_len = ((values.len() as u64 - offset as 
u64).min(remaining_in_zone)) as usize; + let chunk = values.slice(offset, chunk_len); + inner.processor.process_chunk(&chunk)?; + inner.rows_in_current_zone += chunk_len as u64; + offset += chunk_len; + + if inner.rows_in_current_zone >= self.rows_per_zone { + let bound = ZoneBound { + fragment_id: 0, + start: inner.next_zone_start, + length: self.rows_per_zone as usize, + }; + let stats = inner.processor.finish_zone(bound)?; + inner.processor.reset()?; + inner.completed_zones.push(stats); + inner.next_zone_start += self.rows_per_zone; + inner.rows_in_current_zone = 0; + } + } + let _ = total_rows; // suppress unused warning + Ok(()) + } + + fn finish(&self) -> lance_core::Result> { + use arrow_ipc::writer::FileWriter; + use std::io::Cursor; + + let mut inner = self.inner.lock().unwrap(); + + // Flush partial final zone + if inner.rows_in_current_zone > 0 { + let bound = ZoneBound { + fragment_id: 0, + start: inner.next_zone_start, + length: inner.rows_in_current_zone as usize, + }; + let stats = inner.processor.finish_zone(bound)?; + inner.processor.reset()?; + inner.completed_zones.push(stats); + inner.next_zone_start += inner.rows_in_current_zone; + inner.rows_in_current_zone = 0; + } + + if inner.completed_zones.is_empty() { + return Ok(None); + } + + let batch = Self::seed_batch_from_zones(&inner.completed_zones, &self.data_type)?; + + // Serialize to Arrow IPC + let mut buf = Cursor::new(Vec::new()); + { + let mut writer = FileWriter::try_new(&mut buf, batch.schema_ref()).map_err(|e| { + lance_core::Error::invalid_input(format!("failed to create IPC writer: {}", e)) + })?; + writer.write(&batch).map_err(|e| { + lance_core::Error::invalid_input(format!("failed to write IPC batch: {}", e)) + })?; + writer.finish().map_err(|e| { + lance_core::Error::invalid_input(format!("failed to finish IPC writer: {}", e)) + })?; + } + + // Reset state for next fragment + inner.completed_zones.clear(); + inner.next_zone_start = 0; + inner.processor = 
ZoneMapProcessor::new(self.data_type.clone())?; + + Ok(Some(bytes::Bytes::from(buf.into_inner()))) + } + + fn schema_metadata_key(&self) -> String { + format!("{}{}", ZONEMAP_SEED_META_KEY_PREFIX, self.column_name) + } + + fn schema_metadata_value(&self, buf_index: u32) -> String { + format!("{}:{}", buf_index, self.rows_per_zone) + } +} + #[cfg(test)] mod tests { use crate::scalar::registry::VALUE_COLUMN_NAME; @@ -2629,4 +2927,100 @@ mod tests { // All max characters assert_eq!(compute_next_prefix("\u{10FFFF}\u{10FFFF}"), None); } + + #[tokio::test] + async fn test_zone_map_seed_writer_round_trip() { + use crate::scalar::seed::IndexSeedWriter; + use crate::scalar::zonemap::ZoneMapSeedWriter; + use arrow_array::{ArrayRef, Int32Array}; + use datafusion_common::ScalarValue; + + let rows_per_zone = 4u64; + let data_type = DataType::Int32; + let writer = ZoneMapSeedWriter::new("test_col", rows_per_zone, data_type).unwrap(); + + // Batch 1: values 0..4 (fills exactly one zone) + let batch1: ArrayRef = Arc::new(Int32Array::from_iter_values(0..4)); + writer.observe_batch(&batch1).unwrap(); + + // Batch 2: values 10..14 (fills a second zone exactly) + let batch2: ArrayRef = Arc::new(Int32Array::from_iter_values(10..14)); + writer.observe_batch(&batch2).unwrap(); + + // Batch 3: values 20..22 (partial final zone) + let batch3: ArrayRef = Arc::new(Int32Array::from_iter_values(20..22)); + writer.observe_batch(&batch3).unwrap(); + + let bytes = writer.finish().unwrap().expect("should produce bytes"); + + // Check schema metadata key/value format + assert_eq!(writer.schema_metadata_key(), "lance.seed.test_col"); + let meta_val = writer.schema_metadata_value(3); + assert_eq!(meta_val, "3:4"); + + // Deserialize and verify + let zones = ZoneMapSeedWriter::deserialize_seed(42, &bytes, rows_per_zone).unwrap(); + assert_eq!(zones.len(), 3, "expected 3 zones"); + + // Zone 0: values 0..4 -> min=0, max=3 + assert_eq!(zones[0].bound.fragment_id, 42); + assert_eq!(zones[0].bound.start, 
0); + assert_eq!(zones[0].min, ScalarValue::Int32(Some(0))); + assert_eq!(zones[0].max, ScalarValue::Int32(Some(3))); + assert_eq!(zones[0].null_count, 0); + + // Zone 1: values 10..14 -> min=10, max=13 + assert_eq!(zones[1].bound.start, 4); + assert_eq!(zones[1].min, ScalarValue::Int32(Some(10))); + assert_eq!(zones[1].max, ScalarValue::Int32(Some(13))); + + // Zone 2: values 20..22 -> min=20, max=21 + assert_eq!(zones[2].bound.start, 8); + assert_eq!(zones[2].min, ScalarValue::Int32(Some(20))); + assert_eq!(zones[2].max, ScalarValue::Int32(Some(21))); + } + + #[tokio::test] + async fn test_zone_map_seed_writer_spanning_batches() { + use crate::scalar::seed::IndexSeedWriter; + use crate::scalar::zonemap::ZoneMapSeedWriter; + use arrow_array::{ArrayRef, Int32Array}; + use datafusion_common::ScalarValue; + + let rows_per_zone = 5u64; + let data_type = DataType::Int32; + let writer = ZoneMapSeedWriter::new("val", rows_per_zone, data_type).unwrap(); + + // Single batch with 12 values -> should produce 2 complete zones + 1 partial + let batch: ArrayRef = Arc::new(Int32Array::from_iter_values(0..12)); + writer.observe_batch(&batch).unwrap(); + + let bytes = writer.finish().unwrap().expect("should produce bytes"); + let zones = ZoneMapSeedWriter::deserialize_seed(1, &bytes, rows_per_zone).unwrap(); + assert_eq!( + zones.len(), + 3, + "expected 3 zones from 12 rows with zone size 5" + ); + + // Zone 0: rows 0..5 + assert_eq!(zones[0].min, ScalarValue::Int32(Some(0))); + assert_eq!(zones[0].max, ScalarValue::Int32(Some(4))); + // Zone 1: rows 5..10 + assert_eq!(zones[1].min, ScalarValue::Int32(Some(5))); + assert_eq!(zones[1].max, ScalarValue::Int32(Some(9))); + // Zone 2: rows 10..12 (partial) + assert_eq!(zones[2].min, ScalarValue::Int32(Some(10))); + assert_eq!(zones[2].max, ScalarValue::Int32(Some(11))); + } + + #[tokio::test] + async fn test_zone_map_seed_writer_empty() { + use crate::scalar::seed::IndexSeedWriter; + use crate::scalar::zonemap::ZoneMapSeedWriter; + + 
let writer = ZoneMapSeedWriter::new("col", 8, DataType::Int32).unwrap(); + let result = writer.finish().unwrap(); + assert!(result.is_none(), "empty fragment should return None"); + } } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index ff0a119158c..953783e8a1c 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use arrow_array::RecordBatch; +use bytes::Bytes; use chrono::TimeDelta; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; @@ -389,6 +390,10 @@ pub struct WriteParams { /// When a pack file reaches this size, a new one is started. /// If not set, defaults to 1 GiB. pub blob_pack_file_size_threshold: Option, + + /// Seed writers that observe column data during fragment writes and embed + /// compact index summaries as global buffers in the data file footer. + pub seed_writers: Vec>, } impl Default for WriteParams { @@ -418,6 +423,7 @@ impl Default for WriteParams { allow_external_blob_outside_bases: false, external_blob_mode: ExternalBlobMode::Reference, blob_pack_file_size_threshold: None, + seed_writers: Vec::new(), } } } @@ -630,6 +636,15 @@ pub async fn do_write_fragments( } writer.as_mut().unwrap().write(&batch_chunk).await?; + // Observe batch data for seed writers + for seed_writer in &params.seed_writers { + let col_name = seed_writer.column_name(); + for batch in &batch_chunk { + if let Some(col) = batch.column_by_name(col_name) { + seed_writer.observe_batch(col)?; + } + } + } for batch in &batch_chunk { num_rows_in_current_file += batch.num_rows() as u32; } @@ -646,7 +661,9 @@ pub async fn do_write_fragments( if num_rows_in_current_file >= params.max_rows_per_file as u32 || writer.as_mut().unwrap().tell().await? 
>= params.max_bytes_per_file as u64 { - let (num_rows, data_file) = writer.take().unwrap().finish().await?; + let mut w = writer.take().unwrap(); + flush_seed_writers(w.as_mut(), &params.seed_writers).await?; + let (num_rows, data_file) = w.finish().await?; info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path); debug_assert_eq!(num_rows, num_rows_in_current_file); bytes_completed += data_file.file_size_bytes.get().map_or(0, |s| s.get()); @@ -682,6 +699,11 @@ pub async fn do_write_fragments( // Complete the final writer if let Some(mut writer) = writer.take() { + if let Err(e) = flush_seed_writers(writer.as_mut(), &params.seed_writers).await { + drop(writer); + cleanup_data_fragments(&object_store, base_dir, &fragments).await; + return Err(e); + } match writer.finish().await { Ok((num_rows, data_file)) => { info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path); @@ -710,6 +732,23 @@ pub async fn do_write_fragments( Ok(fragments) } +/// Flush all seed writers into the given file writer, embedding seed buffers +/// and schema metadata before `finish()` is called. +async fn flush_seed_writers( + writer: &mut dyn GenericWriter, + seed_writers: &[Arc], +) -> Result<()> { + for seed_writer in seed_writers { + if let Some(bytes) = seed_writer.finish()? { + let buf_index = writer.add_global_buffer(bytes).await?; + let key = seed_writer.schema_metadata_key(); + let value = seed_writer.schema_metadata_value(buf_index); + writer.add_schema_metadata(key, value); + } + } + Ok(()) +} + /// Best-effort cleanup of data files for fragments that were written but not committed. /// /// Contract: @@ -1116,6 +1155,16 @@ pub trait GenericWriter: Send { async fn tell(&mut self) -> Result; /// Finish writing the file (flush the remaining data and write footer) async fn finish(&mut self) -> Result<(u32, DataFile)>; + + /// Add a global buffer to the current file. Returns the 1-based buffer index. 
+ /// Must be called before `finish`. No-op on legacy (V1) files (returns `Ok(1)`). + async fn add_global_buffer(&mut self, _buffer: Bytes) -> Result { + Ok(1) + } + + /// Add a key-value pair to the file's schema metadata. + /// Must be called before `finish`. No-op on legacy (V1) files. + fn add_schema_metadata(&mut self, _key: String, _value: String) {} } struct V1WriterAdapter @@ -1213,6 +1262,14 @@ impl GenericWriter for V2WriterAdapter { ); Ok((write_summary.num_rows as u32, data_file)) } + + async fn add_global_buffer(&mut self, buffer: Bytes) -> Result { + self.writer.add_global_buffer(buffer).await + } + + fn add_schema_metadata(&mut self, key: String, value: String) { + self.writer.add_schema_metadata(key, value); + } } pub async fn open_writer( @@ -3579,4 +3636,115 @@ mod tests { "Local data file should be deleted by cleanup" ); } + + #[tokio::test] + async fn test_zone_map_seeds_used_during_update() { + use crate::Dataset; + use crate::index::DatasetIndexExt; + use crate::index::scalar::open_scalar_index; + use arrow::datatypes::Int32Type; + use lance_datagen::{BatchCount, RowCount}; + use lance_datagen::{array, gen_batch}; + use lance_file::reader::FileReaderOptions; + use lance_index::metrics::NoOpMetricsCollector; + use lance_index::scalar::zonemap::ZONEMAP_SEED_META_KEY_PREFIX; + use lance_index::{IndexType, scalar::ScalarIndexParams}; + use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; + use lance_io::utils::CachedFileSize; + + let tmpdir = lance_core::utils::tempfile::TempStrDir::default(); + let uri = tmpdir.as_str(); + + // Step 1: Create initial dataset + let reader = gen_batch() + .col("val", array::step::()) + .into_reader_rows(RowCount::from(100), BatchCount::from(1)); + let mut dataset = Dataset::write(reader, uri, None).await.unwrap(); + + // Step 2: Create a zone map index + let params = ScalarIndexParams::for_builtin(lance_index::scalar::BuiltinIndexType::ZoneMap); + dataset + .create_index(&["val"], IndexType::ZoneMap, None, 
&params, false) + .await + .unwrap(); + // Step 3: Append new data - seeds should be written automatically + let reader = gen_batch() + .col("val", array::step::()) + .into_reader_rows(RowCount::from(50), BatchCount::from(1)); + let dataset = Dataset::write( + reader, + uri, + Some(WriteParams { + mode: WriteMode::Append, + data_storage_version: Some(lance_file::version::LanceFileVersion::V2_1), + ..Default::default() + }), + ) + .await + .unwrap(); + + // Step 4: Verify that the newly appended fragment has a seed embedded + let fragments = dataset.fragments(); + let new_fragment = fragments.last().unwrap(); + let data_file = new_fragment.files.first().unwrap(); + + let scheduler = ScanScheduler::new( + dataset.object_store.clone(), + SchedulerConfig::max_bandwidth(&dataset.object_store), + ); + let path = dataset + .base + .clone() + .join(super::DATA_DIR) + .join(data_file.path.as_str()); + let file_scheduler = scheduler + .open_file(&path, &CachedFileSize::unknown()) + .await + .unwrap(); + let reader = lance_file::reader::FileReader::try_open( + file_scheduler, + None, + Default::default(), + &dataset.metadata_cache.file_metadata_cache(&path), + FileReaderOptions::default(), + ) + .await + .unwrap(); + + let meta_key = format!("{}val", ZONEMAP_SEED_META_KEY_PREFIX); + let has_seed = reader + .metadata() + .file_schema + .metadata + .contains_key(&meta_key); + assert!( + has_seed, + "Newly appended fragment should have a zone map seed in metadata" + ); + + // Step 5: Optimize the index (should use seeds) + let mut dataset = Dataset::open(uri).await.unwrap(); + dataset.optimize_indices(&Default::default()).await.unwrap(); + + // Step 6: Query the updated index to verify it's correct + let dataset = Dataset::open(uri).await.unwrap(); + let indices = dataset.load_indices().await.unwrap(); + assert!( + !indices.is_empty(), + "Dataset should still have an index after optimization" + ); + + // Verify the index is a ZoneMap and covers all fragments + let index = 
indices.iter().find(|i| i.name.contains("val")).unwrap(); + let scalar_index = open_scalar_index(&dataset, "val", index, &NoOpMetricsCollector) + .await + .unwrap(); + assert_eq!( + scalar_index.index_type(), + IndexType::ZoneMap, + "Index should still be a ZoneMap after optimization" + ); + let frags = scalar_index.calculate_included_frags().await.unwrap(); + assert_eq!(frags.len(), 2, "Index should cover both fragments"); + } } diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index bfd702c9c3b..636b8fee1ed 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -12,6 +12,7 @@ use lance_core::utils::tracing::{DATASET_WRITING_EVENT, TRACE_DATASET_EVENTS}; use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET}; use lance_datafusion::utils::StreamingWriteSource; use lance_file::version::LanceFileVersion; +use lance_index::scalar::seed::IndexSeedWriter; use lance_io::object_store::ObjectStore; use lance_table::feature_flags::can_write_dataset; use lance_table::format::Fragment; @@ -23,6 +24,7 @@ use crate::dataset::ReadParams; use crate::dataset::builder::DatasetBuilder; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; use crate::dataset::write::{validate_and_resolve_target_bases, write_fragments_internal}; +use crate::index::DatasetIndexExt; use crate::{Error, Result}; use tracing::info; @@ -200,6 +202,16 @@ impl<'a> InsertBuilder<'a> { self.validate_write(&mut context, &schema)?; + // Auto-configure zone map seed writers for append/create operations + if matches!( + context.params.mode, + super::WriteMode::Append | super::WriteMode::Create + ) && let Some(dataset) = context.dest.dataset() + { + let seed_writers = create_zone_map_seed_writers(dataset).await?; + context.params.seed_writers.extend(seed_writers); + } + let existing_base_paths = context.dest.dataset().map(|ds| &ds.manifest.base_paths); let target_base_info = validate_and_resolve_target_bases(&mut 
context.params, existing_base_paths).await?; @@ -428,6 +440,68 @@ impl<'a> InsertBuilder<'a> { } } +/// Create zone map seed writers for all zone-map-indexed columns in the dataset. +/// +/// Returns an empty vector if the dataset has no zone map indices. +async fn create_zone_map_seed_writers(dataset: &Dataset) -> Result>> { + use lance_index::metrics::NoOpMetricsCollector; + use lance_index::scalar::zonemap::ZoneMapIndex; + + let indices = dataset.load_indices().await?; + let mut writers: Vec> = Vec::new(); + + for index in indices.iter() { + // Only process scalar (non-vector) indices + if index.fields.len() != 1 { + continue; + } + let field_id = index.fields[0]; + let Ok(field_path) = dataset.schema().field_path(field_id) else { + continue; + }; + + // Try to open as a scalar index and downcast to ZoneMapIndex + let Ok(scalar_index) = crate::index::scalar::open_scalar_index( + dataset, + &field_path, + index, + &NoOpMetricsCollector, + ) + .await + else { + continue; + }; + + let Some(zone_map) = scalar_index.as_any().downcast_ref::() else { + continue; + }; + + let rows_per_zone = zone_map.rows_per_zone(); + // Get the column data type from the dataset schema + let Some(data_type) = dataset.schema().field(&field_path).map(|f| f.data_type()) else { + continue; + }; + + // Skip nested types - zone maps don't support them + if data_type.is_nested() { + continue; + } + + match lance_index::scalar::zonemap::ZoneMapSeedWriter::new( + field_path.clone(), + rows_per_zone, + data_type, + ) { + Ok(seed_writer) => { + writers.push(Arc::new(seed_writer)); + } + Err(_) => continue, + } + } + + Ok(writers) +} + #[derive(Debug)] struct WriteContext<'a> { params: WriteParams, diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index d3ecde030c1..df01f8c181f 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use futures::{FutureExt, TryStreamExt}; use lance_core::{Error, Result}; +use 
lance_file::reader::FileReaderOptions; use lance_index::{ INDEX_FILE_NAME, IndexType, metrics::NoOpMetricsCollector, @@ -12,9 +13,11 @@ use lance_index::{ progress::NoopIndexBuildProgress, scalar::{ CreatedIndex, OldIndexDataFilter, ScalarIndex, inverted::InvertedIndex, - lance_format::LanceIndexStore, + lance_format::LanceIndexStore, seed::FragmentSeed, zonemap::ZONEMAP_SEED_META_KEY_PREFIX, }, }; +use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; +use lance_io::utils::CachedFileSize; use lance_select::{RowAddrTreeMap, RowSetOps}; use lance_table::format::{Fragment, IndexMetadata}; use roaring::RoaringBitmap; @@ -175,6 +178,93 @@ pub async fn build_per_segment_filters( Ok((effective_union, filters)) } +/// Attempt to read zone map seeds for `fragments` from their data files. +/// +/// Returns `Some(vec)` if ALL fragments have a valid seed for `column_name` +/// with matching `rows_per_zone`; returns `None` if any fragment is missing a seed +/// or the seed parameters don't match. 
+async fn try_harvest_zonemap_seeds( + dataset: &Dataset, + fragments: &[Fragment], + column_name: &str, + rows_per_zone: u64, +) -> Result>> { + if fragments.is_empty() { + return Ok(Some(Vec::new())); + } + + let meta_key = format!("{}{}", ZONEMAP_SEED_META_KEY_PREFIX, column_name); + let mut seeds = Vec::with_capacity(fragments.len()); + + let scheduler = ScanScheduler::new( + dataset.object_store.clone(), + SchedulerConfig::max_bandwidth(&dataset.object_store), + ); + + for fragment in fragments { + // Find the primary data file (first file) + let Some(data_file) = fragment.files.first() else { + return Ok(None); + }; + + let path = dataset + .base + .clone() + .join(crate::dataset::DATA_DIR) + .join(data_file.path.as_str()); + let Ok(file_scheduler) = scheduler.open_file(&path, &CachedFileSize::unknown()).await + else { + return Ok(None); + }; + + let Ok(reader) = lance_file::reader::FileReader::try_open( + file_scheduler, + None, + Default::default(), + &dataset.metadata_cache.file_metadata_cache(&path), + FileReaderOptions::default(), + ) + .await + else { + return Ok(None); + }; + + // Look for the seed metadata key + let Some(meta_value) = reader.metadata().file_schema.metadata.get(&meta_key).cloned() + else { + return Ok(None); + }; + + // Parse "{buf_index}:{rows_per_zone_from_file}" + let parts: Vec<&str> = meta_value.splitn(2, ':').collect(); + if parts.len() != 2 { + return Ok(None); + } + let Ok(buf_index) = parts[0].parse::() else { + return Ok(None); + }; + let Ok(rows_per_zone_from_file) = parts[1].parse::() else { + return Ok(None); + }; + + if rows_per_zone_from_file != rows_per_zone { + return Ok(None); + } + + // Read the global buffer (the index returned by add_global_buffer is used directly) + let Ok(bytes) = reader.read_global_buffer(buf_index).await else { + return Ok(None); + }; + + seeds.push(FragmentSeed { + fragment_id: fragment.id, + bytes, + }); + } + + Ok(Some(seeds)) +} + async fn load_unindexed_training_data( dataset: &Dataset, 
field_path: &str, @@ -375,6 +465,41 @@ async fn merge_scalar_indices<'a>( ) .await? } + IndexType::ZoneMap => { + // Try to accelerate the update using pre-harvested seed buffers. + let zone_map = reference_index + .as_any() + .downcast_ref::(); + let maybe_created = if let Some(zone_map) = zone_map { + let rpz = zone_map.rows_per_zone(); + match try_harvest_zonemap_seeds(dataset.as_ref(), unindexed, column_name, rpz) + .await? + { + Some(seeds) => { + reference_index + .try_update_with_seeds(&seeds, &new_store) + .await? + } + None => None, + } + } else { + None + }; + + if let Some(created) = maybe_created { + created + } else { + let old_data_filter = build_old_data_filter( + dataset.as_ref(), + &effective_old_frags, + &deleted_old_frags, + ) + .await?; + reference_index + .update(new_data_stream, &new_store, old_data_filter) + .await? + } + } // NOTE: IndexType::Inverted never reaches here -- it is handled by the // dedicated arm in merge_indices_with_unindexed_frags before this // function is called. From a54ecc723fedc86396dfb79819285e83e10d771a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 24 Jun 2026 13:36:45 +0000 Subject: [PATCH 02/10] refactor(index): remove seed_writers from WriteParams; use &mut self on IndexSeedWriter Move seed writer creation out of WriteParams (a config struct shouldn't hold mutable stateful objects) into write_fragments_internal, passing them as an explicit parameter to do_write_fragments. Switch IndexSeedWriter trait methods to &mut self, eliminating the need for Arc + Mutex. 
Co-Authored-By: Claude Sonnet 4.6 --- rust/lance-index/src/scalar/seed.rs | 8 +- rust/lance-index/src/scalar/zonemap.rs | 96 ++++++++++-------------- rust/lance/src/dataset/fragment/write.rs | 1 + rust/lance/src/dataset/write.rs | 96 ++++++++++++++++++++---- rust/lance/src/dataset/write/insert.rs | 74 ------------------ rust/lance/src/index/append.rs | 7 +- 6 files changed, 132 insertions(+), 150 deletions(-) diff --git a/rust/lance-index/src/scalar/seed.rs b/rust/lance-index/src/scalar/seed.rs index f76556ff43e..29b64d651d1 100644 --- a/rust/lance-index/src/scalar/seed.rs +++ b/rust/lance-index/src/scalar/seed.rs @@ -20,17 +20,17 @@ use lance_core::Result; /// /// The buffer can later be read back during index updates to reconstruct index /// statistics without re-scanning the column data. -pub trait IndexSeedWriter: Send + Sync + std::fmt::Debug { +pub trait IndexSeedWriter: Send + std::fmt::Debug { /// The column this writer is interested in. fn column_name(&self) -> &str; /// Observe a slice of column values as they are written to the current fragment. - /// Called once per batch. Uses interior mutability. - fn observe_batch(&self, values: &ArrayRef) -> Result<()>; + /// Called once per batch. + fn observe_batch(&mut self, values: &ArrayRef) -> Result<()>; /// Serialize accumulated state to bytes and reset for the next fragment. /// Returns `None` if no data was observed (empty fragment). - fn finish(&self) -> Result>; + fn finish(&mut self) -> Result>; /// Schema metadata key used to record that a seed buffer was written. /// Convention: `"lance.seed."`. 
diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 26a97dc5504..8cdceab737b 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -26,7 +26,7 @@ use crate::scalar::{ use lance_arrow_stats::StatisticsAccumulator; use lance_core::cache::{LanceCache, WeakLanceCache}; use serde::{Deserialize, Serialize}; -use std::sync::{LazyLock, Mutex}; +use std::sync::LazyLock; use arrow_array::{ ArrayRef, RecordBatch, UInt32Array, UInt64Array, new_empty_array, new_null_array, @@ -863,6 +863,7 @@ impl ZoneMapIndexBuilder { /// Index-specific processor that computes min/max statistics for each zone while the /// trainer takes care of chunking and fragment boundaries. +#[derive(Debug)] struct ZoneMapProcessor { data_type: DataType, statistics: StatisticsAccumulator, @@ -1077,13 +1078,6 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { /// Metadata key prefix for zone map seed buffers. pub const ZONEMAP_SEED_META_KEY_PREFIX: &str = "lance.seed."; -struct ZoneMapSeedWriterInner { - completed_zones: Vec, - processor: ZoneMapProcessor, - rows_in_current_zone: u64, - next_zone_start: u64, -} - /// A seed writer that observes column values during data file writes and /// accumulates zone map statistics for later harvest during index updates. 
/// @@ -1094,7 +1088,10 @@ pub struct ZoneMapSeedWriter { column_name: String, rows_per_zone: u64, data_type: DataType, - inner: Mutex, + completed_zones: Vec, + processor: ZoneMapProcessor, + rows_in_current_zone: u64, + next_zone_start: u64, } impl ZoneMapSeedWriter { @@ -1114,12 +1111,10 @@ impl ZoneMapSeedWriter { column_name: column_name.into(), rows_per_zone, data_type, - inner: Mutex::new(ZoneMapSeedWriterInner { - completed_zones: Vec::new(), - processor, - rows_in_current_zone: 0, - next_zone_start: 0, - }), + completed_zones: Vec::new(), + processor, + rows_in_current_zone: 0, + next_zone_start: 0, }) } @@ -1233,76 +1228,61 @@ impl ZoneMapSeedWriter { } } -impl std::fmt::Debug for ZoneMapSeedWriterInner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ZoneMapSeedWriterInner") - .field("completed_zones", &self.completed_zones.len()) - .field("rows_in_current_zone", &self.rows_in_current_zone) - .field("next_zone_start", &self.next_zone_start) - .finish() - } -} - impl IndexSeedWriter for ZoneMapSeedWriter { fn column_name(&self) -> &str { &self.column_name } - fn observe_batch(&self, values: &ArrayRef) -> lance_core::Result<()> { - let mut inner = self.inner.lock().unwrap(); + fn observe_batch(&mut self, values: &ArrayRef) -> lance_core::Result<()> { let mut offset = 0usize; - let total_rows = values.len() as u64; while offset < values.len() { - let remaining_in_zone = self.rows_per_zone - inner.rows_in_current_zone; + let remaining_in_zone = self.rows_per_zone - self.rows_in_current_zone; let chunk_len = ((values.len() as u64 - offset as u64).min(remaining_in_zone)) as usize; let chunk = values.slice(offset, chunk_len); - inner.processor.process_chunk(&chunk)?; - inner.rows_in_current_zone += chunk_len as u64; + self.processor.process_chunk(&chunk)?; + self.rows_in_current_zone += chunk_len as u64; offset += chunk_len; - if inner.rows_in_current_zone >= self.rows_per_zone { + if self.rows_in_current_zone >= 
self.rows_per_zone { let bound = ZoneBound { fragment_id: 0, - start: inner.next_zone_start, + start: self.next_zone_start, length: self.rows_per_zone as usize, }; - let stats = inner.processor.finish_zone(bound)?; - inner.processor.reset()?; - inner.completed_zones.push(stats); - inner.next_zone_start += self.rows_per_zone; - inner.rows_in_current_zone = 0; + let stats = self.processor.finish_zone(bound)?; + self.processor.reset()?; + self.completed_zones.push(stats); + self.next_zone_start += self.rows_per_zone; + self.rows_in_current_zone = 0; } } - let _ = total_rows; // suppress unused warning Ok(()) } - fn finish(&self) -> lance_core::Result> { + fn finish(&mut self) -> lance_core::Result> { use arrow_ipc::writer::FileWriter; use std::io::Cursor; - let mut inner = self.inner.lock().unwrap(); - // Flush partial final zone - if inner.rows_in_current_zone > 0 { + if self.rows_in_current_zone > 0 { let bound = ZoneBound { fragment_id: 0, - start: inner.next_zone_start, - length: inner.rows_in_current_zone as usize, + start: self.next_zone_start, + length: self.rows_in_current_zone as usize, }; - let stats = inner.processor.finish_zone(bound)?; - inner.processor.reset()?; - inner.completed_zones.push(stats); - inner.next_zone_start += inner.rows_in_current_zone; - inner.rows_in_current_zone = 0; + let stats = self.processor.finish_zone(bound)?; + self.processor.reset()?; + self.completed_zones.push(stats); + self.next_zone_start += self.rows_in_current_zone; + self.rows_in_current_zone = 0; } - if inner.completed_zones.is_empty() { + if self.completed_zones.is_empty() { return Ok(None); } - let batch = Self::seed_batch_from_zones(&inner.completed_zones, &self.data_type)?; + let batch = Self::seed_batch_from_zones(&self.completed_zones, &self.data_type)?; // Serialize to Arrow IPC let mut buf = Cursor::new(Vec::new()); @@ -1319,9 +1299,9 @@ impl IndexSeedWriter for ZoneMapSeedWriter { } // Reset state for next fragment - inner.completed_zones.clear(); - 
inner.next_zone_start = 0; - inner.processor = ZoneMapProcessor::new(self.data_type.clone())?; + self.completed_zones.clear(); + self.next_zone_start = 0; + self.processor = ZoneMapProcessor::new(self.data_type.clone())?; Ok(Some(bytes::Bytes::from(buf.into_inner()))) } @@ -2937,7 +2917,7 @@ mod tests { let rows_per_zone = 4u64; let data_type = DataType::Int32; - let writer = ZoneMapSeedWriter::new("test_col", rows_per_zone, data_type).unwrap(); + let mut writer = ZoneMapSeedWriter::new("test_col", rows_per_zone, data_type).unwrap(); // Batch 1: values 0..4 (fills exactly one zone) let batch1: ArrayRef = Arc::new(Int32Array::from_iter_values(0..4)); @@ -2989,7 +2969,7 @@ mod tests { let rows_per_zone = 5u64; let data_type = DataType::Int32; - let writer = ZoneMapSeedWriter::new("val", rows_per_zone, data_type).unwrap(); + let mut writer = ZoneMapSeedWriter::new("val", rows_per_zone, data_type).unwrap(); // Single batch with 12 values -> should produce 2 complete zones + 1 partial let batch: ArrayRef = Arc::new(Int32Array::from_iter_values(0..12)); @@ -3019,7 +2999,7 @@ mod tests { use crate::scalar::seed::IndexSeedWriter; use crate::scalar::zonemap::ZoneMapSeedWriter; - let writer = ZoneMapSeedWriter::new("col", 8, DataType::Int32).unwrap(); + let mut writer = ZoneMapSeedWriter::new("col", 8, DataType::Int32).unwrap(); let result = writer.finish().unwrap(); assert!(result.is_none(), "empty fragment should return None"); } diff --git a/rust/lance/src/dataset/fragment/write.rs b/rust/lance/src/dataset/fragment/write.rs index 9731be0c0eb..1f2b00e7630 100644 --- a/rust/lance/src/dataset/fragment/write.rs +++ b/rust/lance/src/dataset/fragment/write.rs @@ -235,6 +235,7 @@ impl<'a> FragmentCreateBuilder<'a> { params, version, target_bases_info, + Vec::new(), ) .await } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 953783e8a1c..ccc490b3a56 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -390,10 
+390,6 @@ pub struct WriteParams { /// When a pack file reaches this size, a new one is started. /// If not set, defaults to 1 GiB. pub blob_pack_file_size_threshold: Option, - - /// Seed writers that observe column data during fragment writes and embed - /// compact index summaries as global buffers in the data file footer. - pub seed_writers: Vec>, } impl Default for WriteParams { @@ -423,7 +419,6 @@ impl Default for WriteParams { allow_external_blob_outside_bases: false, external_blob_mode: ExternalBlobMode::Reference, blob_pack_file_size_threshold: None, - seed_writers: Vec::new(), } } } @@ -573,6 +568,7 @@ pub async fn do_write_fragments( params: WriteParams, storage_version: LanceFileVersion, target_bases_info: Option>, + mut seed_writers: Vec>, ) -> Result> { let adapter = SchemaAdapter::new(data.schema()); let data = adapter.to_physical_stream(data); @@ -636,11 +632,10 @@ pub async fn do_write_fragments( } writer.as_mut().unwrap().write(&batch_chunk).await?; - // Observe batch data for seed writers - for seed_writer in ¶ms.seed_writers { - let col_name = seed_writer.column_name(); + for seed_writer in seed_writers.iter_mut() { + let col_name = seed_writer.column_name().to_owned(); for batch in &batch_chunk { - if let Some(col) = batch.column_by_name(col_name) { + if let Some(col) = batch.column_by_name(&col_name) { seed_writer.observe_batch(col)?; } } @@ -662,7 +657,7 @@ pub async fn do_write_fragments( || writer.as_mut().unwrap().tell().await? 
>= params.max_bytes_per_file as u64 { let mut w = writer.take().unwrap(); - flush_seed_writers(w.as_mut(), ¶ms.seed_writers).await?; + flush_seed_writers(w.as_mut(), &mut seed_writers).await?; let (num_rows, data_file) = w.finish().await?; info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DATA, path = &data_file.path); debug_assert_eq!(num_rows, num_rows_in_current_file); @@ -699,7 +694,7 @@ pub async fn do_write_fragments( // Complete the final writer if let Some(mut writer) = writer.take() { - if let Err(e) = flush_seed_writers(writer.as_mut(), ¶ms.seed_writers).await { + if let Err(e) = flush_seed_writers(writer.as_mut(), &mut seed_writers).await { drop(writer); cleanup_data_fragments(&object_store, base_dir, &fragments).await; return Err(e); @@ -736,9 +731,9 @@ pub async fn do_write_fragments( /// and schema metadata before `finish()` is called. async fn flush_seed_writers( writer: &mut dyn GenericWriter, - seed_writers: &[Arc], + seed_writers: &mut [Box], ) -> Result<()> { - for seed_writer in seed_writers { + for seed_writer in seed_writers.iter_mut() { if let Some(bytes) = seed_writer.finish()? 
{ let buf_index = writer.add_global_buffer(bytes).await?; let key = seed_writer.schema_metadata_key(); @@ -1116,6 +1111,8 @@ pub async fn write_fragments_internal( ))); } + let seed_writers = create_zone_map_seed_writers(dataset, ¶ms, storage_version).await?; + let fragments = do_write_fragments( dataset, object_store, @@ -1125,12 +1122,83 @@ pub async fn write_fragments_internal( params, storage_version, target_bases_info, + seed_writers, ) .await?; Ok((fragments, schema)) } +async fn create_zone_map_seed_writers( + dataset: Option<&Dataset>, + params: &WriteParams, + storage_version: LanceFileVersion, +) -> Result>> { + use crate::index::DatasetIndexExt; + use lance_index::metrics::NoOpMetricsCollector; + use lance_index::scalar::zonemap::ZoneMapIndex; + use lance_table::format::IndexMetadata; + + // Seeds only make sense when appending to an existing dataset with V2 files. + if storage_version == LanceFileVersion::Legacy { + return Ok(Vec::new()); + } + if !matches!(params.mode, WriteMode::Append) { + return Ok(Vec::new()); + } + let Some(dataset) = dataset else { + return Ok(Vec::new()); + }; + + let indices: Arc> = dataset.load_indices().await?; + let mut writers: Vec> = Vec::new(); + + for index in indices.iter() { + if index.fields.len() != 1 { + continue; + } + let field_id = index.fields[0]; + let Ok(field_path) = dataset.schema().field_path(field_id) else { + continue; + }; + + let Ok(scalar_index) = crate::index::scalar::open_scalar_index( + dataset, + &field_path, + index, + &NoOpMetricsCollector, + ) + .await + else { + continue; + }; + + let Some(zone_map) = scalar_index.as_any().downcast_ref::() else { + continue; + }; + + let rows_per_zone = zone_map.rows_per_zone(); + let Some(data_type) = dataset.schema().field(&field_path).map(|f| f.data_type()) else { + continue; + }; + + if data_type.is_nested() { + continue; + } + + match lance_index::scalar::zonemap::ZoneMapSeedWriter::new( + field_path.clone(), + rows_per_zone, + data_type, + ) { + 
Ok(seed_writer) => writers.push(Box::new(seed_writer)), + Err(_) => continue, + } + } + + Ok(writers) +} + fn legacy_blob_field_path(schema: &Schema) -> Option { schema .fields_pre_order() @@ -3519,6 +3587,7 @@ mod tests { WriteParams::default(), LanceFileVersion::V2_1, None, + Vec::new(), ) .await; @@ -3579,6 +3648,7 @@ mod tests { }, LanceFileVersion::V2_1, None, + Vec::new(), ) .await; diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index 636b8fee1ed..bfd702c9c3b 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -12,7 +12,6 @@ use lance_core::utils::tracing::{DATASET_WRITING_EVENT, TRACE_DATASET_EVENTS}; use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET}; use lance_datafusion::utils::StreamingWriteSource; use lance_file::version::LanceFileVersion; -use lance_index::scalar::seed::IndexSeedWriter; use lance_io::object_store::ObjectStore; use lance_table::feature_flags::can_write_dataset; use lance_table::format::Fragment; @@ -24,7 +23,6 @@ use crate::dataset::ReadParams; use crate::dataset::builder::DatasetBuilder; use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; use crate::dataset::write::{validate_and_resolve_target_bases, write_fragments_internal}; -use crate::index::DatasetIndexExt; use crate::{Error, Result}; use tracing::info; @@ -202,16 +200,6 @@ impl<'a> InsertBuilder<'a> { self.validate_write(&mut context, &schema)?; - // Auto-configure zone map seed writers for append/create operations - if matches!( - context.params.mode, - super::WriteMode::Append | super::WriteMode::Create - ) && let Some(dataset) = context.dest.dataset() - { - let seed_writers = create_zone_map_seed_writers(dataset).await?; - context.params.seed_writers.extend(seed_writers); - } - let existing_base_paths = context.dest.dataset().map(|ds| &ds.manifest.base_paths); let target_base_info = validate_and_resolve_target_bases(&mut context.params, 
existing_base_paths).await?; @@ -440,68 +428,6 @@ impl<'a> InsertBuilder<'a> { } } -/// Create zone map seed writers for all zone-map-indexed columns in the dataset. -/// -/// Returns an empty vector if the dataset has no zone map indices. -async fn create_zone_map_seed_writers(dataset: &Dataset) -> Result>> { - use lance_index::metrics::NoOpMetricsCollector; - use lance_index::scalar::zonemap::ZoneMapIndex; - - let indices = dataset.load_indices().await?; - let mut writers: Vec> = Vec::new(); - - for index in indices.iter() { - // Only process scalar (non-vector) indices - if index.fields.len() != 1 { - continue; - } - let field_id = index.fields[0]; - let Ok(field_path) = dataset.schema().field_path(field_id) else { - continue; - }; - - // Try to open as a scalar index and downcast to ZoneMapIndex - let Ok(scalar_index) = crate::index::scalar::open_scalar_index( - dataset, - &field_path, - index, - &NoOpMetricsCollector, - ) - .await - else { - continue; - }; - - let Some(zone_map) = scalar_index.as_any().downcast_ref::() else { - continue; - }; - - let rows_per_zone = zone_map.rows_per_zone(); - // Get the column data type from the dataset schema - let Some(data_type) = dataset.schema().field(&field_path).map(|f| f.data_type()) else { - continue; - }; - - // Skip nested types - zone maps don't support them - if data_type.is_nested() { - continue; - } - - match lance_index::scalar::zonemap::ZoneMapSeedWriter::new( - field_path.clone(), - rows_per_zone, - data_type, - ) { - Ok(seed_writer) => { - writers.push(Arc::new(seed_writer)); - } - Err(_) => continue, - } - } - - Ok(writers) -} - #[derive(Debug)] struct WriteContext<'a> { params: WriteParams, diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index df01f8c181f..e107fd49d67 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -230,7 +230,12 @@ async fn try_harvest_zonemap_seeds( }; // Look for the seed metadata key - let Some(meta_value) = 
reader.metadata().file_schema.metadata.get(&meta_key).cloned() + let Some(meta_value) = reader + .metadata() + .file_schema + .metadata + .get(&meta_key) + .cloned() else { return Ok(None); }; From 7b032cb87c7401e70593f75dee1e9661ec9ff36e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 24 Jun 2026 15:35:51 +0000 Subject: [PATCH 03/10] refactor(index): make seed writer creation a generic ScalarIndexPlugin capability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `create_seed_writer` to `ScalarIndexPlugin` with a default `Ok(None)` implementation so any index type can opt in. Implement it on `ZoneMapIndexPlugin` by reading `rows_per_zone` from the index file metadata — no full index load required. Replace the hardcoded `create_zone_map_seed_writers` in the write path with a generic `create_seed_writers` that iterates all single-field indices, resolves the plugin via `IndexDetails::get_plugin`, and delegates to `plugin.create_seed_writer`. Also move `ZONEMAP_SEED_META_KEY_PREFIX` to `seed.rs` as the shared `SEED_META_KEY_PREFIX`. 
Co-Authored-By: Claude Sonnet 4.6 --- rust/lance-index/src/scalar/registry.rs | 22 ++++++++++- rust/lance-index/src/scalar/seed.rs | 9 ++++- rust/lance-index/src/scalar/zonemap.rs | 34 +++++++++++++++-- rust/lance/src/dataset/write.rs | 49 ++++++++++--------------- rust/lance/src/index/append.rs | 8 ++-- 5 files changed, 83 insertions(+), 39 deletions(-) diff --git a/rust/lance-index/src/scalar/registry.rs b/rust/lance-index/src/scalar/registry.rs index 0add98d8ab3..91655df37b8 100644 --- a/rust/lance-index/src/scalar/registry.rs +++ b/rust/lance-index/src/scalar/registry.rs @@ -4,7 +4,7 @@ use std::borrow::Cow; use std::sync::Arc; -use arrow_schema::Field; +use arrow_schema::{DataType, Field}; use async_trait::async_trait; use datafusion::execution::SendableRecordBatchStream; use lance_core::{ @@ -217,6 +217,26 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { // Return an empty JSON object as the default implementation Ok(serde_json::json!({})) } + + /// Optionally create a seed writer for the given column. + /// + /// A seed writer observes column values during data file writes, accumulates + /// compact statistics in memory, and serializes them as a global buffer + /// embedded in the data file footer. The buffer is later harvested during + /// index updates to skip a full column scan. + /// + /// `index_store` gives access to the existing index files so the plugin + /// can read parameters it needs (e.g. zone size) without loading the full + /// index. Return `Ok(None)` if this index type does not support seed writing. + async fn create_seed_writer( + &self, + _field_path: &str, + _data_type: &DataType, + _index_store: Arc, + _index_details: &prost_types::Any, + ) -> Result>> { + Ok(None) + } } /// In-memory cache key for a whole `Arc`. 
diff --git a/rust/lance-index/src/scalar/seed.rs b/rust/lance-index/src/scalar/seed.rs index 29b64d651d1..5101418e859 100644 --- a/rust/lance-index/src/scalar/seed.rs +++ b/rust/lance-index/src/scalar/seed.rs @@ -14,6 +14,9 @@ use arrow_array::ArrayRef; use bytes::Bytes; use lance_core::Result; +/// Schema metadata key prefix for all seed buffers: `"lance.seed."`. +pub const SEED_META_KEY_PREFIX: &str = "lance.seed."; + /// A hook registered during data file writes that observes column values batch /// by batch, accumulates compact statistics in memory, and serializes them to /// a byte buffer that is embedded in the data file footer as a global buffer. @@ -36,8 +39,10 @@ pub trait IndexSeedWriter: Send + std::fmt::Debug { /// Convention: `"lance.seed."`. fn schema_metadata_key(&self) -> String; - /// Schema metadata value given the 1-based buffer index assigned by the writer. - /// Should encode all information needed to re-read the seed (e.g. rows_per_zone). + /// Create a string to store in the file's schema metadata. This will normally + /// contain the buffer index (provided by the caller after `add_global_buffer`) + /// as well as any other information needed to validate or understand the seed + /// (e.g. `rows_per_zone` for zone map seeds). fn schema_metadata_value(&self, buf_index: u32) -> String; } diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 8cdceab737b..fd24fff4855 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -1073,10 +1073,32 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { ) -> Result> { Ok(ZoneMapIndex::load(index_store, frag_reuse_index, cache).await? as Arc) } -} -/// Metadata key prefix for zone map seed buffers. 
-pub const ZONEMAP_SEED_META_KEY_PREFIX: &str = "lance.seed."; + async fn create_seed_writer( + &self, + field_path: &str, + data_type: &DataType, + index_store: Arc, + _index_details: &prost_types::Any, + ) -> Result>> { + if data_type.is_nested() { + return Ok(None); + } + let Ok(index_file) = index_store.open_index_file(ZONEMAP_FILENAME).await else { + return Ok(None); + }; + let rows_per_zone: u64 = index_file + .schema() + .metadata + .get(ZONEMAP_SIZE_META_KEY) + .and_then(|s| s.parse().ok()) + .unwrap_or(ROWS_PER_ZONE_DEFAULT); + match ZoneMapSeedWriter::new(field_path, rows_per_zone, data_type.clone()) { + Ok(writer) => Ok(Some(Box::new(writer))), + Err(_) => Ok(None), + } + } +} /// A seed writer that observes column values during data file writes and /// accumulates zone map statistics for later harvest during index updates. @@ -1307,7 +1329,11 @@ impl IndexSeedWriter for ZoneMapSeedWriter { } fn schema_metadata_key(&self) -> String { - format!("{}{}", ZONEMAP_SEED_META_KEY_PREFIX, self.column_name) + format!( + "{}{}", + crate::scalar::seed::SEED_META_KEY_PREFIX, + self.column_name + ) } fn schema_metadata_value(&self, buf_index: u32) -> String { diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index ccc490b3a56..bd922ce6db0 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1111,7 +1111,7 @@ pub async fn write_fragments_internal( ))); } - let seed_writers = create_zone_map_seed_writers(dataset, ¶ms, storage_version).await?; + let seed_writers = create_seed_writers(dataset, ¶ms, storage_version).await?; let fragments = do_write_fragments( dataset, @@ -1129,14 +1129,15 @@ pub async fn write_fragments_internal( Ok((fragments, schema)) } -async fn create_zone_map_seed_writers( +async fn create_seed_writers( dataset: Option<&Dataset>, params: &WriteParams, storage_version: LanceFileVersion, ) -> Result>> { + use crate::dataset::index::LanceIndexStoreExt; use crate::index::DatasetIndexExt; - 
use lance_index::metrics::NoOpMetricsCollector; - use lance_index::scalar::zonemap::ZoneMapIndex; + use crate::index::scalar::{IndexDetails, fetch_index_details}; + use lance_index::scalar::lance_format::LanceIndexStore; use lance_table::format::IndexMetadata; // Seeds only make sense when appending to an existing dataset with V2 files. @@ -1161,38 +1162,28 @@ async fn create_zone_map_seed_writers( let Ok(field_path) = dataset.schema().field_path(field_id) else { continue; }; - - let Ok(scalar_index) = crate::index::scalar::open_scalar_index( - dataset, - &field_path, - index, - &NoOpMetricsCollector, - ) - .await - else { + let Some(data_type) = dataset.schema().field(&field_path).map(|f| f.data_type()) else { continue; }; - let Some(zone_map) = scalar_index.as_any().downcast_ref::() else { + let Ok(index_details) = fetch_index_details(dataset, &field_path, index).await else { continue; }; - - let rows_per_zone = zone_map.rows_per_zone(); - let Some(data_type) = dataset.schema().field(&field_path).map(|f| f.data_type()) else { + let details = IndexDetails(index_details.clone()); + let Ok(plugin) = details.get_plugin() else { continue; }; - - if data_type.is_nested() { + let Ok(index_store) = LanceIndexStore::from_dataset_for_existing(dataset, index).await + else { continue; - } + }; + let index_store: Arc = Arc::new(index_store); - match lance_index::scalar::zonemap::ZoneMapSeedWriter::new( - field_path.clone(), - rows_per_zone, - data_type, - ) { - Ok(seed_writer) => writers.push(Box::new(seed_writer)), - Err(_) => continue, + if let Ok(Some(writer)) = plugin + .create_seed_writer(&field_path, &data_type, index_store, &index_details) + .await + { + writers.push(writer); } } @@ -3717,7 +3708,7 @@ mod tests { use lance_datagen::{array, gen_batch}; use lance_file::reader::FileReaderOptions; use lance_index::metrics::NoOpMetricsCollector; - use lance_index::scalar::zonemap::ZONEMAP_SEED_META_KEY_PREFIX; + use lance_index::scalar::seed::SEED_META_KEY_PREFIX; use 
lance_index::{IndexType, scalar::ScalarIndexParams}; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; use lance_io::utils::CachedFileSize; @@ -3781,7 +3772,7 @@ mod tests { .await .unwrap(); - let meta_key = format!("{}val", ZONEMAP_SEED_META_KEY_PREFIX); + let meta_key = format!("{}val", SEED_META_KEY_PREFIX); let has_seed = reader .metadata() .file_schema diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index e107fd49d67..172bc981999 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -12,8 +12,10 @@ use lance_index::{ optimize::OptimizeOptions, progress::NoopIndexBuildProgress, scalar::{ - CreatedIndex, OldIndexDataFilter, ScalarIndex, inverted::InvertedIndex, - lance_format::LanceIndexStore, seed::FragmentSeed, zonemap::ZONEMAP_SEED_META_KEY_PREFIX, + CreatedIndex, OldIndexDataFilter, ScalarIndex, + inverted::InvertedIndex, + lance_format::LanceIndexStore, + seed::{FragmentSeed, SEED_META_KEY_PREFIX}, }, }; use lance_io::scheduler::{ScanScheduler, SchedulerConfig}; @@ -193,7 +195,7 @@ async fn try_harvest_zonemap_seeds( return Ok(Some(Vec::new())); } - let meta_key = format!("{}{}", ZONEMAP_SEED_META_KEY_PREFIX, column_name); + let meta_key = format!("{}{}", SEED_META_KEY_PREFIX, column_name); let mut seeds = Vec::with_capacity(fragments.len()); let scheduler = ScanScheduler::new( From 790ea89581f4b2aa131c9b5a4fad4ffe24dbf39d Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 24 Jun 2026 16:03:36 +0000 Subject: [PATCH 04/10] refactor(index): store rows_per_zone in ZoneMapIndexDetails; remove I/O from create_seed_writer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `optional uint64 rows_per_zone` to the `ZoneMapIndexDetails` proto and populate it at all four CreatedIndex emission sites. This lets `ZoneMapIndexPlugin::create_seed_writer` read rows_per_zone directly from the decoded proto with no file I/O. 
Old datasets that predate the field fall back to ROWS_PER_ZONE_DEFAULT via the optional/None path. Also remove the `index_store` parameter from `ScalarIndexPlugin::create_seed_writer` and document that the method must not perform I/O — all needed parameters must come from `index_details`. Co-Authored-By: Claude Sonnet 4.6 --- protos/index_old.proto | 7 ++++- rust/lance-index/src/scalar/registry.rs | 7 +++-- rust/lance-index/src/scalar/zonemap.rs | 34 ++++++++++++------------- rust/lance/src/dataset/write.rs | 10 +------- 4 files changed, 27 insertions(+), 31 deletions(-) diff --git a/protos/index_old.proto b/protos/index_old.proto index 601aa2681da..ce78601855b 100644 --- a/protos/index_old.proto +++ b/protos/index_old.proto @@ -24,7 +24,12 @@ message BTreeIndexDetails {} message BitmapIndexDetails {} message LabelListIndexDetails {} message NGramIndexDetails {} -message ZoneMapIndexDetails {} +message ZoneMapIndexDetails { + // Number of rows per zone. Optional for backwards compatibility: absent on + // datasets written before this field was added; callers should fall back to + // the index-default when not present. + optional uint64 rows_per_zone = 1; +} message InvertedIndexDetails { // Marking this field as optional as old versions of the index store blank details and we // need to make sure we have a proper optional field to detect this. diff --git a/rust/lance-index/src/scalar/registry.rs b/rust/lance-index/src/scalar/registry.rs index 91655df37b8..0032d10f2d0 100644 --- a/rust/lance-index/src/scalar/registry.rs +++ b/rust/lance-index/src/scalar/registry.rs @@ -225,14 +225,13 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { /// embedded in the data file footer. The buffer is later harvested during /// index updates to skip a full column scan. /// - /// `index_store` gives access to the existing index files so the plugin - /// can read parameters it needs (e.g. zone size) without loading the full - /// index. 
Return `Ok(None)` if this index type does not support seed writing. + /// All parameters needed to construct the writer must be derivable from + /// `index_details` — this method must not perform any I/O. Return `Ok(None)` + /// if this index type does not support seed writing. async fn create_seed_writer( &self, _field_path: &str, _data_type: &DataType, - _index_store: Arc, _index_details: &prost_types::Any, ) -> Result>> { Ok(None) diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index fd24fff4855..24dda925e45 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -632,8 +632,7 @@ impl ScalarIndex for ZoneMapIndex { let file = builder.write_index(dest_store).await?; Ok(CreatedIndex { - index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()) - .unwrap(), + index_details: make_zone_map_index_details(self.rows_per_zone), index_version: ZONEMAP_INDEX_VERSION, files: vec![file], }) @@ -674,8 +673,7 @@ impl ScalarIndex for ZoneMapIndex { let file = builder.write_index(dest_store).await?; Ok(Some(CreatedIndex { - index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()) - .unwrap(), + index_details: make_zone_map_index_details(self.rows_per_zone), index_version: ZONEMAP_INDEX_VERSION, files: vec![file], })) @@ -727,7 +725,7 @@ pub async fn merge_zonemap_indices( builder.write_index(dest_store).await?; Ok(CreatedIndex { - index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()).unwrap(), + index_details: make_zone_map_index_details(rows_per_zone), index_version: ZONEMAP_INDEX_VERSION, files: dest_store.list_files_with_sizes().await?, }) @@ -955,6 +953,13 @@ impl ZoneProcessor for ZoneMapProcessor { } } +fn make_zone_map_index_details(rows_per_zone: u64) -> prost_types::Any { + prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails { + rows_per_zone: Some(rows_per_zone), + }) + .unwrap() +} + #[derive(Debug, 
Default)] pub struct ZoneMapIndexPlugin; @@ -1055,10 +1060,10 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { "must provide training request created by new_training_request".into(), ) })?; + let rows_per_zone = request.params.rows_per_zone; let file = Self::train_zonemap_index(data, index_store, Some(request.params)).await?; Ok(CreatedIndex { - index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()) - .unwrap(), + index_details: make_zone_map_index_details(rows_per_zone), index_version: ZONEMAP_INDEX_VERSION, files: vec![file], }) @@ -1078,20 +1083,15 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { &self, field_path: &str, data_type: &DataType, - index_store: Arc, - _index_details: &prost_types::Any, + index_details: &prost_types::Any, ) -> Result>> { if data_type.is_nested() { return Ok(None); } - let Ok(index_file) = index_store.open_index_file(ZONEMAP_FILENAME).await else { - return Ok(None); - }; - let rows_per_zone: u64 = index_file - .schema() - .metadata - .get(ZONEMAP_SIZE_META_KEY) - .and_then(|s| s.parse().ok()) + let rows_per_zone = index_details + .to_msg::() + .ok() + .and_then(|d| d.rows_per_zone) .unwrap_or(ROWS_PER_ZONE_DEFAULT); match ZoneMapSeedWriter::new(field_path, rows_per_zone, data_type.clone()) { Ok(writer) => Ok(Some(Box::new(writer))), diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index bd922ce6db0..98b3d853a24 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1134,10 +1134,8 @@ async fn create_seed_writers( params: &WriteParams, storage_version: LanceFileVersion, ) -> Result>> { - use crate::dataset::index::LanceIndexStoreExt; use crate::index::DatasetIndexExt; use crate::index::scalar::{IndexDetails, fetch_index_details}; - use lance_index::scalar::lance_format::LanceIndexStore; use lance_table::format::IndexMetadata; // Seeds only make sense when appending to an existing dataset with V2 files. 
@@ -1173,14 +1171,8 @@ async fn create_seed_writers( let Ok(plugin) = details.get_plugin() else { continue; }; - let Ok(index_store) = LanceIndexStore::from_dataset_for_existing(dataset, index).await - else { - continue; - }; - let index_store: Arc = Arc::new(index_store); - if let Ok(Some(writer)) = plugin - .create_seed_writer(&field_path, &data_type, index_store, &index_details) + .create_seed_writer(&field_path, &data_type, &index_details) .await { writers.push(writer); From 1c092106867f27fe18aab21bb0a7a8249ed94b45 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 24 Jun 2026 22:26:12 +0000 Subject: [PATCH 05/10] fix(index): skip seed writer when rows_per_zone absent from ZoneMapIndexDetails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Old datasets that predate the rows_per_zone proto field should not fall back to a default value — the seed buffer would silently use the wrong zone size. Return Ok(None) instead so no seed writer is created. Co-Authored-By: Claude Sonnet 4.6 --- protos/index_old.proto | 4 ++-- rust/lance-index/src/scalar/zonemap.rs | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/protos/index_old.proto b/protos/index_old.proto index ce78601855b..3339e1b7d0b 100644 --- a/protos/index_old.proto +++ b/protos/index_old.proto @@ -26,8 +26,8 @@ message LabelListIndexDetails {} message NGramIndexDetails {} message ZoneMapIndexDetails { // Number of rows per zone. Optional for backwards compatibility: absent on - // datasets written before this field was added; callers should fall back to - // the index-default when not present. + // datasets written before this field was added. When absent, no seed writer + // is created for the index. 
optional uint64 rows_per_zone = 1; } message InvertedIndexDetails { diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 24dda925e45..08612970efe 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -1088,11 +1088,13 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { if data_type.is_nested() { return Ok(None); } - let rows_per_zone = index_details + let Some(rows_per_zone) = index_details .to_msg::() .ok() .and_then(|d| d.rows_per_zone) - .unwrap_or(ROWS_PER_ZONE_DEFAULT); + else { + return Ok(None); + }; match ZoneMapSeedWriter::new(field_path, rows_per_zone, data_type.clone()) { Ok(writer) => Ok(Some(Box::new(writer))), Err(_) => Ok(None), From 46311f21dcb1faa9df1b492c2d2a96a78cae6637 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 25 Jun 2026 13:00:29 +0000 Subject: [PATCH 06/10] fix(index): propagate errors from create_seed_writer instead of silently ignoring them Co-Authored-By: Claude Sonnet 4.6 --- rust/lance/src/dataset/write.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 98b3d853a24..01654d1d0e9 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -1171,9 +1171,9 @@ async fn create_seed_writers( let Ok(plugin) = details.get_plugin() else { continue; }; - if let Ok(Some(writer)) = plugin + if let Some(writer) = plugin .create_seed_writer(&field_path, &data_type, &index_details) - .await + .await? 
{ writers.push(writer); } From eb576fbbe79e92619ecfb1f1d2b10da9f6b8ac9e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 25 Jun 2026 13:23:05 +0000 Subject: [PATCH 07/10] refactor(index): make seed-based index update a generic ScalarIndexPlugin capability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `update_from_seeds` to `ScalarIndexPlugin` (default `Ok(None)`) so any index type can opt into seed-based incremental updates without coupling the update path to a specific index type. Add `metadata_value` to `FragmentSeed` so plugins can validate seed compatibility (e.g. confirming `rows_per_zone`). Implement `ZoneMapIndexPlugin::update_from_seeds` with `rows_per_zone` validation and rename `try_harvest_zonemap_seeds` → `try_harvest_seeds` in `append.rs`. Remove the hardcoded `IndexType::ZoneMap` branch; the merge path now tries seeds generically before falling back to a full column scan. Co-Authored-By: Claude Sonnet 4.6 --- rust/lance-index/src/scalar.rs | 11 -- rust/lance-index/src/scalar/registry.rs | 20 ++++ rust/lance-index/src/scalar/seed.rs | 5 + rust/lance-index/src/scalar/zonemap.rs | 34 +++++++ rust/lance/src/index/append.rs | 128 ++++++++++-------------- 5 files changed, 111 insertions(+), 87 deletions(-) diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 99e51741a71..9c752aff4db 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -1133,17 +1133,6 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf { /// This returns a ScalarIndexParams that can be used to recreate an index /// with the same configuration on another dataset. fn derive_index_params(&self) -> Result; - - /// Try to update the index using pre-harvested seed buffers instead of - /// re-scanning column data. Returns `None` if seeds are not supported or - /// the provided seeds cannot be used (e.g., wrong parameters). 
- async fn try_update_with_seeds( - &self, - _seeds: &[crate::scalar::seed::FragmentSeed], - _dest_store: &dyn IndexStore, - ) -> Result> { - Ok(None) - } } #[cfg(test)] diff --git a/rust/lance-index/src/scalar/registry.rs b/rust/lance-index/src/scalar/registry.rs index 0032d10f2d0..d26fd6240df 100644 --- a/rust/lance-index/src/scalar/registry.rs +++ b/rust/lance-index/src/scalar/registry.rs @@ -236,6 +236,26 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { ) -> Result>> { Ok(None) } + + /// Attempt to update `reference_index` using pre-harvested `seeds` instead + /// of re-scanning column data. + /// + /// Each [`FragmentSeed`](super::seed::FragmentSeed) carries the raw bytes + /// written by the corresponding [`IndexSeedWriter`](super::seed::IndexSeedWriter) + /// and the original `metadata_value` stored in the data file, which the plugin + /// can use for compatibility validation (e.g. confirming `rows_per_zone`). + /// + /// Return `Ok(Some(created))` if the seed-based update succeeded, or + /// `Ok(None)` to signal that the caller should fall back to a full column scan. + async fn update_from_seeds( + &self, + _seeds: Vec, + _reference_index: Arc, + _index_details: &prost_types::Any, + _dest_store: &dyn IndexStore, + ) -> Result> { + Ok(None) + } } /// In-memory cache key for a whole `Arc`. diff --git a/rust/lance-index/src/scalar/seed.rs b/rust/lance-index/src/scalar/seed.rs index 5101418e859..eaa5766fafa 100644 --- a/rust/lance-index/src/scalar/seed.rs +++ b/rust/lance-index/src/scalar/seed.rs @@ -51,4 +51,9 @@ pub trait IndexSeedWriter: Send + std::fmt::Debug { pub struct FragmentSeed { pub fragment_id: u64, pub bytes: Bytes, + /// The raw value that was stored in the data file's schema metadata under + /// the seed key (i.e. the output of [`IndexSeedWriter::schema_metadata_value`]). + /// Plugins can inspect this to validate that the seed is compatible with the + /// current index configuration before consuming `bytes`. 
+ pub metadata_value: String, } diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 08612970efe..20a2b2e3f9d 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -648,7 +648,9 @@ impl ScalarIndex for ZoneMapIndex { let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(self.rows_per_zone))?; Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(&params)) } +} + +impl ZoneMapIndex { async fn try_update_with_seeds( &self, seeds: &[crate::scalar::seed::FragmentSeed], @@ -1100,6 +1102,38 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { Err(_) => Ok(None), } } + + async fn update_from_seeds( + &self, + seeds: Vec, + reference_index: Arc, + index_details: &prost_types::Any, + dest_store: &dyn IndexStore, + ) -> Result> { + let Some(rows_per_zone) = index_details + .to_msg::() + .ok() + .and_then(|d| d.rows_per_zone) + else { + return Ok(None); + }; + + // Validate each seed was written with the same rows_per_zone. 
+ for seed in &seeds { + let rpz_in_seed = seed + .metadata_value + .split_once(':') + .and_then(|(_, rpz)| rpz.parse::().ok()); + if rpz_in_seed != Some(rows_per_zone) { + return Ok(None); + } + } + + let Some(zone_map) = reference_index.as_any().downcast_ref::() else { + return Ok(None); + }; + zone_map.try_update_with_seeds(&seeds, dest_store).await + } } /// A seed writer that observes column values during data file writes and diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 172bc981999..0cba25b6f94 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -31,7 +31,7 @@ use super::vector::ivf::{optimize_vector_indices, select_segment_for_single_reba use crate::dataset::Dataset; use crate::dataset::index::LanceIndexStoreExt; use crate::dataset::rowids::load_row_id_sequences; -use crate::index::scalar::load_training_data; +use crate::index::scalar::{IndexDetails, fetch_index_details, load_training_data}; use crate::index::vector_index_details_default; #[derive(Debug, Clone)] @@ -180,16 +180,16 @@ pub async fn build_per_segment_filters( Ok((effective_union, filters)) } -/// Attempt to read zone map seeds for `fragments` from their data files. +/// Attempt to read seed buffers for `column_name` from `fragments`' data files. /// -/// Returns `Some(vec)` if ALL fragments have a valid seed for `column_name` -/// with matching `rows_per_zone`; returns `None` if any fragment is missing a seed -/// or the seed parameters don't match. -async fn try_harvest_zonemap_seeds( +/// Returns `Some(vec)` only if every fragment has a seed entry; returns `None` +/// if any fragment is missing a seed or its data file cannot be opened. +/// Index-type-specific validation (e.g. `rows_per_zone` checks) is left to the +/// caller via [`FragmentSeed::metadata_value`]. 
+async fn try_harvest_seeds( dataset: &Dataset, fragments: &[Fragment], column_name: &str, - rows_per_zone: u64, ) -> Result>> { if fragments.is_empty() { return Ok(Some(Vec::new())); @@ -204,7 +204,6 @@ async fn try_harvest_zonemap_seeds( ); for fragment in fragments { - // Find the primary data file (first file) let Some(data_file) = fragment.files.first() else { return Ok(None); }; @@ -231,7 +230,6 @@ async fn try_harvest_zonemap_seeds( return Ok(None); }; - // Look for the seed metadata key let Some(meta_value) = reader .metadata() .file_schema @@ -242,23 +240,11 @@ async fn try_harvest_zonemap_seeds( return Ok(None); }; - // Parse "{buf_index}:{rows_per_zone_from_file}" - let parts: Vec<&str> = meta_value.splitn(2, ':').collect(); - if parts.len() != 2 { - return Ok(None); - } - let Ok(buf_index) = parts[0].parse::() else { + // The buf_index is always the portion before the first ':'. + let Ok(buf_index) = meta_value.split(':').next().unwrap().parse::() else { return Ok(None); }; - let Ok(rows_per_zone_from_file) = parts[1].parse::() else { - return Ok(None); - }; - - if rows_per_zone_from_file != rows_per_zone { - return Ok(None); - } - // Read the global buffer (the index returned by add_global_buffer is used directly) let Ok(bytes) = reader.read_global_buffer(buf_index).await else { return Ok(None); }; @@ -266,6 +252,7 @@ async fn try_harvest_zonemap_seeds( seeds.push(FragmentSeed { fragment_id: fragment.id, bytes, + metadata_value: meta_value, }); } @@ -453,49 +440,52 @@ async fn merge_scalar_indices<'a>( ) .await? 
} else { - let new_data_stream = - load_unindexed_training_data(dataset.as_ref(), field_path, &update_criteria, unindexed) - .await?; let new_store = LanceIndexStore::from_dataset_for_new(&dataset, &new_uuid)?; - match index_type { - IndexType::BTree => { - let (_, old_data_filters) = - build_per_segment_filters(dataset.as_ref(), &selected_old_indices).await?; - crate::index::scalar::btree::open_and_merge_segments( - dataset.as_ref(), - field_path, - &selected_old_indices, - new_data_stream, - &new_store, - &old_data_filters, - ) + // Try a seed-based update before falling back to a full column scan. + // Seeds are only available if every unindexed fragment had a seed buffer + // written during its data file write, and the plugin validates that the + // seeds are compatible with the current index configuration. + let index_details = + fetch_index_details(dataset.as_ref(), field_path, reference_idx).await?; + let details = IndexDetails(index_details.clone()); + let plugin = details.get_plugin()?; + let maybe_created = if let Some(seeds) = + try_harvest_seeds(dataset.as_ref(), unindexed, column_name).await? + { + plugin + .update_from_seeds(seeds, reference_index.clone(), &index_details, &new_store) .await? - } - IndexType::ZoneMap => { - // Try to accelerate the update using pre-harvested seed buffers. - let zone_map = reference_index - .as_any() - .downcast_ref::(); - let maybe_created = if let Some(zone_map) = zone_map { - let rpz = zone_map.rows_per_zone(); - match try_harvest_zonemap_seeds(dataset.as_ref(), unindexed, column_name, rpz) - .await? - { - Some(seeds) => { - reference_index - .try_update_with_seeds(&seeds, &new_store) - .await? 
- } - None => None, - } - } else { - None - }; + } else { + None + }; - if let Some(created) = maybe_created { - created - } else { + if let Some(created) = maybe_created { + created + } else { + let new_data_stream = load_unindexed_training_data( + dataset.as_ref(), + field_path, + &update_criteria, + unindexed, + ) + .await?; + + match index_type { + IndexType::BTree => { + let (_, old_data_filters) = + build_per_segment_filters(dataset.as_ref(), selected_old_indices).await?; + crate::index::scalar::btree::open_and_merge_segments( + dataset.as_ref(), + field_path, + selected_old_indices, + new_data_stream, + &new_store, + &old_data_filters, + ) + .await? + } + _ => { let old_data_filter = build_old_data_filter( dataset.as_ref(), &effective_old_frags, @@ -507,20 +497,6 @@ async fn merge_scalar_indices<'a>( .await? } } - // NOTE: IndexType::Inverted never reaches here -- it is handled by the - // dedicated arm in merge_indices_with_unindexed_frags before this - // function is called. - _ => { - let old_data_filter = build_old_data_filter( - dataset.as_ref(), - &effective_old_frags, - &deleted_old_frags, - ) - .await?; - reference_index - .update(new_data_stream, &new_store, old_data_filter) - .await? - } } }; From a1f6a993a3244125a2450f8f2d5f54d10072ac24 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 25 Jun 2026 13:38:39 +0000 Subject: [PATCH 08/10] feat(index): add use_seeds parameter to zone map index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `use_seeds: bool` to `ZoneMapIndexDetails` proto and propagate it through the zone map plugin stack. The field defaults to `true` for variable-length (string, binary) and wide primitive types (≥ 8 bytes) where skipping a column scan has the most impact, and `false` for narrow fixed-width types (Int32, Float32, …) that are fast enough to scan directly. 
`ScalarIndexPlugin` gains a `might_use_seeds` hook (default `false`) that lets the update path in `append.rs` skip opening data files entirely when the index configuration is known to never write seeds. `ZoneMapIndexPlugin` implements the hook by reading `use_seeds` from the proto details. `create_seed_writer` now also respects `use_seeds`, so no seed buffer is embedded in the data file for indexes where seeds are disabled. Co-Authored-By: Claude Sonnet 4.6 --- protos/index_old.proto | 7 ++ rust/lance-index/src/scalar/registry.rs | 10 ++ rust/lance-index/src/scalar/zonemap.rs | 144 ++++++++++++++++++------ rust/lance/src/index/append.rs | 17 ++- 4 files changed, 139 insertions(+), 39 deletions(-) diff --git a/protos/index_old.proto b/protos/index_old.proto index 3339e1b7d0b..a3b542b9590 100644 --- a/protos/index_old.proto +++ b/protos/index_old.proto @@ -29,6 +29,13 @@ message ZoneMapIndexDetails { // datasets written before this field was added. When absent, no seed writer // is created for the index. optional uint64 rows_per_zone = 1; + // Whether seed-based incremental updates are enabled for this index. + // Absent (old datasets) or false: seeds disabled. + // true: seeds enabled; the index will embed per-fragment seed buffers in + // data files and harvest them during incremental updates to skip full scans. + // Defaults to true for variable-length and wide (≥ 8-byte) column types + // when not explicitly set by the user. 
+ optional bool use_seeds = 2; } message InvertedIndexDetails { // Marking this field as optional as old versions of the index store blank details and we diff --git a/rust/lance-index/src/scalar/registry.rs b/rust/lance-index/src/scalar/registry.rs index d26fd6240df..4a883da3944 100644 --- a/rust/lance-index/src/scalar/registry.rs +++ b/rust/lance-index/src/scalar/registry.rs @@ -237,6 +237,16 @@ pub trait ScalarIndexPlugin: Send + Sync + std::fmt::Debug { Ok(None) } + /// Returns true if this index type may have seed buffers embedded in data + /// files for the given index configuration. + /// + /// When false the caller can skip opening data files to look for seeds + /// entirely, avoiding I/O for index types or configurations that never + /// write seeds. + fn might_use_seeds(&self, _index_details: &prost_types::Any) -> bool { + false + } + /// Attempt to update `reference_index` using pre-harvested `seeds` instead /// of re-scanning column data. /// diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 20a2b2e3f9d..81585abdec9 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -107,6 +107,7 @@ pub struct ZoneMapIndex { data_type: DataType, // The maximum rows per zone provided by user rows_per_zone: u64, + use_seeds: bool, store: Arc, fri: Option>, index_cache: WeakLanceCache, @@ -118,6 +119,7 @@ impl std::fmt::Debug for ZoneMapIndex { .field("zones", &self.zones) .field("data_type", &self.data_type) .field("rows_per_zone", &self.rows_per_zone) + .field("use_seeds", &self.use_seeds) .field("store", &self.store) .field("fri", &self.fri) .field("index_cache", &self.index_cache) @@ -417,6 +419,7 @@ impl ZoneMapIndex { store: Arc, fri: Option>, index_cache: &LanceCache, + use_seeds: bool, ) -> Result> where Self: Sized, @@ -438,6 +441,7 @@ impl ZoneMapIndex { fri, index_cache, rows_per_zone, + use_seeds, )?)) } @@ -447,6 +451,7 @@ impl ZoneMapIndex { fri: Option>, 
index_cache: &LanceCache, rows_per_zone: u64, + use_seeds: bool, ) -> Result { // The RecordBatch should have columns: min, max, null_count let min_col = data @@ -505,6 +510,7 @@ impl ZoneMapIndex { zones: Vec::new(), data_type, rows_per_zone, + use_seeds, store, fri, index_cache: WeakLanceCache::from(index_cache), @@ -536,6 +542,7 @@ impl ZoneMapIndex { zones, data_type, rows_per_zone, + use_seeds, store, fri, index_cache: WeakLanceCache::from(index_cache), @@ -632,7 +639,7 @@ impl ScalarIndex for ZoneMapIndex { let file = builder.write_index(dest_store).await?; Ok(CreatedIndex { - index_details: make_zone_map_index_details(self.rows_per_zone), + index_details: make_zone_map_index_details(self.rows_per_zone, self.use_seeds), index_version: ZONEMAP_INDEX_VERSION, files: vec![file], }) @@ -645,7 +652,10 @@ impl ScalarIndex for ZoneMapIndex { } fn derive_index_params(&self) -> Result { - let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(self.rows_per_zone))?; + let params = serde_json::to_value(ZoneMapIndexBuilderParams { + rows_per_zone: self.rows_per_zone, + use_seeds: Some(self.use_seeds), + })?; Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(&params)) } } @@ -675,7 +685,7 @@ impl ZoneMapIndex { let file = builder.write_index(dest_store).await?; Ok(Some(CreatedIndex { - index_details: make_zone_map_index_details(self.rows_per_zone), + index_details: make_zone_map_index_details(self.rows_per_zone, self.use_seeds), index_version: ZONEMAP_INDEX_VERSION, files: vec![file], })) @@ -692,6 +702,7 @@ pub async fn merge_zonemap_indices( Error::invalid_input("merge_zonemap_indices requires at least one source index") })?; let rows_per_zone = first.rows_per_zone; + let use_seeds = first.use_seeds; let data_type = first.data_type.clone(); let mut zones = Vec::new(); @@ -727,7 +738,7 @@ pub async fn merge_zonemap_indices( builder.write_index(dest_store).await?; Ok(CreatedIndex { - index_details: make_zone_map_index_details(rows_per_zone), + 
index_details: make_zone_map_index_details(rows_per_zone, use_seeds), index_version: ZONEMAP_INDEX_VERSION, files: dest_store.list_files_with_sizes().await?, }) @@ -741,6 +752,12 @@ fn default_rows_per_zone() -> u64 { pub struct ZoneMapIndexBuilderParams { #[serde(default = "default_rows_per_zone")] rows_per_zone: u64, + /// Whether to embed per-fragment seed buffers in data files for use during + /// incremental index updates. `None` means auto-detect based on column type + /// (see [`default_use_seeds`]). Resolved to a concrete `bool` during + /// training in [`ZoneMapIndexPlugin::new_training_request`]. + #[serde(default)] + use_seeds: Option, } static DEFAULT_ROWS_PER_ZONE: LazyLock = LazyLock::new(|| { @@ -754,13 +771,17 @@ impl Default for ZoneMapIndexBuilderParams { fn default() -> Self { Self { rows_per_zone: *DEFAULT_ROWS_PER_ZONE, + use_seeds: None, } } } impl ZoneMapIndexBuilderParams { pub fn new(rows_per_zone: u64) -> Self { - Self { rows_per_zone } + Self { + rows_per_zone, + use_seeds: None, + } } pub fn rows_per_zone(&self) -> u64 { @@ -955,13 +976,42 @@ impl ZoneProcessor for ZoneMapProcessor { } } -fn make_zone_map_index_details(rows_per_zone: u64) -> prost_types::Any { +fn make_zone_map_index_details(rows_per_zone: u64, use_seeds: bool) -> prost_types::Any { prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails { rows_per_zone: Some(rows_per_zone), + use_seeds: Some(use_seeds), }) .unwrap() } +/// Returns true when seed-based incremental updates should be enabled by +/// default for the given column type. +/// +/// Seeds are most valuable for variable-length types (strings, binary) and wide +/// primitive types (≥ 8 bytes) where scanning the raw column is comparatively +/// expensive. Narrow fixed-width types (e.g. Int32, Float32) scan fast enough +/// that the overhead of managing seeds is unlikely to pay off. 
+fn default_use_seeds(data_type: &DataType) -> bool { + matches!( + data_type, + DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Binary + | DataType::LargeBinary + | DataType::BinaryView + | DataType::Int64 + | DataType::UInt64 + | DataType::Float64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) + | DataType::Timestamp(_, _) + | DataType::Date64 + | DataType::Time64(_) + | DataType::Duration(_) + ) +} + #[derive(Debug, Default)] pub struct ZoneMapIndexPlugin; @@ -1022,7 +1072,11 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { )); } - let params = serde_json::from_str::(params)?; + let mut params = serde_json::from_str::(params)?; + // Resolve None → type-based default so train_index always sees Some(bool). + if params.use_seeds.is_none() { + params.use_seeds = Some(default_use_seeds(field.data_type())); + } Ok(Box::new(ZoneMapIndexTrainingRequest::new(params))) } @@ -1063,9 +1117,10 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { ) })?; let rows_per_zone = request.params.rows_per_zone; + let use_seeds = request.params.use_seeds.unwrap_or(false); let file = Self::train_zonemap_index(data, index_store, Some(request.params)).await?; Ok(CreatedIndex { - index_details: make_zone_map_index_details(rows_per_zone), + index_details: make_zone_map_index_details(rows_per_zone, use_seeds), index_version: ZONEMAP_INDEX_VERSION, files: vec![file], }) @@ -1074,11 +1129,27 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { async fn load_index( &self, index_store: Arc, - _index_details: &prost_types::Any, + index_details: &prost_types::Any, frag_reuse_index: Option>, cache: &LanceCache, ) -> Result> { - Ok(ZoneMapIndex::load(index_store, frag_reuse_index, cache).await? as Arc) + let use_seeds = index_details + .to_msg::() + .ok() + .and_then(|d| d.use_seeds) + .unwrap_or(false); + Ok( + ZoneMapIndex::load(index_store, frag_reuse_index, cache, use_seeds).await? 
+ as Arc, + ) + } + + fn might_use_seeds(&self, index_details: &prost_types::Any) -> bool { + index_details + .to_msg::() + .ok() + .and_then(|d| d.use_seeds) + .unwrap_or(false) } async fn create_seed_writer( @@ -1090,13 +1161,13 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { if data_type.is_nested() { return Ok(None); } - let Some(rows_per_zone) = index_details - .to_msg::() - .ok() - .and_then(|d| d.rows_per_zone) - else { + let details = index_details.to_msg::().ok(); + let Some(rows_per_zone) = details.as_ref().and_then(|d| d.rows_per_zone) else { return Ok(None); }; + if !details.as_ref().and_then(|d| d.use_seeds).unwrap_or(false) { + return Ok(None); + } match ZoneMapSeedWriter::new(field_path, rows_per_zone, data_type.clone()) { Ok(writer) => Ok(Some(Box::new(writer))), Err(_) => Ok(None), @@ -1468,7 +1539,7 @@ mod tests { log::debug!("Successfully wrote the index file"); // Read the index file back and check its contents - let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) + let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) .await .expect("Failed to load ZoneMapIndex"); assert_eq!(index.zones.len(), 0); @@ -1512,7 +1583,7 @@ mod tests { log::debug!("Successfully wrote the index file"); // Read the index file back and check its contents - let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) + let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) .await .expect("Failed to load ZoneMapIndex"); assert_eq!(index.zones.len(), 10); @@ -1564,9 +1635,10 @@ mod tests { .unwrap(); // Verify the updated index has more zones - let updated_index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) - .await - .expect("Failed to load updated ZoneMapIndex"); + let updated_index = + ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) + .await + .expect("Failed to load updated ZoneMapIndex"); // 
Should have original 10 zones + 1 new zone (5000 rows with zone size 5000) assert_eq!(updated_index.zones.len(), 11); @@ -1635,7 +1707,7 @@ mod tests { .unwrap(); let cache = LanceCache::with_capacity(1024 * 1024); - let index = ZoneMapIndex::load(store.clone(), None, &cache) + let index = ZoneMapIndex::load(store.clone(), None, &cache, false) .await .unwrap(); @@ -1738,7 +1810,7 @@ mod tests { .unwrap(); // Load the index - let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) + let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) .await .expect("Failed to load ZoneMapIndex"); @@ -1973,7 +2045,7 @@ mod tests { assert_eq!(metadata.get(ZONEMAP_SIZE_META_KEY).unwrap(), "100"); // Read the index file back and check its contents - let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) + let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) .await .expect("Failed to load ZoneMapIndex"); assert_eq!(index.zones.len(), 2); @@ -2136,7 +2208,7 @@ mod tests { log::debug!("Successfully wrote the index file"); // Read the index file back and check its contents - let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) + let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) .await .expect("Failed to load ZoneMapIndex"); assert_eq!(index.zones.len(), 3); @@ -2296,9 +2368,10 @@ mod tests { .unwrap(); // Read the index file back and check its contents - let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) - .await - .expect("Failed to load ZoneMapIndex"); + let index = + ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) + .await + .expect("Failed to load ZoneMapIndex"); assert_eq!(index.zones.len(), 5); assert_eq!( index.zones, @@ -2504,9 +2577,10 @@ mod tests { .unwrap(); // Read the index file back and check its contents - let index = 
ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) - .await - .expect("Failed to load ZoneMapIndex"); + let index = + ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) + .await + .expect("Failed to load ZoneMapIndex"); assert_eq!(index.zones.len(), 3); assert_eq!( index.zones, @@ -2579,9 +2653,10 @@ mod tests { .unwrap(); // Read the index file back and check its contents - let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache()) - .await - .expect("Failed to load ZoneMapIndex"); + let index = + ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache(), false) + .await + .expect("Failed to load ZoneMapIndex"); assert_eq!(index.zones.len(), 3); assert_eq!( index.zones, @@ -2775,6 +2850,7 @@ mod tests { zones, data_type: DataType::Utf8, rows_per_zone: ROWS_PER_ZONE_DEFAULT, + use_seeds: false, store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), @@ -2847,6 +2923,7 @@ mod tests { zones, data_type: DataType::Utf8, rows_per_zone: ROWS_PER_ZONE_DEFAULT, + use_seeds: false, store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), @@ -2914,6 +2991,7 @@ mod tests { zones, data_type: DataType::LargeUtf8, rows_per_zone: ROWS_PER_ZONE_DEFAULT, + use_seeds: false, store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index 0cba25b6f94..b7a3e9de097 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -450,12 +450,17 @@ async fn merge_scalar_indices<'a>( fetch_index_details(dataset.as_ref(), field_path, reference_idx).await?; let details = IndexDetails(index_details.clone()); let plugin = details.get_plugin()?; - let maybe_created = if let Some(seeds) = - try_harvest_seeds(dataset.as_ref(), unindexed, column_name).await? 
- { - plugin - .update_from_seeds(seeds, reference_index.clone(), &index_details, &new_store) - .await? + // Only open data files looking for seeds when the plugin confirms this + // index type and configuration can actually produce them. + let maybe_created = if plugin.might_use_seeds(&index_details) { + if let Some(seeds) = try_harvest_seeds(dataset.as_ref(), unindexed, column_name).await? + { + plugin + .update_from_seeds(seeds, reference_index.clone(), &index_details, &new_store) + .await? + } else { + None + } } else { None }; From 7c60154e7ffbe83b1a7acc7d6acb894235d750f8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 25 Jun 2026 13:52:44 +0000 Subject: [PATCH 09/10] fix(index): raise use_seeds threshold to > 8 bytes for fixed-width types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 8-byte primitives (Int64, Float64, Timestamp, …) scan fast enough that seed overhead is not worthwhile. `default_use_seeds` now returns true only for variable-length types (strings, binary) and fixed-width types strictly wider than 8 bytes (Decimal128, Decimal256, FixedSizeBinary(n > 8)). Co-Authored-By: Claude Sonnet 4.6 --- protos/index_old.proto | 4 +-- rust/lance-index/src/scalar/zonemap.rs | 37 +++++++++++--------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/protos/index_old.proto b/protos/index_old.proto index a3b542b9590..a075da1f024 100644 --- a/protos/index_old.proto +++ b/protos/index_old.proto @@ -33,8 +33,8 @@ message ZoneMapIndexDetails { // Absent (old datasets) or false: seeds disabled. // true: seeds enabled; the index will embed per-fragment seed buffers in // data files and harvest them during incremental updates to skip full scans. - // Defaults to true for variable-length and wide (≥ 8-byte) column types - // when not explicitly set by the user. 
+ // Defaults to true for variable-length column types (strings, binary) and + // fixed-width types wider than 8 bytes when not explicitly set by the user. optional bool use_seeds = 2; } message InvertedIndexDetails { diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 81585abdec9..3a3909714b8 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -987,29 +987,24 @@ fn make_zone_map_index_details(rows_per_zone: u64, use_seeds: bool) -> prost_typ /// Returns true when seed-based incremental updates should be enabled by /// default for the given column type. /// -/// Seeds are most valuable for variable-length types (strings, binary) and wide -/// primitive types (≥ 8 bytes) where scanning the raw column is comparatively -/// expensive. Narrow fixed-width types (e.g. Int32, Float32) scan fast enough -/// that the overhead of managing seeds is unlikely to pay off. +/// Seeds pay off for variable-length types (strings, binary) — which can be +/// arbitrarily wide — and fixed-width types wider than 8 bytes (e.g. +/// Decimal128, FixedSizeBinary tensors). Fixed-width types ≤ 8 bytes (Int64, +/// Float64, …) scan fast enough that the seed overhead is not worth it. fn default_use_seeds(data_type: &DataType) -> bool { - matches!( - data_type, + match data_type { + // Variable-length: width is unbounded, skipping scans is always valuable. DataType::Utf8 - | DataType::LargeUtf8 - | DataType::Utf8View - | DataType::Binary - | DataType::LargeBinary - | DataType::BinaryView - | DataType::Int64 - | DataType::UInt64 - | DataType::Float64 - | DataType::Decimal128(_, _) - | DataType::Decimal256(_, _) - | DataType::Timestamp(_, _) - | DataType::Date64 - | DataType::Time64(_) - | DataType::Duration(_) - ) + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Binary + | DataType::LargeBinary + | DataType::BinaryView => true, + // Fixed-width types wider than 8 bytes. 
+ DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => true, + DataType::FixedSizeBinary(n) => *n > 8, + _ => false, + } } #[derive(Debug, Default)] From 338199a14b5767987e8835c3dc4d5d65fba44274 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 25 Jun 2026 14:17:01 +0000 Subject: [PATCH 10/10] =?UTF-8?q?fix(index):=20rebase=20fixup=20=E2=80=94?= =?UTF-8?q?=20borrow=20selected=5Fold=5Findices=20as=20slice?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `build_per_segment_filters` and `open_and_merge_segments` take `&[&IndexMetadata]` after an upstream signature change in main; pass `&selected_old_indices` to match. Co-Authored-By: Claude Sonnet 4.6 --- rust/lance/src/index/append.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index b7a3e9de097..3ae4a65b99d 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -479,11 +479,11 @@ async fn merge_scalar_indices<'a>( match index_type { IndexType::BTree => { let (_, old_data_filters) = - build_per_segment_filters(dataset.as_ref(), selected_old_indices).await?; + build_per_segment_filters(dataset.as_ref(), &selected_old_indices).await?; crate::index::scalar::btree::open_and_merge_segments( dataset.as_ref(), field_path, - selected_old_indices, + &selected_old_indices, new_data_stream, &new_store, &old_data_filters,