Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
/// Open an existing file for retrieval
async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>>;

/// Return a store that submits its I/O at the given base priority.
fn with_io_priority(&self, io_priority: u64) -> Arc<dyn IndexStore>;

/// Copy a range of batches from an index file from this store to another
///
/// This is often useful when remapping or updating
Expand Down
13 changes: 13 additions & 0 deletions rust/lance-index/src/scalar/inverted/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2364,6 +2364,10 @@ mod tests {
self.inner.io_parallelism()
}

fn with_io_priority(&self, io_priority: u64) -> Arc<dyn IndexStore> {
self.inner.with_io_priority(io_priority)
}

async fn new_index_file(
&self,
name: &str,
Expand Down Expand Up @@ -2458,6 +2462,10 @@ mod tests {
self.inner.io_parallelism()
}

fn with_io_priority(&self, io_priority: u64) -> Arc<dyn IndexStore> {
self.inner.with_io_priority(io_priority)
}

async fn new_index_file(
&self,
name: &str,
Expand Down Expand Up @@ -2585,6 +2593,11 @@ mod tests {
1
}

fn with_io_priority(&self, _io_priority: u64) -> Arc<dyn IndexStore> {
// No backing scheduler, so priority is meaningless here.
self.clone_arc()
}

async fn new_index_file(
&self,
name: &str,
Expand Down
53 changes: 50 additions & 3 deletions rust/lance-index/src/scalar/inverted/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1091,8 +1091,8 @@ impl InvertedIndex {
};

let format = token_set_format;
let partitions = partitions.into_iter().map(|id| {
let store = store.clone();
let partitions = partitions.into_iter().enumerate().map(|(priority, id)| {
let store = store.with_io_priority(priority as u64);
let frag_reuse_index_clone = frag_reuse_index.clone();
let index_cache_for_part =
index_cache.with_key_prefix(format!("part-{}", id).as_str());
Expand Down Expand Up @@ -6828,7 +6828,7 @@ mod tests {

#[derive(Debug)]
struct CountingStore {
inner: Arc<LanceIndexStore>,
inner: Arc<dyn IndexStore>,
posting_file: String,
counter: Arc<PostingMetadataCounter>,
}
Expand All @@ -6854,6 +6854,13 @@ mod tests {
fn io_parallelism(&self) -> usize {
self.inner.io_parallelism()
}
fn with_io_priority(&self, io_priority: u64) -> Arc<dyn IndexStore> {
Arc::new(Self {
inner: self.inner.with_io_priority(io_priority),
posting_file: self.posting_file.clone(),
counter: self.counter.clone(),
})
}
async fn new_index_file(
&self,
name: &str,
Expand Down Expand Up @@ -8220,6 +8227,46 @@ mod tests {
}
}

/// Each partition must read through the shared scheduler at a distinct base
/// priority. Tied priorities (every partition at 0) break the scheduler's
/// backpressure deadlock-break — which admits the lowest-priority in-flight
/// request — because there is no unique lowest request to advance, so a
/// concurrent multi-partition read (e.g. prewarm) can wedge. Distinct
/// per-partition priorities keep the in-flight set totally ordered.
#[tokio::test]
async fn test_partitions_load_with_distinct_priorities() {
let tmpdir = TempObjDir::default();
let store = Arc::new(LanceIndexStore::new(
ObjectStore::local().into(),
tmpdir.clone(),
Arc::new(LanceCache::no_cache()),
));
let (index, _cache) = build_multi_partition_index(&store, 5).await;

let mut priorities: Vec<u64> = index
.partitions
.iter()
.map(|part| {
part.store
.as_any()
.downcast_ref::<LanceIndexStore>()
.expect("partition store should be a LanceIndexStore")
.io_priority()
})
.collect();

// Distinct and dense (0..N): every partition reads at its own priority,
// so the shared scheduler sees a total order across all partitions. The
// partitions may finish loading in any order, so sort before comparing —
// what matters is that the priorities form a contiguous, collision-free
// set, not which partition ended up at which slot.
priorities.sort_unstable();
assert_eq!(
priorities,
(0..index.partitions.len() as u64).collect::<Vec<_>>()
);
}

#[tokio::test]
async fn test_update_preserves_loaded_v2_format_version() -> Result<()> {
let src_dir = TempObjDir::default();
Expand Down
22 changes: 21 additions & 1 deletion rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub struct LanceIndexStore {
/// When set, used to avoid HEAD calls when opening files
file_sizes: HashMap<String, u64>,
format_version: LanceFileVersion,
/// Base I/O priority for all requests this store submits to `scheduler`.
io_priority: u64,
}

impl DeepSizeOf for LanceIndexStore {
Expand Down Expand Up @@ -88,6 +90,7 @@ impl LanceIndexStore {
scheduler,
file_sizes: HashMap::new(),
format_version,
io_priority: 0,
}
}

Expand All @@ -100,6 +103,11 @@ impl LanceIndexStore {
self
}

/// The base I/O priority all this store's requests are submitted at.
pub fn io_priority(&self) -> u64 {
self.io_priority
}

fn index_file_path(&self, name: &str) -> Result<Path> {
let relative_path = Path::parse(name).map_err(|err| {
Error::invalid_input(format!("invalid index file path {name:?}: {err}"))
Expand Down Expand Up @@ -432,6 +440,15 @@ impl IndexStore for LanceIndexStore {
}))
}

fn with_io_priority(&self, io_priority: u64) -> Arc<dyn IndexStore> {
// The `scheduler` is shared (`Arc`), so this clone is cheap and the new
// priority only affects requests this clone submits.
Arc::new(Self {
io_priority,
..self.clone()
})
}

async fn open_index_file(&self, name: &str) -> Result<Arc<dyn IndexReader>> {
let path = self.index_file_path(name)?;
// Use cached file size if available, otherwise unknown (requires HEAD call)
Expand All @@ -440,7 +457,10 @@ impl IndexStore for LanceIndexStore {
.get(name)
.map(|&size| CachedFileSize::new(size))
.unwrap_or_else(CachedFileSize::unknown);
let file_scheduler = self.scheduler.open_file(&path, &cached_size).await?;
let file_scheduler = self
.scheduler
.open_file_with_priority(&path, self.io_priority, &cached_size)
.await?;
match current_reader::FileReader::try_open(
file_scheduler,
None,
Expand Down
Loading