Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
258 changes: 185 additions & 73 deletions rust/lance-index/src/scalar/inverted/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1145,12 +1145,13 @@ impl Index for InvertedIndex {
}
}

/// Target on-disk size of one prewarm chunk; a partition is streamed in chunks of
/// ~this size so its peak resident set is one chunk, not the whole `invert.lance`.
const PREWARM_CHUNK_TARGET_BYTES: u64 = 32 << 20;
/// Target on-disk size of one prewarm chunk. Keep this large enough that cloud
/// stores do not spend prewarm time on thousands of tiny range reads, but still
/// bounded so one large partition is not materialized all at once.
const PREWARM_CHUNK_TARGET_BYTES: u64 = 128 << 20;

/// Cap on token rows per chunk, bounding the built `Vec` when posting lists are tiny.
const PREWARM_MAX_CHUNK_TOKENS: usize = 4096;
const PREWARM_MAX_CHUNK_TOKENS: usize = 256 * 1024;

/// Floor on token rows per chunk, so a partition always makes progress.
const PREWARM_MIN_CHUNK_TOKENS: usize = 1;
Expand All @@ -1174,46 +1175,71 @@ fn group_aligned_chunk_end(
tok_start: usize,
desired_end: usize,
) -> usize {
let fit = starts
.iter()
.map(|&s| s as usize)
.chain(std::iter::once(token_count))
.filter(|&b| b > tok_start && b <= desired_end)
.max();
if let Some(end) = fit {
return end;
if desired_end >= token_count {
return token_count;
}

let first_after_start = starts.partition_point(|&start| start as usize <= tok_start);
let first_after_desired = starts.partition_point(|&start| start as usize <= desired_end);
if first_after_desired > first_after_start {
return starts[first_after_desired - 1] as usize;
}

// Oversized group: extend to its end so it runs as one chunk.
starts
.iter()
.map(|&s| s as usize)
.find(|&b| b > tok_start)
.get(first_after_start)
.map(|&start| start as usize)
.unwrap_or(token_count)
}

/// Splits the token range `0..token_count` into consecutive, non-overlapping
/// half-open `(start, end)` chunks of at most roughly `chunk_tokens` tokens.
///
/// * `group_starts` - when `Some`, each chunk end is passed through
///   `group_aligned_chunk_end` so that a chunk never splits a posting cache
///   group; a chunk may then be shorter or longer than `chunk_tokens`. When
///   `None`, chunks are plain token ranges.
/// * `token_count` - total number of tokens to cover.
/// * `chunk_tokens` - desired tokens per chunk. A value of `0` is treated as
///   `1` so the walk always makes progress.
///
/// Returns the chunks in ascending order; the result is empty when
/// `token_count` is `0`.
fn prewarm_chunk_ranges(
    group_starts: Option<&[u32]>,
    token_count: usize,
    chunk_tokens: usize,
) -> Vec<(usize, usize)> {
    // A zero-sized chunk would leave `tok_end == tok_start` on the ungrouped
    // path and loop forever while growing `ranges`, so enforce a floor of one.
    let chunk_tokens = chunk_tokens.max(1);
    let mut ranges = Vec::new();
    let mut tok_start = 0usize;
    while tok_start < token_count {
        // Saturate instead of overflowing; the result is clamped to
        // `token_count` right away.
        let mut tok_end = tok_start.saturating_add(chunk_tokens).min(token_count);
        // `tok_start` is always a group boundary; snap `tok_end` back to one too.
        if let Some(starts) = group_starts {
            tok_end = group_aligned_chunk_end(starts, token_count, tok_start, tok_end);
        }
        ranges.push((tok_start, tok_end));
        tok_start = tok_end;
    }
    ranges
}

/// Returns the range of indices into `starts` whose group start token falls
/// inside the chunk `[tok_start, tok_end)`.
///
/// Both bounds are located with a binary search, so `starts` is expected to
/// be sorted in ascending order.
fn group_start_indices_for_chunk(starts: &[u32], tok_start: usize, tok_end: usize) -> Range<usize> {
    // Number of group starts strictly below `bound`.
    let count_below = |bound: usize| starts.partition_point(|&s| (s as usize) < bound);
    count_below(tok_start)..count_below(tok_end)
}

/// Returns the half-open token range `(start, end)` of the group at
/// `group_idx` in `starts`.
///
/// The end is the next group's start, or `token_count` (cast to `u32`) for
/// the last group. Panics if `group_idx` is out of bounds for `starts`.
fn group_range_for_start_index(starts: &[u32], token_count: usize, group_idx: usize) -> (u32, u32) {
    let group_start = starts[group_idx];
    let group_end = match starts.get(group_idx + 1) {
        Some(&next_start) => next_start,
        // Last group: it runs to the end of the token space.
        None => token_count as u32,
    };
    (group_start, group_end)
}

impl InvertedIndex {
pub async fn prewarm_with_options(&self, options: &FtsPrewarmOptions) -> Result<()> {
let with_position = options.with_position;
let io_parallelism = self.store.io_parallelism();
let prewarm_futures = self
.partitions
.iter()
.map(Arc::clone)
.map(|part| async move {
part.inverted_list
.prewarm_posting_lists(with_position)
.await?;
// Materialize the deferred DocSet too: prewarm's contract is
// that subsequent queries do no IO, so the per-doc row_ids /
// num_tokens must be resident, not lazily faulted in at query
// time. `ensure_loaded` opens, reads, and drops the reader.
part.docs.ensure_loaded().await?;
Result::Ok(())
});
stream::iter(prewarm_futures)
.buffer_unordered(io_parallelism)
.try_collect::<Vec<_>>()
.await?;
let chunk_concurrency = self.store.io_parallelism().max(1);
for part in &self.partitions {
part.inverted_list
.prewarm_posting_lists(with_position, chunk_concurrency)
.await?;
// Materialize the deferred DocSet too: prewarm's contract is
// that subsequent queries do no IO, so the per-doc row_ids /
// num_tokens must be resident, not lazily faulted in at query
// time. `ensure_loaded` opens, reads, and drops the reader.
part.docs.ensure_loaded().await?;
}
Ok(())
}
/// Search docs match the input text.
Expand Down Expand Up @@ -2735,8 +2761,12 @@ impl PostingListReader {
Ok(batch)
}

async fn prewarm_posting_lists(&self, with_position: bool) -> Result<()> {
self.prewarm_posting_lists_chunked(with_position, None)
async fn prewarm_posting_lists(
&self,
with_position: bool,
chunk_concurrency: usize,
) -> Result<()> {
self.prewarm_posting_lists_chunked(with_position, None, chunk_concurrency)
.await?;
Ok(())
}
Expand All @@ -2748,6 +2778,7 @@ impl PostingListReader {
&self,
with_position: bool,
chunk_tokens_override: Option<usize>,
chunk_concurrency: usize,
) -> Result<usize> {
if with_position && !self.has_positions() {
return Err(Error::invalid_input(
Expand All @@ -2766,36 +2797,38 @@ impl PostingListReader {
// groups. Without grouping, chunks are plain token ranges.
let group_starts = self.group_starts.clone();
let token_count = self.len();
let posting_data_size_bytes = self.posting_data_size_bytes();
let chunk_tokens = chunk_tokens_override
.unwrap_or_else(|| prewarm_chunk_tokens(token_count, self.posting_data_size_bytes()))
.unwrap_or_else(|| prewarm_chunk_tokens(token_count, posting_data_size_bytes))
.max(1);
let chunk_ranges = prewarm_chunk_ranges(group_starts.as_deref(), token_count, chunk_tokens);
let chunk_count = chunk_ranges.len();
let chunk_concurrency = chunk_concurrency.max(1);

let mut chunk_count = 0usize;
let read_build_start = Instant::now();
let mut tok_start = 0usize;
while tok_start < token_count {
let mut tok_end = (tok_start + chunk_tokens).min(token_count);
// `tok_start` is always a group boundary; snap `tok_end` back to one too.
if let Some(starts) = group_starts.as_ref() {
tok_end = group_aligned_chunk_end(starts, token_count, tok_start, tok_end);
}
chunk_count += 1;

let posting_lists = self
.build_chunk_postings(tok_start, tok_end, with_position, &state)
.await?;
self.publish_chunk_postings(
posting_lists,
group_starts.as_deref(),
tok_start,
tok_end,
token_count,
with_position,
)
.await;

tok_start = tok_end;
}
stream::iter(chunk_ranges)
.map(|(tok_start, tok_end)| {
let state = &state;
let group_starts = group_starts.as_deref();
async move {
let posting_lists = self
.build_chunk_postings(tok_start, tok_end, with_position, state)
.await?;
self.publish_chunk_postings(
posting_lists,
group_starts,
tok_start,
tok_end,
token_count,
with_position,
)
.await;
Result::Ok(())
}
})
.buffer_unordered(chunk_concurrency)
.try_collect::<()>()
.await?;
let read_build_elapsed = read_build_start.elapsed();

info!(
Expand All @@ -2804,6 +2837,8 @@ impl PostingListReader {
token_count,
chunk_count,
chunk_tokens,
chunk_concurrency,
posting_data_size_bytes,
read_build_ms = read_build_elapsed.as_secs_f64() * 1000.0,
"posting list prewarm timing"
);
Expand Down Expand Up @@ -2922,12 +2957,9 @@ impl PostingListReader {
// in it; `chunk_postings[i]` is token `tok_start + i`. The last
// group's `end` derives from `token_count`, matching the read path
// so both produce identical `PostingListGroupKey`s.
for (k, &start) in starts.iter().enumerate() {
for group_idx in group_start_indices_for_chunk(starts, tok_start, tok_end) {
let (start, end) = group_range_for_start_index(starts, token_count, group_idx);
let start_usize = start as usize;
if start_usize < tok_start || start_usize >= tok_end {
continue;
}
let end = starts.get(k + 1).copied().unwrap_or(token_count as u32);
let lo = start_usize - tok_start;
let hi = end as usize - tok_start;
let group = PostingListGroup::new(chunk_postings[lo..hi].to_vec());
Expand Down Expand Up @@ -6450,7 +6482,7 @@ mod tests {
"test should use modern posting layout"
);

inverted_list.prewarm_posting_lists(false).await.unwrap();
inverted_list.prewarm_posting_lists(false, 2).await.unwrap();

// The two tiny tokens land in a single cache group [0, 2) (issue
// #7040); both postings are read out of that group entry.
Expand All @@ -6475,6 +6507,80 @@ mod tests {
);
}

/// Table-driven boundary checks for `group_aligned_chunk_end` over groups
/// starting at tokens 0, 3, 7 and 10 in a 13-token partition.
#[test]
fn test_group_aligned_chunk_end_boundary_cases() {
    let starts = [0, 3, 7, 10];
    let token_count = 13;

    // (tok_start, desired_end, expected chunk end, failure message)
    let cases = [
        (
            0,
            5,
            3,
            "chunk should snap back to the largest group boundary that fits",
        ),
        (3, 6, 7, "oversized groups should run as one chunk"),
        (7, 10, 10, "an exact next group boundary should be selected"),
        (10, 12, 13, "the last group should extend to token_count"),
        (7, 13, 13, "token_count should act as the final boundary"),
    ];

    for (tok_start, desired_end, expected_end, why) in cases {
        let chunk_end = group_aligned_chunk_end(&starts, token_count, tok_start, desired_end);
        assert_eq!(chunk_end, expected_end, "{}", why);
    }
}

/// Table-driven check that the groups selected for a chunk, expanded to
/// their `(start, end)` token ranges, are exactly the groups starting inside
/// that chunk.
#[test]
fn test_group_start_indices_for_chunk_boundary_cases() {
    let starts = [0, 3, 7, 10];
    let token_count = 13;

    // (tok_start, tok_end, expected group ranges, failure message)
    let cases: [(usize, usize, Vec<(u32, u32)>, &str); 3] = [
        (
            0,
            7,
            vec![(0, 3), (3, 7)],
            "publish should include only groups that start in the chunk",
        ),
        (
            7,
            13,
            vec![(7, 10), (10, 13)],
            "publish should include the final group ending at token_count",
        ),
        (
            3,
            10,
            vec![(3, 7), (7, 10)],
            "publish selection should work for an interior chunk",
        ),
    ];

    for (tok_start, tok_end, expected, why) in cases {
        let mut selected = Vec::new();
        for group_idx in group_start_indices_for_chunk(&starts, tok_start, tok_end) {
            selected.push(group_range_for_start_index(&starts, token_count, group_idx));
        }
        assert_eq!(selected, expected, "{}", why);
    }
}

/// Chunk planning must respect posting cache group boundaries when group
/// starts are supplied, and fall back to fixed-size token ranges otherwise.
#[test]
fn test_prewarm_chunk_ranges_preserve_group_boundaries() {
    let starts: [u32; 4] = [0, 3, 7, 10];

    let grouped = prewarm_chunk_ranges(Some(&starts[..]), 13, 5);
    let expected_grouped = vec![(0, 3), (3, 7), (7, 10), (10, 13)];
    assert_eq!(
        grouped, expected_grouped,
        "grouped chunk ranges must never split a posting cache group"
    );

    let ungrouped = prewarm_chunk_ranges(None, 13, 5);
    let expected_ungrouped = vec![(0, 5), (5, 10), (10, 13)];
    assert_eq!(
        ungrouped, expected_ungrouped,
        "ungrouped chunk ranges should use plain token ranges"
    );
}

/// Prewarming a large partition in multiple chunks must end up holding exactly the
/// same per-token posting lists (doc ids and frequencies) as the whole-file path.
/// Parametrized over layout: the legacy-v1 chunk path rebases global offsets to
Expand Down Expand Up @@ -6564,7 +6670,7 @@ mod tests {
// CHUNK_TOKENS < NUM_TOKENS each chunk is bounded below the whole partition.
const CHUNK_TOKENS: usize = 6;
let chunk_count = inverted_list
.prewarm_posting_lists_chunked(false, Some(CHUNK_TOKENS))
.prewarm_posting_lists_chunked(false, Some(CHUNK_TOKENS), 2)
.await
.unwrap();

Expand Down Expand Up @@ -6682,7 +6788,7 @@ mod tests {

const CHUNK_TOKENS: usize = 5;
let chunk_count = inverted_list
.prewarm_posting_lists_chunked(true, Some(CHUNK_TOKENS))
.prewarm_posting_lists_chunked(true, Some(CHUNK_TOKENS), 2)
.await
.unwrap();
assert!(
Expand Down Expand Up @@ -8699,7 +8805,10 @@ mod tests {
"fixture should span multiple groups",
);

posting_reader.prewarm_posting_lists(false).await.unwrap();
posting_reader
.prewarm_posting_lists(false, 2)
.await
.unwrap();

for token in 0..num_tokens {
let (start, end) = posting_reader.group_range_for_token(token).unwrap();
Expand Down Expand Up @@ -8844,7 +8953,10 @@ mod tests {
let posting_reader = PostingListReader::try_new(stripped, &cache).await.unwrap();
assert!(posting_reader.group_starts.is_none());

posting_reader.prewarm_posting_lists(false).await.unwrap();
posting_reader
.prewarm_posting_lists(false, 2)
.await
.unwrap();

for token_id in 0..num_tokens {
assert!(
Expand Down
Loading