diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 5a6bdbd1e4c..448123a1024 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -287,6 +287,9 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf { /// Open an existing file for retrieval async fn open_index_file(&self, name: &str) -> Result>; + /// Return a store that submits its I/O at the given base priority. + fn with_io_priority(&self, io_priority: u64) -> Arc; + /// Copy a range of batches from an index file from this store to another /// /// This is often useful when remapping or updating diff --git a/rust/lance-index/src/scalar/inverted/builder.rs b/rust/lance-index/src/scalar/inverted/builder.rs index 17cb18c5e96..a53b6ddd7dc 100644 --- a/rust/lance-index/src/scalar/inverted/builder.rs +++ b/rust/lance-index/src/scalar/inverted/builder.rs @@ -2364,6 +2364,10 @@ mod tests { self.inner.io_parallelism() } + fn with_io_priority(&self, io_priority: u64) -> Arc { + self.inner.with_io_priority(io_priority) + } + async fn new_index_file( &self, name: &str, @@ -2458,6 +2462,10 @@ mod tests { self.inner.io_parallelism() } + fn with_io_priority(&self, io_priority: u64) -> Arc { + self.inner.with_io_priority(io_priority) + } + async fn new_index_file( &self, name: &str, @@ -2585,6 +2593,11 @@ mod tests { 1 } + fn with_io_priority(&self, _io_priority: u64) -> Arc { + // No backing scheduler, so priority is meaningless here. 
+ self.clone_arc() + } + async fn new_index_file( &self, name: &str, diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 32c044f2d45..1989211ba33 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1091,8 +1091,8 @@ impl InvertedIndex { }; let format = token_set_format; - let partitions = partitions.into_iter().map(|id| { - let store = store.clone(); + let partitions = partitions.into_iter().enumerate().map(|(priority, id)| { + let store = store.with_io_priority(priority as u64); let frag_reuse_index_clone = frag_reuse_index.clone(); let index_cache_for_part = index_cache.with_key_prefix(format!("part-{}", id).as_str()); @@ -6828,7 +6828,7 @@ mod tests { #[derive(Debug)] struct CountingStore { - inner: Arc, + inner: Arc, posting_file: String, counter: Arc, } @@ -6854,6 +6854,13 @@ mod tests { fn io_parallelism(&self) -> usize { self.inner.io_parallelism() } + fn with_io_priority(&self, io_priority: u64) -> Arc { + Arc::new(Self { + inner: self.inner.with_io_priority(io_priority), + posting_file: self.posting_file.clone(), + counter: self.counter.clone(), + }) + } async fn new_index_file( &self, name: &str, @@ -8220,6 +8227,46 @@ mod tests { } } + /// Each partition must read through the shared scheduler at a distinct base + /// priority. Tied priorities (every partition at 0) break the scheduler's + /// backpressure deadlock-break — which admits the lowest-priority in-flight + /// request — because there is no unique lowest request to advance, so a + /// concurrent multi-partition read (e.g. prewarm) can wedge. Distinct + /// per-partition priorities keep the in-flight set totally ordered. 
+ #[tokio::test] + async fn test_partitions_load_with_distinct_priorities() { + let tmpdir = TempObjDir::default(); + let store = Arc::new(LanceIndexStore::new( + ObjectStore::local().into(), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + let (index, _cache) = build_multi_partition_index(&store, 5).await; + + let mut priorities: Vec = index + .partitions + .iter() + .map(|part| { + part.store + .as_any() + .downcast_ref::() + .expect("partition store should be a LanceIndexStore") + .io_priority() + }) + .collect(); + + // Distinct and dense (0..N): every partition reads at its own priority, + // so the shared scheduler sees a total order across all partitions. The + // partitions may finish loading in any order, so sort before comparing — + // what matters is that the priorities form a contiguous, collision-free + // set, not which partition ended up at which slot. + priorities.sort_unstable(); + assert_eq!( + priorities, + (0..index.partitions.len() as u64).collect::>() + ); + } + #[tokio::test] async fn test_update_preserves_loaded_v2_format_version() -> Result<()> { let src_dir = TempObjDir::default(); diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 2f82deb8403..76bcfcead57 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -45,6 +45,8 @@ pub struct LanceIndexStore { /// When set, used to avoid HEAD calls when opening files file_sizes: HashMap, format_version: LanceFileVersion, + /// Base I/O priority for all requests this store submits to `scheduler`. + io_priority: u64, } impl DeepSizeOf for LanceIndexStore { @@ -88,6 +90,7 @@ impl LanceIndexStore { scheduler, file_sizes: HashMap::new(), format_version, + io_priority: 0, } } @@ -100,6 +103,11 @@ impl LanceIndexStore { self } + /// The base I/O priority all this store's requests are submitted at. 
+ pub fn io_priority(&self) -> u64 { + self.io_priority + } + fn index_file_path(&self, name: &str) -> Result { let relative_path = Path::parse(name).map_err(|err| { Error::invalid_input(format!("invalid index file path {name:?}: {err}")) @@ -432,6 +440,15 @@ impl IndexStore for LanceIndexStore { })) } + fn with_io_priority(&self, io_priority: u64) -> Arc { + // The `scheduler` is shared (`Arc`), so cloning it is cheap; `file_sizes` is + // copied as well. The new priority only affects requests this clone submits. + Arc::new(Self { + io_priority, + ..self.clone() + }) + } + async fn open_index_file(&self, name: &str) -> Result> { let path = self.index_file_path(name)?; // Use cached file size if available, otherwise unknown (requires HEAD call) @@ -440,7 +457,10 @@ impl IndexStore for LanceIndexStore { .get(name) .map(|&size| CachedFileSize::new(size)) .unwrap_or_else(CachedFileSize::unknown); - let file_scheduler = self.scheduler.open_file(&path, &cached_size).await?; + let file_scheduler = self + .scheduler + .open_file_with_priority(&path, self.io_priority, &cached_size) + .await?; match current_reader::FileReader::try_open( file_scheduler, None,