Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion rust/lance-index/src/frag_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,40 @@
use std::any::Any;
use std::sync::Arc;

use arrow_array::RecordBatch;
use async_trait::async_trait;
use lance_core::{Error, Result};
use roaring::RoaringBitmap;
use lance_select::RowAddrTreeMap;
use roaring::{RoaringBitmap, RoaringTreemap};
use serde::Serialize;

pub use lance_table::system_index::frag_reuse::*;

use crate::scalar::RowIdRemapper;
use crate::{Index, IndexType};

impl RowIdRemapper for FragReuseIndex {
fn remap_row_id(&self, row_id: u64) -> Option<u64> {
self.remap_row_id(row_id)
}

fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap {
self.remap_row_addrs_tree_map(row_addrs)
}

fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap {
self.remap_row_ids_roaring_tree_map(row_ids)
}

fn remap_row_ids_record_batch(
&self,
batch: RecordBatch,
row_id_idx: usize,
) -> Result<RecordBatch> {
self.remap_row_ids_record_batch(batch, row_id_idx)
}
}

#[derive(Serialize)]
struct FragReuseStatistics {
num_versions: usize,
Expand Down
23 changes: 21 additions & 2 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use lance_core::deepsize::DeepSizeOf;
use lance_core::{Error, Result};
use lance_io::stream::{RecordBatchStream, RecordBatchStreamAdapter};
use lance_select::{NullableRowAddrSet, RowAddrTreeMap, RowSetOps};
use roaring::RoaringBitmap;
use roaring::{RoaringBitmap, RoaringTreemap};
use serde::Serialize;

use crate::metrics::MetricsCollector;
Expand All @@ -48,7 +48,6 @@ pub mod rtree;
pub mod zoned;
pub mod zonemap;

use crate::frag_reuse::FragReuseIndex;
pub use inverted::tokenizer::InvertedIndexParams;
use lance_datafusion::udf::CONTAINS_TOKENS_UDF;

Expand Down Expand Up @@ -1076,3 +1075,23 @@ pub trait ScalarIndex: Send + Sync + std::fmt::Debug + Index + DeepSizeOf {
/// with the same configuration on another dataset.
fn derive_index_params(&self) -> Result<ScalarIndexParams>;
}

/// Abstraction over any type that can remap row IDs during index loading.
///
/// This decouples scalar index plugins from the table-level [`crate::frag_reuse::FragReuseIndex`]
/// type. [`crate::frag_reuse::FragReuseIndex`] implements this trait, but callers may also
/// supply custom implementations for testing or other remapping strategies.
pub trait RowIdRemapper: Send + Sync + std::fmt::Debug {
/// Remap a single row id. Returns `None` if the row was deleted.
fn remap_row_id(&self, row_id: u64) -> Option<u64>;
/// Remap all addresses in a [`RowAddrTreeMap`], dropping deleted rows.
fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap;
/// Remap all row ids in a [`RoaringTreemap`], dropping deleted rows.
fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap;
/// Remap the row-id column at `row_id_idx` inside `batch`, dropping deleted rows.
fn remap_row_ids_record_batch(
&self,
batch: RecordBatch,
row_id_idx: usize,
) -> Result<RecordBatch>;
}
15 changes: 7 additions & 8 deletions rust/lance-index/src/scalar/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ use super::{
use crate::pbold;
use crate::{Index, IndexType, metrics::MetricsCollector};
use crate::{
frag_reuse::FragReuseIndex,
progress::IndexBuildProgress,
scalar::{
CreatedIndex, UpdateCriteria,
CreatedIndex, RowIdRemapper, UpdateCriteria,
expression::SargableQueryParser,
registry::{
ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest,
Expand Down Expand Up @@ -125,7 +124,7 @@ pub struct BitmapIndex {

index_cache: WeakLanceCache,

frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,

lazy_reader: LazyIndexReader,
}
Expand Down Expand Up @@ -200,7 +199,7 @@ impl BitmapIndexState {
&self,
store: Arc<dyn IndexStore>,
index_cache: &LanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Result<Arc<BitmapIndex>> {
Ok(Arc::new(BitmapIndex::new(
self.index_map.clone(),
Expand Down Expand Up @@ -335,7 +334,7 @@ impl BitmapIndex {
value_type: DataType,
store: Arc<dyn IndexStore>,
index_cache: WeakLanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Self {
let lazy_reader = LazyIndexReader::new(store.clone());
Self {
Expand All @@ -351,7 +350,7 @@ impl BitmapIndex {

pub(crate) async fn load(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
index_cache: &LanceCache,
) -> Result<Arc<Self>> {
let page_lookup_file = store.open_index_file(BITMAP_LOOKUP_NAME).await?;
Expand Down Expand Up @@ -1760,7 +1759,7 @@ impl ScalarIndexPlugin for BitmapIndexPlugin {
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
Ok(BitmapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
Expand All @@ -1769,7 +1768,7 @@ impl ScalarIndexPlugin for BitmapIndexPlugin {
async fn get_from_cache(
&self,
index_store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Option<Arc<dyn ScalarIndex>>> {
let Some(state) = cache.get_with_key(&BitmapIndexStateKey).await else {
Expand Down
9 changes: 5 additions & 4 deletions rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use std::sync::LazyLock;
use datafusion::execution::SendableRecordBatchStream;
use std::{collections::HashMap, sync::Arc};

use crate::scalar::FragReuseIndex;
use crate::scalar::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult};
use crate::scalar::{
AnyQuery, IndexStore, MetricsCollector, RowIdRemapper, ScalarIndex, SearchResult,
};
use crate::{Index, IndexType};
use arrow_array::{ArrayRef, RecordBatch};
use async_trait::async_trait;
Expand Down Expand Up @@ -89,7 +90,7 @@ impl DeepSizeOf for BloomFilterIndex {
impl BloomFilterIndex {
async fn load(
store: Arc<dyn IndexStore>,
_fri: Option<Arc<FragReuseIndex>>,
_fri: Option<Arc<dyn RowIdRemapper>>,
_index_cache: &LanceCache,
) -> Result<Arc<Self>> {
let index_file = store.open_index_file(BLOOMFILTER_FILENAME).await?;
Expand Down Expand Up @@ -1100,7 +1101,7 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin {
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
Ok(
Expand Down
34 changes: 17 additions & 17 deletions rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ use super::{
};
use crate::cache_pb::{BTreeIndexHeader, RangeToFile};
use crate::{Index, IndexType};
use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria};
use crate::{pbold, scalar::btree::flat::FlatIndex};
use crate::{
frag_reuse::FragReuseIndex,
progress::{IndexBuildProgress, noop_progress},
scalar::{
CreatedIndex, UpdateCriteria,
CreatedIndex, RowIdRemapper, UpdateCriteria,
expression::{SargableQueryParser, ScalarQueryParser},
registry::{ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME},
},
};
use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria};
use crate::{pbold, scalar::btree::flat::FlatIndex};
use arrow_arith::numeric::add;
use arrow_array::{
Array, ArrayAccessor, ArrowNativeTypeOp, PrimitiveArray, RecordBatch, UInt32Array,
Expand Down Expand Up @@ -1393,7 +1392,7 @@ impl BTreeIndexState {
&self,
store: Arc<dyn IndexStore>,
index_cache: &LanceCache,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Result<Arc<dyn ScalarIndex>> {
let index = BTreeIndex::try_from_serialized(
self.lookup_batch.clone(),
Expand Down Expand Up @@ -1519,7 +1518,7 @@ pub struct BTreeIndex {
/// - The local page_idx is calculated: `142 - 100 = 42`.
/// - The system now knows to read page `42` from the file `part_2_page_file.lance`.
ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
}

impl DeepSizeOf for BTreeIndex {
Expand All @@ -1540,7 +1539,7 @@ impl BTreeIndex {
index_cache: WeakLanceCache,
batch_size: u64,
ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Self {
Self {
page_lookup,
Expand Down Expand Up @@ -1696,7 +1695,7 @@ impl BTreeIndex {
index_cache: &LanceCache,
batch_size: u64,
ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
) -> Result<Self> {
let data_type = data.column(0).data_type().clone();
let page_lookup = Arc::new(BTreeLookup::try_new(data)?);
Expand All @@ -1714,7 +1713,7 @@ impl BTreeIndex {

async fn load(
store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
index_cache: &LanceCache,
) -> Result<Arc<Self>> {
let (page_lookup_file, standalone_partition_page_file) =
Expand Down Expand Up @@ -1950,7 +1949,7 @@ fn filter_keeps_nothing(filter: &Option<OldIndexDataFilter>) -> bool {

fn remap_row_ids(
stream: SendableRecordBatchStream,
frag_reuse_index: Arc<FragReuseIndex>,
frag_reuse_index: Arc<dyn RowIdRemapper>,
) -> SendableRecordBatchStream {
let schema = stream.schema();
let remapped = stream.map(move |batch_result| {
Expand Down Expand Up @@ -3285,7 +3284,7 @@ impl ScalarIndexPlugin for BTreeIndexPlugin {
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
Ok(BTreeIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
Expand All @@ -3294,7 +3293,7 @@ impl ScalarIndexPlugin for BTreeIndexPlugin {
async fn get_from_cache(
&self,
index_store: Arc<dyn IndexStore>,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Option<Arc<dyn ScalarIndex>>> {
let Some(state) = cache.get_with_key(&BTreeIndexStateKey).await else {
Expand Down Expand Up @@ -6286,11 +6285,12 @@ mod tests {
// Remap row 0 -> row 5000 (outside the original [0, 1000) range so no collision).
// Querying for value == 0 should now return row 5000, confirming reconstruct threaded
// the FragReuseIndex through to the rebuilt BTreeIndex.
let frag_reuse_index = Arc::new(FragReuseIndex::new(
Uuid::new_v4(),
vec![HashMap::from([(0u64, Some(5000u64))])],
FragReuseIndexDetails { versions: vec![] },
));
let frag_reuse_index: Arc<dyn crate::scalar::RowIdRemapper> =
Arc::new(FragReuseIndex::new(
Uuid::new_v4(),
vec![HashMap::from([(0u64, Some(5000u64))])],
FragReuseIndexDetails { versions: vec![] },
));
let reconstructed = state
.reconstruct(
test_store.clone(),
Expand Down
7 changes: 3 additions & 4 deletions rust/lance-index/src/scalar/fmindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use lance_core::deepsize::DeepSizeOf;
use lance_core::{Error, ROW_ADDR, Result};
use roaring::RoaringBitmap;

use crate::frag_reuse::FragReuseIndex;
use crate::metrics::MetricsCollector;
use crate::pb;
use crate::scalar::expression::{ScalarQueryParser, TextQueryParser};
Expand All @@ -44,7 +43,7 @@ use crate::scalar::registry::{
};
use crate::scalar::{
AnyQuery, BuiltinIndexType, CreatedIndex, IndexFile, IndexStore, OldIndexDataFilter,
ScalarIndex, ScalarIndexParams, SearchResult, TextQuery, UpdateCriteria,
RowIdRemapper, ScalarIndex, ScalarIndexParams, SearchResult, TextQuery, UpdateCriteria,
};
use crate::{Index, IndexType};

Expand Down Expand Up @@ -1257,7 +1256,7 @@ impl FMIndexScalarIndex {

async fn load(
store: Arc<dyn IndexStore>,
_fri: Option<Arc<FragReuseIndex>>,
_fri: Option<Arc<dyn RowIdRemapper>>,
_cache: &LanceCache,
) -> Result<Arc<Self>> {
let files = store.list_files_with_sizes().await?;
Expand Down Expand Up @@ -1731,7 +1730,7 @@ impl ScalarIndexPlugin for FMIndexPlugin {
&self,
store: Arc<dyn IndexStore>,
details: &prost_types::Any,
fri: Option<Arc<FragReuseIndex>>,
fri: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
let _ = details
Expand Down
13 changes: 5 additions & 8 deletions rust/lance-index/src/scalar/inverted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,10 @@ use lance_core::Error;

use crate::pbold;
use crate::progress::IndexBuildProgress;
use crate::{
frag_reuse::FragReuseIndex,
scalar::{
CreatedIndex, ScalarIndex,
expression::{FtsQueryParser, ScalarQueryParser},
registry::{ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest},
},
use crate::scalar::{
CreatedIndex, RowIdRemapper, ScalarIndex,
expression::{FtsQueryParser, ScalarQueryParser},
registry::{ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest},
};

use super::IndexStore;
Expand Down Expand Up @@ -289,7 +286,7 @@ impl ScalarIndexPlugin for InvertedIndexPlugin {
&self,
index_store: Arc<dyn IndexStore>,
_index_details: &prost_types::Any,
frag_reuse_index: Option<Arc<FragReuseIndex>>,
frag_reuse_index: Option<Arc<dyn RowIdRemapper>>,
cache: &LanceCache,
) -> Result<Arc<dyn ScalarIndex>> {
Ok(
Expand Down
Loading
Loading