From 6ebcf49912789f015acba2f03d000e7e695f8d1a Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Wed, 24 Jun 2026 13:51:17 -0700 Subject: [PATCH 1/5] fix(index): give each FTS partition a distinct scheduler base priority Multi-partition inverted-index reads (FTS prewarm, corpus stats, df) fan out over partitions through one shared ScanScheduler. Every partition opened its files at base_priority 0, so all concurrent reads tied at the same priority. The scheduler's backpressure deadlock-break admits the lowest-priority in-flight request without a byte-budget check, which requires a total order over in-flight priorities to guarantee one request can always advance and drain the byte budget. With every partition at priority 0 there is no unique lowest request, so when the shared 64-IOP scheduler saturates, freed IOP slots go to arbitrary tied tasks and no consumer's batch is guaranteed to complete and refund resources -> the I/O loop can wedge with work pending but none admissible. Stamp each partition's IndexStore with its enumerate index as the base priority at load time, mirroring how a filtered read scan prioritizes each fragment. The stores share the same scheduler (Arc), so global backpressure is unchanged; only the priority differs, restoring the total order across partitions that the deadlock-break relies on. The single, non-concurrent metadata read stays at priority 0 (no tie). Adds IndexStore::with_base_priority (default no-op for backends without a priority concept) and a test asserting partitions load with distinct, dense priorities. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance-index/src/scalar.rs | 11 ++++ rust/lance-index/src/scalar/inverted/index.rs | 52 ++++++++++++++++++- rust/lance-index/src/scalar/lance_format.rs | 40 +++++++++++++- 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 5a6bdbd1e4c..74e04eef31b 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -287,6 +287,17 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf { /// Open an existing file for retrieval async fn open_index_file(&self, name: &str) -> Result>; + /// Return a store that submits its I/O at the given base priority. + /// + /// Stores backed by a shared [`lance_io::scheduler::ScanScheduler`] use this + /// to give concurrently-read partitions distinct priorities, keeping the + /// scheduler's backpressure deadlock-break totally ordered (mirrors how a + /// filtered read scan prioritizes each fragment). The default returns the + /// store unchanged for backends where priority is meaningless. + fn with_base_priority(&self, _base_priority: u64) -> Arc { + self.clone_arc() + } + /// Copy a range of batches from an index file from this store to another /// /// This is often useful when remapping or updating diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 32c044f2d45..4af59ddfcb7 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1091,8 +1091,16 @@ impl InvertedIndex { }; let format = token_set_format; - let partitions = partitions.into_iter().map(|id| { - let store = store.clone(); + let partitions = partitions.into_iter().enumerate().map(|(idx, id)| { + // Give each partition's store a distinct base priority so all + // its reads are totally ordered against the other partitions' + // in the shared scheduler. 
Without this, every partition opens + // at priority 0; the scheduler's backpressure deadlock-break + // ("admit the lowest-priority in-flight request") then has no + // unique lowest request to advance and can wedge when many + // partitions read concurrently (e.g. prewarm). Mirrors how a + // filtered read scan prioritizes each fragment by its index. + let store = store.with_base_priority(idx as u64); let frag_reuse_index_clone = frag_reuse_index.clone(); let index_cache_for_part = index_cache.with_key_prefix(format!("part-{}", id).as_str()); @@ -8220,6 +8228,46 @@ mod tests { } } + /// Each partition must read through the shared scheduler at a distinct base + /// priority. Tied priorities (every partition at 0) break the scheduler's + /// backpressure deadlock-break — which admits the lowest-priority in-flight + /// request — because there is no unique lowest request to advance, so a + /// concurrent multi-partition read (e.g. prewarm) can wedge. Distinct + /// per-partition priorities keep the in-flight set totally ordered. + #[tokio::test] + async fn test_partitions_load_with_distinct_priorities() { + let tmpdir = TempObjDir::default(); + let store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + let (index, _cache) = build_multi_partition_index(&store, 5).await; + + let mut priorities: Vec = index + .partitions + .iter() + .map(|part| { + part.store + .as_any() + .downcast_ref::() + .expect("partition store should be a LanceIndexStore") + .base_priority() + }) + .collect(); + + // Distinct and dense (0..N): every partition reads at its own priority, + // so the shared scheduler sees a total order across all partitions. The + // partitions may finish loading in any order, so sort before comparing — + // what matters is that the priorities form a contiguous, collision-free + // set, not which partition ended up at which slot. 
+ priorities.sort_unstable(); + assert_eq!( + priorities, + (0..index.partitions.len() as u64).collect::>() + ); + } + #[tokio::test] async fn test_update_preserves_loaded_v2_format_version() -> Result<()> { let src_dir = TempObjDir::default(); diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 2f82deb8403..8149813b27d 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -45,6 +45,16 @@ pub struct LanceIndexStore { /// When set, used to avoid HEAD calls when opening files file_sizes: HashMap, format_version: LanceFileVersion, + /// Base priority for all I/O this store submits to `scheduler`. + /// + /// The scheduler's backpressure deadlock-break rule ("the lowest-priority + /// in-flight request is always admitted") needs priorities to be totally + /// ordered. When many partitions read concurrently through one shared + /// scheduler (e.g. FTS prewarm fanning out over partitions), giving each + /// partition's store a distinct `base_priority` keeps that total order so + /// one request can always advance and drain the byte budget. Mirrors how a + /// filtered read scan gives each fragment a distinct reader priority. + base_priority: u64, } impl DeepSizeOf for LanceIndexStore { @@ -88,6 +98,22 @@ impl LanceIndexStore { scheduler, file_sizes: HashMap::new(), format_version, + base_priority: 0, + } + } + + /// Return a clone of this store whose I/O is submitted at `base_priority`. + /// + /// The underlying `scheduler` is shared (it is an `Arc`), so the clone is + /// cheap and the priority only affects requests this clone submits. Used to + /// give each concurrently-read partition a distinct priority so the + /// scheduler's backpressure deadlock-break stays well-ordered. Exposed as a + /// concrete `Self` here; the `IndexStore::with_base_priority` trait method + /// wraps it in an `Arc`. 
+ pub fn cloned_with_priority(&self, base_priority: u64) -> Self { + Self { + base_priority, + ..self.clone() } } @@ -100,6 +126,11 @@ impl LanceIndexStore { self } + /// The base priority all this store's I/O is submitted at. + pub fn base_priority(&self) -> u64 { + self.base_priority + } + fn index_file_path(&self, name: &str) -> Result { let relative_path = Path::parse(name).map_err(|err| { Error::invalid_input(format!("invalid index file path {name:?}: {err}")) @@ -432,6 +463,10 @@ impl IndexStore for LanceIndexStore { })) } + fn with_base_priority(&self, base_priority: u64) -> Arc { + Arc::new(self.cloned_with_priority(base_priority)) + } + async fn open_index_file(&self, name: &str) -> Result> { let path = self.index_file_path(name)?; // Use cached file size if available, otherwise unknown (requires HEAD call) @@ -440,7 +475,10 @@ impl IndexStore for LanceIndexStore { .get(name) .map(|&size| CachedFileSize::new(size)) .unwrap_or_else(CachedFileSize::unknown); - let file_scheduler = self.scheduler.open_file(&path, &cached_size).await?; + let file_scheduler = self + .scheduler + .open_file_with_priority(&path, self.base_priority, &cached_size) + .await?; match current_reader::FileReader::try_open( file_scheduler, None, From 9ea845efe5537d0e34032c1bcde026ed81535ba5 Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Wed, 24 Jun 2026 13:57:09 -0700 Subject: [PATCH 2/5] fix(index): give each FTS partition a distinct scheduler base priority Multi-partition inverted-index reads (FTS prewarm, corpus stats, df) fan out over partitions through one shared ScanScheduler. Every partition opened its files at base_priority 0, so all concurrent reads tied at the same priority. The scheduler's backpressure deadlock-break admits the lowest-priority in-flight request without a byte-budget check, which requires a total order over in-flight priorities to guarantee one request can always advance and drain the byte budget. 
With every partition at priority 0 there is no unique lowest request, so when the shared 64-IOP scheduler saturates, freed IOP slots go to arbitrary tied tasks and no consumer's batch is guaranteed to complete and refund resources -> the I/O loop can wedge with work pending but none admissible. Stamp each partition's IndexStore with its enumerate index as the base priority at load time, mirroring how a filtered read scan prioritizes each fragment. The stores share the same scheduler (Arc), so global backpressure is unchanged; only the priority differs, restoring the total order across partitions that the deadlock-break relies on. The single, non-concurrent metadata read stays at priority 0 (no tie). Adds IndexStore::with_base_priority (default no-op for backends without a priority concept) and a test asserting partitions load with distinct, dense priorities. Co-Authored-By: Claude Opus 4.8 (1M context) --- rust/lance-index/src/scalar/inverted/index.rs | 12 ++-------- rust/lance-index/src/scalar/lance_format.rs | 22 +++++-------------- 2 files changed, 8 insertions(+), 26 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 4af59ddfcb7..33199f19468 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1091,16 +1091,8 @@ impl InvertedIndex { }; let format = token_set_format; - let partitions = partitions.into_iter().enumerate().map(|(idx, id)| { - // Give each partition's store a distinct base priority so all - // its reads are totally ordered against the other partitions' - // in the shared scheduler. Without this, every partition opens - // at priority 0; the scheduler's backpressure deadlock-break - // ("admit the lowest-priority in-flight request") then has no - // unique lowest request to advance and can wedge when many - // partitions read concurrently (e.g. prewarm). 
Mirrors how a - // filtered read scan prioritizes each fragment by its index. - let store = store.with_base_priority(idx as u64); + let partitions = partitions.into_iter().enumerate().map(|(priority, id)| { + let store = store.with_base_priority(priority as u64); let frag_reuse_index_clone = frag_reuse_index.clone(); let index_cache_for_part = index_cache.with_key_prefix(format!("part-{}", id).as_str()); diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 8149813b27d..cf1e8dfb4a8 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -102,21 +102,6 @@ impl LanceIndexStore { } } - /// Return a clone of this store whose I/O is submitted at `base_priority`. - /// - /// The underlying `scheduler` is shared (it is an `Arc`), so the clone is - /// cheap and the priority only affects requests this clone submits. Used to - /// give each concurrently-read partition a distinct priority so the - /// scheduler's backpressure deadlock-break stays well-ordered. Exposed as a - /// concrete `Self` here; the `IndexStore::with_base_priority` trait method - /// wraps it in an `Arc`. - pub fn cloned_with_priority(&self, base_priority: u64) -> Self { - Self { - base_priority, - ..self.clone() - } - } - /// Set cached file sizes to avoid HEAD calls when opening files. /// /// The map should contain relative paths (e.g., "index.idx") as keys @@ -464,7 +449,12 @@ impl IndexStore for LanceIndexStore { } fn with_base_priority(&self, base_priority: u64) -> Arc { - Arc::new(self.cloned_with_priority(base_priority)) + // The `scheduler` is shared (`Arc`), so this clone is cheap and the new + // priority only affects requests this clone submits. 
+ Arc::new(Self { + base_priority, + ..self.clone() + }) } async fn open_index_file(&self, name: &str) -> Result> { From 670546f781bc1c8d763b9c24f1f3e8bdfefa0c1a Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Wed, 24 Jun 2026 14:24:06 -0700 Subject: [PATCH 3/5] improve naming --- rust/lance-index/src/scalar.rs | 10 +--------- rust/lance-index/src/scalar/inverted/builder.rs | 13 +++++++++++++ rust/lance-index/src/scalar/inverted/index.rs | 5 ++++- rust/lance-index/src/scalar/lance_format.rs | 2 +- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 74e04eef31b..7fe888054b5 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -288,15 +288,7 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf { async fn open_index_file(&self, name: &str) -> Result>; /// Return a store that submits its I/O at the given base priority. - /// - /// Stores backed by a shared [`lance_io::scheduler::ScanScheduler`] use this - /// to give concurrently-read partitions distinct priorities, keeping the - /// scheduler's backpressure deadlock-break totally ordered (mirrors how a - /// filtered read scan prioritizes each fragment). The default returns the - /// store unchanged for backends where priority is meaningless. 
- fn with_base_priority(&self, _base_priority: u64) -> Arc<dyn IndexStore> { - self.clone_arc() - } + fn with_io_priority(&self, base_priority: u64) -> Arc<dyn IndexStore>; /// Copy a range of batches from an index file from this store to another /// diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 17cb18c5e96..f8c53738240 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -2364,6 +2364,10 @@ mod tests { self.inner.io_parallelism() } + fn with_io_priority(&self, base_priority: u64) -> Arc<dyn IndexStore> { + self.inner.with_io_priority(base_priority) + } + async fn new_index_file( &self, name: &str, @@ -2458,6 +2462,10 @@ mod tests { self.inner.io_parallelism() } + fn with_io_priority(&self, base_priority: u64) -> Arc<dyn IndexStore> { + self.inner.with_io_priority(base_priority) + } + async fn new_index_file( &self, name: &str, @@ -2585,6 +2593,11 @@ mod tests { 1 } + fn with_io_priority(&self, _base_priority: u64) -> Arc<dyn IndexStore> { + // No backing scheduler, so priority is meaningless here. 
+ self.clone_arc() + } + async fn new_index_file( &self, name: &str, diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 33199f19468..f09d3b26736 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1092,7 +1092,7 @@ impl InvertedIndex { let format = token_set_format; let partitions = partitions.into_iter().enumerate().map(|(priority, id)| { - let store = store.with_base_priority(priority as u64); + let store = store.with_io_priority(priority as u64); let frag_reuse_index_clone = frag_reuse_index.clone(); let index_cache_for_part = index_cache.with_key_prefix(format!("part-{}", id).as_str()); @@ -6854,6 +6854,9 @@ mod tests { fn io_parallelism(&self) -> usize { self.inner.io_parallelism() } + fn with_io_priority(&self, base_priority: u64) -> Arc<dyn IndexStore> { + self.inner.with_io_priority(base_priority) + } async fn new_index_file( &self, name: &str, diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index cf1e8dfb4a8..9d671b05d26 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -448,7 +448,7 @@ impl IndexStore for LanceIndexStore { })) } - fn with_base_priority(&self, base_priority: u64) -> Arc<dyn IndexStore> { + fn with_io_priority(&self, base_priority: u64) -> Arc<dyn IndexStore> { // The `scheduler` is shared (`Arc`), so this clone is cheap and the new // priority only affects requests this clone submits. 
Arc::new(Self { From 0edf2ed254752e3c4d19da64c1877c5d39eb5ca9 Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Wed, 24 Jun 2026 14:27:12 -0700 Subject: [PATCH 4/5] improve --- rust/lance-index/src/scalar.rs | 2 +- .../src/scalar/inverted/builder.rs | 10 +++---- rust/lance-index/src/scalar/inverted/index.rs | 6 ++--- rust/lance-index/src/scalar/lance_format.rs | 26 +++++++------------ 4 files changed, 18 insertions(+), 26 deletions(-) diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 7fe888054b5..448123a1024 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -288,7 +288,7 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf { async fn open_index_file(&self, name: &str) -> Result>; /// Return a store that submits its I/O at the given base priority. - fn with_io_priority(&self, base_priority: u64) -> Arc; + fn with_io_priority(&self, io_priority: u64) -> Arc; /// Copy a range of batches from an index file from this store to another /// diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index f8c53738240..a53b6ddd7dc 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -2364,8 +2364,8 @@ mod tests { self.inner.io_parallelism() } - fn with_io_priority(&self, base_priority: u64) -> Arc { - self.inner.with_io_priority(base_priority) + fn with_io_priority(&self, io_priority: u64) -> Arc { + self.inner.with_io_priority(io_priority) } async fn new_index_file( @@ -2462,8 +2462,8 @@ mod tests { self.inner.io_parallelism() } - fn with_io_priority(&self, base_priority: u64) -> Arc { - self.inner.with_io_priority(base_priority) + fn with_io_priority(&self, io_priority: u64) -> Arc { + self.inner.with_io_priority(io_priority) } async fn new_index_file( @@ -2593,7 +2593,7 @@ mod tests { 1 } - fn with_io_priority(&self, _base_priority: u64) -> Arc { + fn with_io_priority(&self, 
_io_priority: u64) -> Arc<dyn IndexStore> { // No backing scheduler, so priority is meaningless here. self.clone_arc() } diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index f09d3b26736..a681bb95969 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -6854,8 +6854,8 @@ mod tests { fn io_parallelism(&self) -> usize { self.inner.io_parallelism() } - fn with_io_priority(&self, base_priority: u64) -> Arc<dyn IndexStore> { - self.inner.with_io_priority(base_priority) + fn with_io_priority(&self, io_priority: u64) -> Arc<dyn IndexStore> { + self.inner.with_io_priority(io_priority) } async fn new_index_file( &self, @@ -8247,7 +8247,7 @@ mod tests { .as_any() .downcast_ref::<LanceIndexStore>() .expect("partition store should be a LanceIndexStore") - .base_priority() + .io_priority() }) .collect(); diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 9d671b05d26..76bcfcead57 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -45,16 +45,8 @@ pub struct LanceIndexStore { /// When set, used to avoid HEAD calls when opening files file_sizes: HashMap<String, u64>, format_version: LanceFileVersion, - /// Base priority for all I/O this store submits to `scheduler`. - /// - /// The scheduler's backpressure deadlock-break rule ("the lowest-priority - /// in-flight request is always admitted") needs priorities to be totally - /// ordered. When many partitions read concurrently through one shared - /// scheduler (e.g. FTS prewarm fanning out over partitions), giving each - /// partition's store a distinct `base_priority` keeps that total order so - /// one request can always advance and drain the byte budget. Mirrors how a - /// filtered read scan gives each fragment a distinct reader priority. - base_priority: u64, + /// Base I/O priority for all requests this store submits to `scheduler`. 
+ io_priority: u64, } impl DeepSizeOf for LanceIndexStore { @@ -98,7 +90,7 @@ impl LanceIndexStore { scheduler, file_sizes: HashMap::new(), format_version, - base_priority: 0, + io_priority: 0, } } @@ -111,9 +103,9 @@ impl LanceIndexStore { self } - /// The base priority all this store's I/O is submitted at. - pub fn base_priority(&self) -> u64 { - self.base_priority + /// The base I/O priority all this store's requests are submitted at. + pub fn io_priority(&self) -> u64 { + self.io_priority } fn index_file_path(&self, name: &str) -> Result { @@ -448,11 +440,11 @@ impl IndexStore for LanceIndexStore { })) } - fn with_io_priority(&self, base_priority: u64) -> Arc { + fn with_io_priority(&self, io_priority: u64) -> Arc { // The `scheduler` is shared (`Arc`), so this clone is cheap and the new // priority only affects requests this clone submits. Arc::new(Self { - base_priority, + io_priority, ..self.clone() }) } @@ -467,7 +459,7 @@ impl IndexStore for LanceIndexStore { .unwrap_or_else(CachedFileSize::unknown); let file_scheduler = self .scheduler - .open_file_with_priority(&path, self.base_priority, &cached_size) + .open_file_with_priority(&path, self.io_priority, &cached_size) .await?; match current_reader::FileReader::try_open( file_scheduler, From 466b9c05e3bc2bfd26af077a4599d7900b0f38cd Mon Sep 17 00:00:00 2001 From: Lu Qiu Date: Wed, 24 Jun 2026 15:19:18 -0700 Subject: [PATCH 5/5] Fix tests --- rust/lance-index/src/scalar/inverted/index.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index a681bb95969..1989211ba33 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -6828,7 +6828,7 @@ mod tests { #[derive(Debug)] struct CountingStore { - inner: Arc, + inner: Arc, posting_file: String, counter: Arc, } @@ -6855,7 +6855,11 @@ mod tests { self.inner.io_parallelism() } fn 
with_io_priority(&self, io_priority: u64) -> Arc<dyn IndexStore> { - self.inner.with_io_priority(io_priority) + Arc::new(Self { + inner: self.inner.with_io_priority(io_priority), + posting_file: self.posting_file.clone(), + counter: self.counter.clone(), + }) } async fn new_index_file( &self,