From 0d3808e19045a0e30fc46dfd06a75bf4b30b12d7 Mon Sep 17 00:00:00 2001 From: Jack Ye Date: Tue, 23 Jun 2026 09:17:24 -0700 Subject: [PATCH] perf(index): parallelize FMIndex partition builds --- rust/lance-index/src/scalar/fmindex.rs | 460 +++++++++++++++++++++++-- 1 file changed, 429 insertions(+), 31 deletions(-) diff --git a/rust/lance-index/src/scalar/fmindex.rs b/rust/lance-index/src/scalar/fmindex.rs index 79c949a5426..9d27e4e77b6 100644 --- a/rust/lance-index/src/scalar/fmindex.rs +++ b/rust/lance-index/src/scalar/fmindex.rs @@ -31,6 +31,7 @@ use datafusion::execution::SendableRecordBatchStream; use futures::StreamExt; use lance_core::cache::LanceCache; use lance_core::deepsize::DeepSizeOf; +use lance_core::utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu}; use lance_core::{Error, ROW_ADDR, Result}; use roaring::RoaringBitmap; @@ -51,15 +52,58 @@ use crate::{Index, IndexType}; const FMINDEX_INDEX_VERSION: u32 = 10; const BLOCK_WORDS: usize = 4096; const PARTITION_SIZE: usize = 10_000; +const DEFAULT_PARTITION_SIZE_BYTES: usize = 16 * 1024 * 1024; const SENTINEL_BYTE: u8 = 0xFF; /// SA sampling rate. Store every D-th SA entry. Locate walks at most D LF steps. const SA_SAMPLE_RATE: usize = 32; +static LANCE_FMINDEX_NUM_WORKERS: std::sync::LazyLock = std::sync::LazyLock::new(|| { + std::env::var("LANCE_FMINDEX_NUM_WORKERS") + .unwrap_or_else(|_| get_num_compute_intensive_cpus().to_string()) + .parse() + .expect("failed to parse LANCE_FMINDEX_NUM_WORKERS") +}); +static LANCE_FMINDEX_PARTITION_ROWS: std::sync::LazyLock = std::sync::LazyLock::new(|| { + std::env::var("LANCE_FMINDEX_PARTITION_ROWS") + .unwrap_or_else(|_| PARTITION_SIZE.to_string()) + .parse() + .expect("failed to parse LANCE_FMINDEX_PARTITION_ROWS") +}); +static LANCE_FMINDEX_PARTITION_BYTES: std::sync::LazyLock = std::sync::LazyLock::new(|| { + std::env::var("LANCE_FMINDEX_PARTITION_BYTES") + .unwrap_or_else(|_| DEFAULT_PARTITION_SIZE_BYTES.to_string()) + .parse() + .expect("failed to parse LANCE_FMINDEX_PARTITION_BYTES") +}); +static LANCE_FMINDEX_WRITE_QUEUE_SIZE: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + std::env::var("LANCE_FMINDEX_WRITE_QUEUE_SIZE") + .unwrap_or_else(|_| "1".to_string()) + .parse() + .expect("failed to parse LANCE_FMINDEX_WRITE_QUEUE_SIZE") + }); + fn fmindex_partition_path(partition_id: u64) -> String { format!("part_{partition_id}_fm.lance") } +fn fmindex_num_workers() -> usize { + (*LANCE_FMINDEX_NUM_WORKERS).max(1) +} + +fn fmindex_partition_rows() -> usize { + (*LANCE_FMINDEX_PARTITION_ROWS).max(1) +} + +fn fmindex_partition_bytes() -> usize { + (*LANCE_FMINDEX_PARTITION_BYTES).max(1) +} + +fn fmindex_write_queue_size() -> usize { + (*LANCE_FMINDEX_WRITE_QUEUE_SIZE).max(1) +} + // ── Bitvector with O(1) rank ───────────────────────────────────────────────── const SUPERBLOCK_BITS: usize = 512; @@ -1389,51 +1433,210 @@ impl ScalarIndex for FMIndexScalarIndex { // ── Helpers ────────────────────────────────────────────────────────────────── +#[derive(Debug)] +struct FMIndexPartitionJob { + partition_id: u64, + texts: Vec<(u64, Vec)>, +} + +#[derive(Debug, Clone, Copy)] +struct FMIndexPartitionConfig { + num_workers: usize, + max_rows: usize, + max_bytes: usize, + queue_size: usize, +} + +impl FMIndexPartitionConfig { + fn from_env() -> Self { + Self { + num_workers: fmindex_num_workers(), + max_rows: fmindex_partition_rows(), + max_bytes: fmindex_partition_bytes(), + queue_size: fmindex_write_queue_size(), + } + } + + fn normalized(self) -> Self { + Self { + num_workers: self.num_workers.max(1), + max_rows: self.max_rows.max(1), + max_bytes: self.max_bytes.max(1), + queue_size: self.queue_size.max(1), + } + } +} + async fn write_partitioned_fmindex_stream( + stream: SendableRecordBatchStream, + store: &dyn IndexStore, +) -> Result> { + write_partitioned_fmindex_stream_with_config(stream, store, FMIndexPartitionConfig::from_env()) + .await +} + +async fn write_partitioned_fmindex_stream_with_config( mut stream: SendableRecordBatchStream, store: &dyn IndexStore, + config: FMIndexPartitionConfig, ) -> Result> { - let mut files = Vec::new(); - let mut partition = Vec::with_capacity(PARTITION_SIZE); - let mut partition_id = 0; - - while let Some(batch) = stream.next().await { - let batch = batch?; - // Prefer _rowaddr (global row address) over _rowid to ensure stable, - // globally unique identifiers across segments. - let row_addrs: &arrow_array::UInt64Array = batch - .column_by_name(ROW_ADDR) - .or_else(|| batch.column_by_name("_rowid")) - .and_then(|c| c.as_any().downcast_ref()) - .ok_or_else(|| { - Error::invalid_input("Fm training data must include _rowaddr or _rowid column") - })?; - // Use the named value column; fall back to column(0) for legacy streams - let value_col = batch - .column_by_name(VALUE_COLUMN_NAME) - .unwrap_or_else(|| batch.column(0)); - for i in 0..batch.num_rows() { - let rid = row_addrs.value(i); - if let Some(bytes) = extract_sanitized_text_bytes(value_col.as_ref(), i)? { - partition.push((rid, bytes)); - if partition.len() == PARTITION_SIZE { - files.push(write_fmindex_partition(&partition, store, partition_id).await?); - partition.clear(); - partition_id += 1; + let config = config.normalized(); + log::info!( + "building FMIndex with {} workers, partition rows {}, partition bytes {}", + config.num_workers, + config.max_rows, + config.max_bytes + ); + + let (sender, receiver): ( + async_channel::Sender, + async_channel::Receiver, + ) = async_channel::bounded(config.queue_size); + let store = store.clone_arc(); + let mut worker_tasks = Vec::with_capacity(config.num_workers); + for _ in 0..config.num_workers { + let receiver = receiver.clone(); + let store = store.clone(); + worker_tasks.push(tokio::task::spawn(async move { + let mut files = Vec::new(); + while let Ok(job) = receiver.recv().await { + files.push( + write_fmindex_partition_owned(job.texts, store.clone(), job.partition_id) + .await?, + ); + } + Result::Ok(files) + })); + } + drop(receiver); + + let producer_result = async { + let mut partition = Vec::with_capacity(config.max_rows.min(PARTITION_SIZE)); + let mut partition_bytes = 0usize; + let mut partition_id = 0; + + while let Some(batch) = stream.next().await { + let batch = batch?; + // Prefer _rowaddr (global row address) over _rowid to ensure stable, + // globally unique identifiers across segments. + let row_addrs: &arrow_array::UInt64Array = batch + .column_by_name(ROW_ADDR) + .or_else(|| batch.column_by_name("_rowid")) + .and_then(|c| c.as_any().downcast_ref()) + .ok_or_else(|| { + Error::invalid_input("Fm training data must include _rowaddr or _rowid column") + })?; + // Use the named value column; fall back to column(0) for legacy streams + let value_col = batch + .column_by_name(VALUE_COLUMN_NAME) + .unwrap_or_else(|| batch.column(0)); + for i in 0..batch.num_rows() { + let rid = row_addrs.value(i); + if let Some(bytes) = extract_sanitized_text_bytes(value_col.as_ref(), i)? { + partition_bytes = partition_bytes.saturating_add(bytes.len().saturating_add(1)); + partition.push((rid, bytes)); + if fmindex_partition_limit_reached( + partition.len(), + partition_bytes, + config.max_rows, + config.max_bytes, + ) { + send_fmindex_partition( + &sender, + &mut partition, + &mut partition_bytes, + partition_id, + config.max_rows, + ) + .await?; + partition_id += 1; + } } } } + + if !partition.is_empty() { + send_fmindex_partition( + &sender, + &mut partition, + &mut partition_bytes, + partition_id, + config.max_rows, + ) + .await?; + } + + Result::Ok(()) } + .await; + drop(sender); - if !partition.is_empty() { - files.push(write_fmindex_partition(&partition, store, partition_id).await?); - } else if files.is_empty() { - files.push(write_empty_fmindex_partition(store).await?); + let mut files = Vec::new(); + let mut worker_error = None; + for worker_task in worker_tasks { + match worker_task.await { + Ok(Ok(worker_files)) => files.extend(worker_files), + Ok(Err(err)) => { + if worker_error.is_none() { + worker_error = Some(err); + } + } + Err(err) => { + if worker_error.is_none() { + worker_error = Some(Error::execution(format!( + "FMIndex partition worker failed: {err}" + ))); + } + } + } + } + if let Some(err) = worker_error { + return Err(err); } + producer_result?; + if files.is_empty() { + return Ok(vec![write_empty_fmindex_partition(store.as_ref()).await?]); + } + + files.sort_unstable_by_key(|(partition_id, _)| *partition_id); + let files = files.into_iter().map(|(_, file)| file).collect(); Ok(files) } +fn fmindex_partition_limit_reached( + rows: usize, + bytes: usize, + max_rows: usize, + max_bytes: usize, +) -> bool { + // Byte limits are soft for a single oversized document because FMIndex + // partitions cannot split a document without changing search semantics. + rows >= max_rows || bytes >= max_bytes +} + +async fn send_fmindex_partition( + sender: &async_channel::Sender, + partition: &mut Vec<(u64, Vec)>, + partition_bytes: &mut usize, + partition_id: u64, + max_rows: usize, +) -> Result<()> { + let texts = std::mem::replace(partition, Vec::with_capacity(max_rows.min(PARTITION_SIZE))); + *partition_bytes = 0; + sender + .send(FMIndexPartitionJob { + partition_id, + texts, + }) + .await + .map_err(|err| { + Error::execution(format!( + "failed to schedule FMIndex partition {partition_id}: {err}" + )) + }) +} + fn sanitize_text_bytes(bytes: &[u8]) -> Vec { bytes .iter() @@ -1649,6 +1852,7 @@ async fn write_partitioned_fmindex( Ok(files) } +#[cfg(test)] async fn write_fmindex_partition( texts: &[(u64, Vec)], store: &dyn IndexStore, @@ -1659,6 +1863,20 @@ async fn write_fmindex_partition( write_fmindex(&fm, store, &fmindex_partition_path(partition_id)).await } +async fn write_fmindex_partition_owned( + texts: Vec<(u64, Vec)>, + store: Arc, + partition_id: u64, +) -> Result<(u64, IndexFile)> { + let fm = spawn_cpu(move || { + let refs: Vec<(u64, &[u8])> = texts.iter().map(|(id, t)| (*id, t.as_slice())).collect(); + FMIndex::build(&refs) + }) + .await?; + let file = write_fmindex(&fm, store.as_ref(), &fmindex_partition_path(partition_id)).await?; + Ok((partition_id, file)) +} + async fn write_empty_fmindex_partition(store: &dyn IndexStore) -> Result { let fm = FMIndex::build(&[])?; write_fmindex(&fm, store, &fmindex_partition_path(0)).await @@ -1761,6 +1979,66 @@ mod tests { use crate::scalar::lance_format::LanceIndexStore; + #[derive(Debug, Clone)] + struct FailNewFileStore { + inner: Arc, + } + + impl DeepSizeOf for FailNewFileStore { + fn deep_size_of_children(&self, _context: &mut lance_core::deepsize::Context) -> usize { + 0 + } + } + + #[async_trait::async_trait] + impl IndexStore for FailNewFileStore { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn clone_arc(&self) -> Arc { + Arc::new(self.clone()) + } + + fn io_parallelism(&self) -> usize { + self.inner.io_parallelism() + } + + async fn new_index_file( + &self, + _name: &str, + _schema: Arc, + ) -> Result> { + Err(Error::execution( + "injected FMIndex write failure".to_string(), + )) + } + + async fn open_index_file(&self, name: &str) -> Result> { + self.inner.open_index_file(name).await + } + + async fn copy_index_file( + &self, + name: &str, + dest_store: &dyn IndexStore, + ) -> Result { + self.inner.copy_index_file(name, dest_store).await + } + + async fn rename_index_file(&self, name: &str, new_name: &str) -> Result { + self.inner.rename_index_file(name, new_name).await + } + + async fn delete_index_file(&self, name: &str) -> Result<()> { + self.inner.delete_index_file(name).await + } + + async fn list_files_with_sizes(&self) -> Result> { + self.inner.list_files_with_sizes().await + } + } + #[test] fn test_fmindex_build_and_search() { let texts: Vec<(u64, &[u8])> = vec![ @@ -1965,6 +2243,13 @@ mod tests { assert_eq!(data, decoded); } + #[test] + fn test_partition_limit_reached_by_rows_or_bytes() { + assert!(fmindex_partition_limit_reached(10, 5, 10, 100)); + assert!(fmindex_partition_limit_reached(3, 100, 10, 100)); + assert!(!fmindex_partition_limit_reached(3, 99, 10, 100)); + } + #[test] fn test_sentinel_sanitization() { // Text containing \xFF should be sanitized to space during training. @@ -2204,6 +2489,117 @@ mod tests { } } + #[tokio::test(flavor = "multi_thread")] + async fn test_stream_train_splits_partition_by_bytes() { + let docs = vec!["abcd", "efgh", "ijkl"]; + let row_addrs: Vec = vec![0, 1, 2]; + let schema = Arc::new(arrow_schema::Schema::new(vec![ + arrow_schema::Field::new( + crate::scalar::registry::VALUE_COLUMN_NAME, + DataType::Utf8, + false, + ), + arrow_schema::Field::new(ROW_ADDR, DataType::UInt64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(docs)), + Arc::new(UInt64Array::from(row_addrs)), + ], + ) + .unwrap(); + + let tempdir = tempfile::tempdir().unwrap(); + let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap(); + let store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + index_dir, + Arc::new(LanceCache::no_cache()), + )); + + let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)])); + let files = write_partitioned_fmindex_stream_with_config( + Box::pin(stream), + store.as_ref(), + FMIndexPartitionConfig { + num_workers: 2, + max_rows: 100, + max_bytes: 5, + queue_size: 1, + }, + ) + .await + .unwrap(); + + assert_eq!(files.len(), 3); + assert_eq!(files[0].path, fmindex_partition_path(0)); + assert_eq!(files[1].path, fmindex_partition_path(1)); + assert_eq!(files[2].path, fmindex_partition_path(2)); + + let index = FMIndexScalarIndex::load(store, None, &LanceCache::no_cache()) + .await + .unwrap(); + let r = index + .search( + &TextQuery::StringContains("efgh".to_string()), + &crate::metrics::NoOpMetricsCollector, + ) + .await + .unwrap(); + match r { + SearchResult::Exact(set) => { + assert_eq!(set.len(), Some(1)); + } + _ => panic!("expected exact result"), + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_stream_train_propagates_worker_write_error() { + let schema = Arc::new(arrow_schema::Schema::new(vec![ + arrow_schema::Field::new( + crate::scalar::registry::VALUE_COLUMN_NAME, + DataType::Utf8, + false, + ), + arrow_schema::Field::new(ROW_ADDR, DataType::UInt64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["hello"])), + Arc::new(UInt64Array::from(vec![0])), + ], + ) + .unwrap(); + + let tempdir = tempfile::tempdir().unwrap(); + let index_dir = Path::from_filesystem_path(tempdir.path()).unwrap(); + let inner = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + index_dir, + Arc::new(LanceCache::no_cache()), + )) as Arc; + let store = FailNewFileStore { inner }; + + let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)])); + let err = write_partitioned_fmindex_stream_with_config( + Box::pin(stream), + &store, + FMIndexPartitionConfig { + num_workers: 1, + max_rows: 1, + max_bytes: 1024, + queue_size: 1, + }, + ) + .await + .unwrap_err(); + + assert!(format!("{err}").contains("injected FMIndex write failure")); + } + #[tokio::test(flavor = "multi_thread")] async fn test_plugin_train_streams_multiple_partitions() { fn training_batch( @@ -2266,6 +2662,8 @@ mod tests { .unwrap(); assert_eq!(created.files.len(), 2); + assert_eq!(created.files[0].path, fmindex_partition_path(0)); + assert_eq!(created.files[1].path, fmindex_partition_path(1)); let index = FMIndexPlugin .load_index(store, &created.index_details, None, &LanceCache::no_cache())