diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index c23dc1c4e78..7648b336c86 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -1145,12 +1145,13 @@ impl Index for InvertedIndex { } } -/// Target on-disk size of one prewarm chunk; a partition is streamed in chunks of -/// ~this size so its peak resident set is one chunk, not the whole `invert.lance`. -const PREWARM_CHUNK_TARGET_BYTES: u64 = 32 << 20; +/// Target on-disk size of one prewarm chunk. Keep this large enough that cloud +/// stores do not spend prewarm time on thousands of tiny range reads, but still +/// bounded so one large partition is not materialized all at once. +const PREWARM_CHUNK_TARGET_BYTES: u64 = 128 << 20; /// Cap on token rows per chunk, bounding the built `Vec` when posting lists are tiny. -const PREWARM_MAX_CHUNK_TOKENS: usize = 4096; +const PREWARM_MAX_CHUNK_TOKENS: usize = 256 * 1024; /// Floor on token rows per chunk, so a partition always makes progress. const PREWARM_MIN_CHUNK_TOKENS: usize = 1; @@ -1174,46 +1175,71 @@ fn group_aligned_chunk_end( tok_start: usize, desired_end: usize, ) -> usize { - let fit = starts - .iter() - .map(|&s| s as usize) - .chain(std::iter::once(token_count)) - .filter(|&b| b > tok_start && b <= desired_end) - .max(); - if let Some(end) = fit { - return end; + if desired_end >= token_count { + return token_count; + } + + let first_after_start = starts.partition_point(|&start| start as usize <= tok_start); + let first_after_desired = starts.partition_point(|&start| start as usize <= desired_end); + if first_after_desired > first_after_start { + return starts[first_after_desired - 1] as usize; } + // Oversized group: extend to its end so it runs as one chunk. starts - .iter() - .map(|&s| s as usize) - .find(|&b| b > tok_start) + .get(first_after_start) + .map(|&start| start as usize) .unwrap_or(token_count) } +fn prewarm_chunk_ranges( + group_starts: Option<&[u32]>, + token_count: usize, + chunk_tokens: usize, +) -> Vec<(usize, usize)> { + let mut ranges = Vec::new(); + let mut tok_start = 0usize; + while tok_start < token_count { + let mut tok_end = (tok_start + chunk_tokens).min(token_count); + // `tok_start` is always a group boundary; snap `tok_end` back to one too. + if let Some(starts) = group_starts { + tok_end = group_aligned_chunk_end(starts, token_count, tok_start, tok_end); + } + ranges.push((tok_start, tok_end)); + tok_start = tok_end; + } + ranges +} + +fn group_start_indices_for_chunk(starts: &[u32], tok_start: usize, tok_end: usize) -> Range { + let first = starts.partition_point(|&start| (start as usize) < tok_start); + let end = starts.partition_point(|&start| (start as usize) < tok_end); + first..end +} + +fn group_range_for_start_index(starts: &[u32], token_count: usize, group_idx: usize) -> (u32, u32) { + let start = starts[group_idx]; + let end = starts + .get(group_idx + 1) + .copied() + .unwrap_or(token_count as u32); + (start, end) +} + impl InvertedIndex { pub async fn prewarm_with_options(&self, options: &FtsPrewarmOptions) -> Result<()> { let with_position = options.with_position; - let io_parallelism = self.store.io_parallelism(); - let prewarm_futures = self - .partitions - .iter() - .map(Arc::clone) - .map(|part| async move { - part.inverted_list - .prewarm_posting_lists(with_position) - .await?; - // Materialize the deferred DocSet too: prewarm's contract is - // that subsequent queries do no IO, so the per-doc row_ids / - // num_tokens must be resident, not lazily faulted in at query - // time. `ensure_loaded` opens, reads, and drops the reader. - part.docs.ensure_loaded().await?; - Result::Ok(()) - }); - stream::iter(prewarm_futures) - .buffer_unordered(io_parallelism) - .try_collect::>() - .await?; + let chunk_concurrency = self.store.io_parallelism().max(1); + for part in &self.partitions { + part.inverted_list + .prewarm_posting_lists(with_position, chunk_concurrency) + .await?; + // Materialize the deferred DocSet too: prewarm's contract is + // that subsequent queries do no IO, so the per-doc row_ids / + // num_tokens must be resident, not lazily faulted in at query + // time. `ensure_loaded` opens, reads, and drops the reader. + part.docs.ensure_loaded().await?; + } Ok(()) } /// Search docs match the input text. @@ -2735,8 +2761,12 @@ impl PostingListReader { Ok(batch) } - async fn prewarm_posting_lists(&self, with_position: bool) -> Result<()> { - self.prewarm_posting_lists_chunked(with_position, None) + async fn prewarm_posting_lists( + &self, + with_position: bool, + chunk_concurrency: usize, + ) -> Result<()> { + self.prewarm_posting_lists_chunked(with_position, None, chunk_concurrency) .await?; Ok(()) } @@ -2748,6 +2778,7 @@ impl PostingListReader { &self, with_position: bool, chunk_tokens_override: Option, + chunk_concurrency: usize, ) -> Result { if with_position && !self.has_positions() { return Err(Error::invalid_input( @@ -2766,36 +2797,38 @@ impl PostingListReader { // groups. Without grouping, chunks are plain token ranges. let group_starts = self.group_starts.clone(); let token_count = self.len(); + let posting_data_size_bytes = self.posting_data_size_bytes(); let chunk_tokens = chunk_tokens_override - .unwrap_or_else(|| prewarm_chunk_tokens(token_count, self.posting_data_size_bytes())) + .unwrap_or_else(|| prewarm_chunk_tokens(token_count, posting_data_size_bytes)) .max(1); + let chunk_ranges = prewarm_chunk_ranges(group_starts.as_deref(), token_count, chunk_tokens); + let chunk_count = chunk_ranges.len(); + let chunk_concurrency = chunk_concurrency.max(1); - let mut chunk_count = 0usize; let read_build_start = Instant::now(); - let mut tok_start = 0usize; - while tok_start < token_count { - let mut tok_end = (tok_start + chunk_tokens).min(token_count); - // `tok_start` is always a group boundary; snap `tok_end` back to one too. - if let Some(starts) = group_starts.as_ref() { - tok_end = group_aligned_chunk_end(starts, token_count, tok_start, tok_end); - } - chunk_count += 1; - - let posting_lists = self - .build_chunk_postings(tok_start, tok_end, with_position, &state) - .await?; - self.publish_chunk_postings( - posting_lists, - group_starts.as_deref(), - tok_start, - tok_end, - token_count, - with_position, - ) - .await; - - tok_start = tok_end; - } + stream::iter(chunk_ranges) + .map(|(tok_start, tok_end)| { + let state = &state; + let group_starts = group_starts.as_deref(); + async move { + let posting_lists = self + .build_chunk_postings(tok_start, tok_end, with_position, state) + .await?; + self.publish_chunk_postings( + posting_lists, + group_starts, + tok_start, + tok_end, + token_count, + with_position, + ) + .await; + Result::Ok(()) + } + }) + .buffer_unordered(chunk_concurrency) + .try_collect::<()>() + .await?; let read_build_elapsed = read_build_start.elapsed(); info!( @@ -2804,6 +2837,8 @@ impl PostingListReader { token_count, chunk_count, chunk_tokens, + chunk_concurrency, + posting_data_size_bytes, read_build_ms = read_build_elapsed.as_secs_f64() * 1000.0, "posting list prewarm timing" ); @@ -2922,12 +2957,9 @@ impl PostingListReader { // in it; `chunk_postings[i]` is token `tok_start + i`. The last // group's `end` derives from `token_count`, matching the read path // so both produce identical `PostingListGroupKey`s. - for (k, &start) in starts.iter().enumerate() { + for group_idx in group_start_indices_for_chunk(starts, tok_start, tok_end) { + let (start, end) = group_range_for_start_index(starts, token_count, group_idx); let start_usize = start as usize; - if start_usize < tok_start || start_usize >= tok_end { - continue; - } - let end = starts.get(k + 1).copied().unwrap_or(token_count as u32); let lo = start_usize - tok_start; let hi = end as usize - tok_start; let group = PostingListGroup::new(chunk_postings[lo..hi].to_vec()); @@ -6450,7 +6482,7 @@ mod tests { "test should use modern posting layout" ); - inverted_list.prewarm_posting_lists(false).await.unwrap(); + inverted_list.prewarm_posting_lists(false, 2).await.unwrap(); // The two tiny tokens land in a single cache group [0, 2) (issue // #7040); both postings are read out of that group entry. @@ -6475,6 +6507,80 @@ mod tests { ); } + #[test] + fn test_group_aligned_chunk_end_boundary_cases() { + let starts = [0, 3, 7, 10]; + let token_count = 13; + + assert_eq!( + group_aligned_chunk_end(&starts, token_count, 0, 5), + 3, + "chunk should snap back to the largest group boundary that fits" + ); + assert_eq!( + group_aligned_chunk_end(&starts, token_count, 3, 6), + 7, + "oversized groups should run as one chunk" + ); + assert_eq!( + group_aligned_chunk_end(&starts, token_count, 7, 10), + 10, + "an exact next group boundary should be selected" + ); + assert_eq!( + group_aligned_chunk_end(&starts, token_count, 10, 12), + 13, + "the last group should extend to token_count" + ); + assert_eq!( + group_aligned_chunk_end(&starts, token_count, 7, 13), + 13, + "token_count should act as the final boundary" + ); + } + + #[test] + fn test_group_start_indices_for_chunk_boundary_cases() { + let starts = [0, 3, 7, 10]; + let token_count = 13; + let ranges_for_chunk = |tok_start, tok_end| { + group_start_indices_for_chunk(&starts, tok_start, tok_end) + .map(|group_idx| group_range_for_start_index(&starts, token_count, group_idx)) + .collect::>() + }; + + assert_eq!( + ranges_for_chunk(0, 7), + vec![(0, 3), (3, 7)], + "publish should include only groups that start in the chunk" + ); + assert_eq!( + ranges_for_chunk(7, 13), + vec![(7, 10), (10, 13)], + "publish should include the final group ending at token_count" + ); + assert_eq!( + ranges_for_chunk(3, 10), + vec![(3, 7), (7, 10)], + "publish selection should work for an interior chunk" + ); + } + + #[test] + fn test_prewarm_chunk_ranges_preserve_group_boundaries() { + let starts = [0, 3, 7, 10]; + assert_eq!( + prewarm_chunk_ranges(Some(&starts), 13, 5), + vec![(0, 3), (3, 7), (7, 10), (10, 13)], + "grouped chunk ranges must never split a posting cache group" + ); + assert_eq!( + prewarm_chunk_ranges(None, 13, 5), + vec![(0, 5), (5, 10), (10, 13)], + "ungrouped chunk ranges should use plain token ranges" + ); + } + /// Prewarming a large partition in multiple chunks must end up holding exactly the /// same per-token posting lists (doc ids and frequencies) as the whole-file path. /// Parametrized over layout: the legacy-v1 chunk path rebases global offsets to @@ -6564,7 +6670,7 @@ mod tests { // CHUNK_TOKENS < NUM_TOKENS each chunk is bounded below the whole partition. const CHUNK_TOKENS: usize = 6; let chunk_count = inverted_list - .prewarm_posting_lists_chunked(false, Some(CHUNK_TOKENS)) + .prewarm_posting_lists_chunked(false, Some(CHUNK_TOKENS), 2) .await .unwrap(); @@ -6682,7 +6788,7 @@ mod tests { const CHUNK_TOKENS: usize = 5; let chunk_count = inverted_list - .prewarm_posting_lists_chunked(true, Some(CHUNK_TOKENS)) + .prewarm_posting_lists_chunked(true, Some(CHUNK_TOKENS), 2) .await .unwrap(); assert!( @@ -8699,7 +8805,10 @@ mod tests { "fixture should span multiple groups", ); - posting_reader.prewarm_posting_lists(false).await.unwrap(); + posting_reader + .prewarm_posting_lists(false, 2) + .await + .unwrap(); for token in 0..num_tokens { let (start, end) = posting_reader.group_range_for_token(token).unwrap(); @@ -8844,7 +8953,10 @@ mod tests { let posting_reader = PostingListReader::try_new(stripped, &cache).await.unwrap(); assert!(posting_reader.group_starts.is_none()); - posting_reader.prewarm_posting_lists(false).await.unwrap(); + posting_reader + .prewarm_posting_lists(false, 2) + .await + .unwrap(); for token_id in 0..num_tokens { assert!(