diff --git a/rust/lance-index/src/scalar/ngram.rs b/rust/lance-index/src/scalar/ngram.rs index b452ef78c85..c43c0f441b3 100644 --- a/rust/lance-index/src/scalar/ngram.rs +++ b/rust/lance-index/src/scalar/ngram.rs @@ -6,7 +6,7 @@ use std::collections::BTreeMap; use std::iter::once; use std::time::Instant; use std::{ - collections::{HashMap, HashSet}, + collections::{HashMap, HashSet, VecDeque}, sync::Arc, }; @@ -20,8 +20,7 @@ use crate::metrics::NoOpMetricsCollector; use crate::pbold; use crate::scalar::expression::{ScalarQueryParser, TextQueryParser}; use crate::scalar::registry::{ - DefaultTrainingRequest, ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, - VALUE_COLUMN_NAME, + ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME, }; use crate::scalar::{CreatedIndex, UpdateCriteria}; use crate::vector::VectorIndex; @@ -49,7 +48,7 @@ use lance_tokenizer::{ }; use log::info; use roaring::{RoaringBitmap, RoaringTreemap}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use tracing::instrument; mod ngram_regex; @@ -58,7 +57,22 @@ pub(crate) use ngram_regex::regex_can_use_index; const TOKENS_COL: &str = "tokens"; const POSTING_LIST_COL: &str = "posting_list"; const POSTINGS_FILENAME: &str = "ngram_postings.lance"; -const NGRAM_INDEX_VERSION: u32 = 0; +const NGRAM_TRIGRAM_INDEX_VERSION: u32 = 0; +const NGRAM_SPARSE_INDEX_VERSION: u32 = 1; + +// The Index format version is checked before loading. The postings metadata is used +// after loading to pick the query tokenization without changing protobuf index +// details. +const NGRAM_TOKENIZATION_METADATA_KEY: &str = "lance.ngram.tokenization"; + +// FNV-1a constants used to map variable-length sparse n-grams into the existing +// u32 token column. 
+const SPARSE_HASH_OFFSET: u32 = 0x811c9dc5;
+const SPARSE_HASH_PRIME: u32 = 0x01000193;
+const SPARSE_MIN_NGRAM_LEN: usize = 3;
+// Bound sparse token generation so one very long URL/base64/id run cannot
+// materialize every substring during index build or query planning.
+const SPARSE_MAX_NGRAM_LEN: usize = 128;
 
 use std::sync::LazyLock;
 
@@ -110,6 +124,61 @@ const MAX_TOKEN: usize = ALPHA_SPAN.pow(2) + ALPHA_SPAN;
 const MIN_TOKEN: usize = 0;
 const NGRAM_N: usize = 3;
 
+/// How text is split into tokens when building and querying an ngram index.
+///
+/// The mode is written to the postings file metadata so that a loaded index
+/// derives query tokens the same way its postings were built.
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum NGramTokenization {
+    /// Fixed-length trigrams. This is the default, and the mode assumed for
+    /// postings files that carry no tokenization metadata.
+    #[default]
+    Trigram,
+    /// Variable-length sparse n-grams hashed into the `u32` token column.
+    Sparse,
+}
+
+impl NGramTokenization {
+    /// Value stored under `NGRAM_TOKENIZATION_METADATA_KEY` for this mode.
+    fn as_metadata_value(self) -> &'static str {
+        match self {
+            Self::Trigram => "trigram",
+            Self::Sparse => "sparse",
+        }
+    }
+
+    /// Parse a persisted mode. A missing value is read as `Trigram`; an
+    /// unrecognized value is rejected as invalid input.
+    fn from_metadata_value(value: Option<&String>) -> Result<Self> {
+        match value.map(String::as_str) {
+            None | Some("trigram") => Ok(Self::Trigram),
+            Some("sparse") => Ok(Self::Sparse),
+            Some(other) => Err(Error::invalid_input_source(
+                format!("Unknown ngram tokenization mode: {other}").into(),
+            )),
+        }
+    }
+
+    /// Index format version reported for indices built with this mode.
+    fn index_version(self) -> u32 {
+        match self {
+            Self::Trigram => NGRAM_TRIGRAM_INDEX_VERSION,
+            Self::Sparse => NGRAM_SPARSE_INDEX_VERSION,
+        }
+    }
+}
+
+/// Training parameters for the ngram index. Deserialization rejects unknown
+/// fields.
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct NGramIndexParams {
+    /// Tokenization mode to build with; trigram when omitted.
+    #[serde(default)]
+    pub tokenization: NGramTokenization,
+}
+
 // Convert an ngram (string) to a token (u32). This helps avoid heap allocations
 // and it makes it easier to partition the tokens for shuffling
 //
@@ -148,10 +217,249 @@ fn ngram_to_token(ngram: &str, ngram_length: usize) -> u32 {
     token
 }
 
+// Stable FNV-1a hash for persisted sparse token ids. Collisions only add false
+// positives because NGram search results are rechecked by the scan.
+fn stable_sparse_hash(bytes: impl IntoIterator<Item = u8>) -> u32 {
+    let mut hash = SPARSE_HASH_OFFSET;
+    for byte in bytes {
+        hash ^= byte as u32;
+        hash = hash.wrapping_mul(SPARSE_HASH_PRIME);
+    }
+    // Token 0 is reserved for NULL rows, so a zero hash is remapped to 1. The
+    // extra collision is harmless: candidates are always rechecked by the scan.
+    if hash == 0 { 1 } else { hash }
+}
+
+// Prefix sparse n-gram bytes before hashing so future token families can share
+// the u32 postings column without reusing the same hash namespace.
+fn sparse_ngram_to_token(ngram: &[u8]) -> u32 {
+    stable_sparse_hash(
+        b"lance_sparse_ngram_v1"
+            .iter()
+            .copied()
+            .chain(ngram.iter().copied()),
+    )
+}
+
+// Weight of an adjacent byte pair: the stable hash of its two bytes.
+fn sparse_pair_weight(left: u8, right: u8) -> u32 {
+    stable_sparse_hash([left, right])
+}
+
+// Apply the same text normalization as the trigram tokenizer, then expose only
+// ASCII-alphanumeric runs so byte offsets remain valid sparse n-gram boundaries.
+fn normalized_alnum_runs(text: &str, mut visitor: impl FnMut(&str)) {
+    let mut prepper = TEXT_PREPPER.clone();
+    let mut raw_stream = prepper.token_stream(text);
+    while raw_stream.advance() {
+        let mut run = String::new();
+        for ch in raw_stream.token().text.chars() {
+            if ch.is_ascii_alphanumeric() {
+                run.push(ch);
+            } else if !run.is_empty() {
+                visitor(&run);
+                run.clear();
+            }
+        }
+        if !run.is_empty() {
+            visitor(&run);
+        }
+    }
+}
+
+// A sparse token for one span of a normalized run. The byte range is compiled
+// only under `cfg(test)`, where tests check span lengths; the index build path
+// only needs `token`.
+#[derive(Debug, Clone, Copy)]
+struct SparseTokenSpan {
+    token: u32,
+    #[cfg(test)]
+    start: usize,
+    #[cfg(test)]
+    end: usize,
+}
+
+#[derive(Debug, Clone, Copy)]
+struct SparsePair {
+    // A sparse n-gram boundary is represented by the hash weight of an
+    // adjacent byte pair. `pos` is the byte offset of the pair's first byte.
+    weight: u32,
+    pos: usize,
+}
+
+// Hash `bytes[start..end]` into a token, keeping the range in test builds.
+fn sparse_span(bytes: &[u8], start: usize, end: usize) -> SparseTokenSpan {
+    SparseTokenSpan {
+        token: sparse_ngram_to_token(&bytes[start..end]),
+        #[cfg(test)]
+        start,
+        #[cfg(test)]
+        end,
+    }
+}
+
+// Append the span unless it exceeds `SPARSE_MAX_NGRAM_LEN`; oversized spans are
+// skipped rather than truncated.
+fn push_sparse_span(spans: &mut Vec<SparseTokenSpan>, bytes: &[u8], start: usize, end: usize) {
+    if end - start <= SPARSE_MAX_NGRAM_LEN {
+        spans.push(sparse_span(bytes, start, end));
+    }
+}
+
+// Generate the sparse n-grams stored in the index with the standard monotonic
+// stack construction. A bounded span is emitted when its boundary pair weights
+// dominate the internal pair weights.
+fn sparse_spans_for_run(run: &str) -> Vec<SparseTokenSpan> {
+    let bytes = run.as_bytes();
+    if bytes.len() < SPARSE_MIN_NGRAM_LEN {
+        return Vec::new();
+    }
+
+    let mut spans = Vec::new();
+    // Monotonic stack of candidate boundary pairs, ordered by non-increasing
+    // pair weight from bottom to top.
+    let mut stack: Vec<SparsePair> = Vec::new();
+    for pair_pos in 0..bytes.len() - 1 {
+        let pair = SparsePair {
+            weight: sparse_pair_weight(bytes[pair_pos], bytes[pair_pos + 1]),
+            pos: pair_pos,
+        };
+        while stack.last().is_some_and(|last| pair.weight > last.weight) {
+            let start = stack.last().unwrap().pos;
+            // The current pair becomes the right boundary for the span whose
+            // left boundary is the pending tail pair.
+            push_sparse_span(&mut spans, bytes, start, pair_pos + 2);
+            // Equal-weight suffix pairs form a plateau. Sparse n-gram
+            // boundaries require strict domination over internal pairs, so the
+            // plateau can be collapsed before removing the dominated boundary.
+            while stack.len() > 1 && stack.last().unwrap().weight == stack[stack.len() - 2].weight {
+                stack.pop();
+            }
+            stack.pop();
+        }
+        if let Some(last) = stack.last() {
+            // The stack tail and the current pair form a valid sparse span. For
+            // adjacent pairs this emits the minimum-length token with no
+            // internal pair.
+            push_sparse_span(&mut spans, bytes, last.pos, pair_pos + 2);
+        }
+        stack.push(pair);
+    }
+    spans
+}
+
+// Index construction stores every sparse token for each normalized run. Query
+// planning can then choose a smaller covering subset without missing matches.
+fn sparse_index_token_visitor(text: &str, mut visitor: impl FnMut(u32)) {
+    normalized_alnum_runs(text, |run| {
+        for span in sparse_spans_for_run(run) {
+            visitor(span.token);
+        }
+    });
+}
+
+// Query planning uses the covering sparse n-gram stack construction so it can
+// read fewer, longer posting lists than the fixed-trigram path.
+fn sparse_covering_tokens(text: &str) -> Vec<u32> {
+    let mut tokens = Vec::new();
+    normalized_alnum_runs(text, |run| {
+        let bytes = run.as_bytes();
+        if bytes.len() < SPARSE_MIN_NGRAM_LEN {
+            return;
+        }
+
+        // Keep boundary candidates in a deque ordered by byte position. The
+        // front is the oldest boundary still available for the current
+        // covering window, and the back is the newest boundary being compared
+        // against the next adjacent pair. It is also monotonic by weight from
+        // front to back, with non-increasing pair weights.
+        let mut stack: VecDeque<SparsePair> = VecDeque::new();
+        for pair_pos in 0..bytes.len() - 1 {
+            let pair = SparsePair {
+                weight: sparse_pair_weight(bytes[pair_pos], bytes[pair_pos + 1]),
+                pos: pair_pos,
+            };
+
+            if stack.len() > 1 {
+                let covering_len = pair_pos + 2 - stack.front().unwrap().pos;
+                if covering_len > SPARSE_MAX_NGRAM_LEN {
+                    // The first two entries are adjacent surviving boundaries.
+                    // The build path emits the same bounded span when the
+                    // second boundary is pushed, so this token is safe to
+                    // require during query filtering.
+                    let start = stack.front().unwrap().pos;
+                    let end = stack[1].pos + 2;
+                    tokens.push(sparse_ngram_to_token(&bytes[start..end]));
+                    stack.pop_front();
+                }
+            }
+
+            while stack.back().is_some_and(|last| pair.weight > last.weight) {
+                // Query covering intentionally emits fewer tokens than index
+                // build. The index stores all valid sparse n-grams, but a
+                // query only needs a covering subset. Emit here only when the
+                // stack must be closed, such as for an equal-weight suffix.
+                if stack.front().unwrap().weight == stack.back().unwrap().weight {
+                    let start = stack.back().unwrap().pos;
+                    tokens.push(sparse_ngram_to_token(&bytes[start..pair_pos + 2]));
+                    while stack.len() > 1 {
+                        let end = stack.back().unwrap().pos + 2;
+                        stack.pop_back();
+                        let start = stack.back().unwrap().pos;
+                        tokens.push(sparse_ngram_to_token(&bytes[start..end]));
+                    }
+                }
+                stack.pop_back();
+            }
+
+            stack.push_back(pair);
+        }
+
+        // The run ended before these pending boundaries were closed by a
+        // higher-weight pair. Emit adjacent spans to cover the remaining
+        // monotonic suffix; adjacent spans have no internal pair, so they are
+        // valid sparse n-grams.
+        while stack.len() > 1 {
+            let end = stack.back().unwrap().pos + 2;
+            stack.pop_back();
+            let start = stack.back().unwrap().pos;
+            tokens.push(sparse_ngram_to_token(&bytes[start..end]));
+        }
+    });
+    tokens.sort_unstable();
+    tokens.dedup();
+    tokens
+}
+
+// Convert a query literal into the token ids required by the selected
+// tokenization. Regex analysis uses this for literal fragments as well.
+fn query_tokens_for_text(tokenization: NGramTokenization, text: &str) -> Vec<u32> {
+    match tokenization {
+        NGramTokenization::Trigram => {
+            let mut tokens = Vec::new();
+            tokenize_visitor(&NGRAM_TOKENIZER, text, |ngram| {
+                tokens.push(ngram_to_token(ngram, NGRAM_N));
+            });
+            tokens
+        }
+        NGramTokenization::Sparse => sparse_covering_tokens(text),
+    }
+}
+
+// Persist the tokenization mode in the postings file so an index can recover
+// query tokenization after it is loaded.
+fn ngram_metadata(tokenization: NGramTokenization) -> HashMap { + HashMap::from([( + NGRAM_TOKENIZATION_METADATA_KEY.to_string(), + tokenization.as_metadata_value().to_string(), + )]) +} + /// Basic stats about an ngram index #[derive(Serialize)] struct NGramStatistics { num_ngrams: usize, + tokenization: NGramTokenization, } /// The row ids that contain a given ngram @@ -269,13 +558,10 @@ impl NGramPostingListReader { pub struct NGramIndex { /// The mapping from tokens to row offsets tokens: HashMap, + /// Tokenization mode used to build this index and to derive query tokens. + tokenization: NGramTokenization, /// The reader for the posting lists list_reader: Arc, - /// The tokenizer used to tokenize text. Note: not all tokenizers can be used with this index. For - /// example, a stemming tokenizer would not work well because "dozing" would stem to "doze" and if the - /// search term is "zing" it would not match. As a result, this tokenizer is not as configurable as the - /// tokenizers used in an inverted index. 
- tokenizer: TextAnalyzer, io_parallelism: usize, /// The store that owns the index store: Arc, @@ -302,9 +588,15 @@ impl NGramIndex { frag_reuse_index: Option>, index_cache: &LanceCache, ) -> Result { - let tokens = store.open_index_file(POSTINGS_FILENAME).await?; - let tokens = tokens - .read_range(0..tokens.num_rows(), Some(&[TOKENS_COL])) + let tokens_reader = store.open_index_file(POSTINGS_FILENAME).await?; + let tokenization = NGramTokenization::from_metadata_value( + tokens_reader + .schema() + .metadata + .get(NGRAM_TOKENIZATION_METADATA_KEY), + )?; + let tokens = tokens_reader + .read_range(0..tokens_reader.num_rows(), Some(&[TOKENS_COL])) .await?; let tokens_map = HashMap::from_iter( @@ -317,7 +609,6 @@ impl NGramIndex { .enumerate() .map(|(idx, token)| (token, idx as u32)), ); - let posting_reader = Arc::new(NGramPostingListReader { reader: store.open_index_file(POSTINGS_FILENAME).await?, frag_reuse_index, @@ -327,8 +618,8 @@ impl NGramIndex { Ok(Self { io_parallelism: store.io_parallelism(), tokens: tokens_map, + tokenization, list_reader: posting_reader, - tokenizer: NGRAM_TOKENIZER.clone(), store, }) } @@ -385,6 +676,10 @@ impl NGramIndex { Self::from_store(store, frag_reuse_index, index_cache).await?, )) } + + fn query_tokens_for_text(&self, text: &str) -> Vec { + query_tokens_for_text(self.tokenization, text) + } } #[async_trait] @@ -406,6 +701,7 @@ impl Index for NGramIndex { fn statistics(&self) -> Result { let ngram_stats = NGramStatistics { num_ngrams: self.tokens.len(), + tokenization: self.tokenization, }; serde_json::to_value(ngram_stats) .map_err(|e| Error::internal(format!("Error serializing statistics: {}", e))) @@ -457,14 +753,17 @@ impl ScalarIndex for NGramIndex { let mut row_offsets = Vec::with_capacity(substr.len() * 3); let mut missing = false; - tokenize_visitor(&self.tokenizer, substr, |ngram| { - let token = ngram_to_token(ngram, NGRAM_N); + let query_tokens = self.query_tokens_for_text(substr); + if query_tokens.is_empty() { + 
return Ok(SearchResult::at_least(RowAddrTreeMap::new())); + } + for token in query_tokens { if let Some(row_offset) = self.tokens.get(&token) { row_offsets.push(*row_offset); } else { missing = true; } - }); + } // At least one token was missing, so we know there are zero results if missing { return Ok(SearchResult::exact(RowAddrTreeMap::new())); @@ -483,23 +782,21 @@ impl ScalarIndex for NGramIndex { Ok(SearchResult::at_most(RowAddrTreeMap::from(row_ids))) } TextQuery::Regex(pattern) => { - let trigram_query = ngram_regex::regex_to_trigram_query(pattern); - match &trigram_query { - // No usable trigram structure (e.g. `a.b`, `.*`): the index + let ngram_query = ngram_regex::regex_to_ngram_query(pattern, self.tokenization); + match &ngram_query { + // No usable ngram structure (e.g. `a.b`, `.*`): the index // cannot prune, so every row must be rechecked. - ngram_regex::TrigramQuery::All => { + ngram_regex::NGramQuery::All => { Ok(SearchResult::at_least(RowAddrTreeMap::new())) } // The pattern is provably unsatisfiable. - ngram_regex::TrigramQuery::None => { - Ok(SearchResult::exact(RowAddrTreeMap::new())) - } + ngram_regex::NGramQuery::None => Ok(SearchResult::exact(RowAddrTreeMap::new())), _ => { let mut tokens = HashSet::new(); - ngram_regex::collect_tokens(&trigram_query, &mut tokens); - // Fetch the posting list for every trigram the condition + ngram_regex::collect_tokens(&ngram_query, &mut tokens); + // Fetch the posting list for every token the condition // references; a token absent from the index contributes - // an empty list, which `eval_trigram_query` handles. + // an empty list, which `eval_ngram_query` handles. 
let present = tokens.into_iter().filter_map(|token| { self.tokens.get(&token).map(|offset| (token, *offset)) }); @@ -516,7 +813,7 @@ impl ScalarIndex for NGramIndex { .into_iter() .map(|(token, list)| (token, list.bitmap.clone())) .collect(); - let row_ids = ngram_regex::eval_trigram_query(&trigram_query, &bitmaps); + let row_ids = ngram_regex::eval_ngram_query(&ngram_query, &bitmaps); Ok(SearchResult::at_most(RowAddrTreeMap::from(row_ids))) } } @@ -549,12 +846,14 @@ impl ScalarIndex for NGramIndex { offset += BATCH_SIZE; } - let file = writer.finish().await?; + let file = writer + .finish_with_metadata(ngram_metadata(self.tokenization)) + .await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::NGramIndexDetails::default()) .unwrap(), - index_version: NGRAM_INDEX_VERSION, + index_version: self.tokenization.index_version(), files: vec![file], }) } @@ -565,7 +864,10 @@ impl ScalarIndex for NGramIndex { dest_store: &dyn IndexStore, _old_data_filter: Option, ) -> Result { - let mut builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions::default())?; + let mut builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions { + tokenization: self.tokenization, + ..NGramIndexBuilderOptions::default() + })?; let spill_files = builder.train(new_data).await?; let file = builder @@ -575,7 +877,7 @@ impl ScalarIndex for NGramIndex { Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::NGramIndexDetails::default()) .unwrap(), - index_version: NGRAM_INDEX_VERSION, + index_version: self.tokenization.index_version(), files: vec![file], }) } @@ -585,13 +887,21 @@ impl ScalarIndex for NGramIndex { } fn derive_index_params(&self) -> Result { - Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::NGram)) + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::NGram); + if self.tokenization == NGramTokenization::Trigram { + Ok(params) + } else { + Ok(params.with_params(&NGramIndexParams { + tokenization: self.tokenization, + })) + } } } 
#[derive(Debug, Clone)] pub struct NGramIndexBuilderOptions { tokens_per_spill: usize, + tokenization: NGramTokenization, } // A higher value will use more RAM. A lower value will have to do more spilling @@ -625,6 +935,7 @@ impl Default for NGramIndexBuilderOptions { fn default() -> Self { Self { tokens_per_spill: *DEFAULT_TOKENS_PER_SPILL, + tokenization: NGramTokenization::Trigram, } } } @@ -878,6 +1189,7 @@ impl NGramIndexBuilder { fn tokenize_and_partition( tokenizer: &TextAnalyzer, + tokenization: NGramTokenization, batch: RecordBatch, num_workers: usize, ) -> Result>> { @@ -891,11 +1203,17 @@ impl NGramIndexBuilder { let divisor = (MAX_TOKEN - MIN_TOKEN) / num_workers; for (text, row_id) in text_iter.zip(row_id_col.values()) { if let Some(text) = text { - tokenize_visitor(tokenizer, text, |token| { - let token = ngram_to_token(token, NGRAM_N); - let partition_id = (token as usize).saturating_sub(MIN_TOKEN) / divisor; - partitions[partition_id % num_workers].push((token, *row_id)); - }); + match tokenization { + NGramTokenization::Trigram => tokenize_visitor(tokenizer, text, |token| { + let token = ngram_to_token(token, NGRAM_N); + let partition_id = (token as usize).saturating_sub(MIN_TOKEN) / divisor; + partitions[partition_id % num_workers].push((token, *row_id)); + }), + NGramTokenization::Sparse => sparse_index_token_visitor(text, |token| { + let partition_id = token as usize % num_workers; + partitions[partition_id].push((token, *row_id)); + }), + } } else { partitions[0].push((0, *row_id)); } @@ -903,6 +1221,10 @@ impl NGramIndexBuilder { Ok(partitions) } + fn write_metadata(&self) -> HashMap { + ngram_metadata(self.options.tokenization) + } + pub async fn train(&mut self, data: SendableRecordBatchStream) -> Result> { let schema = data.schema(); Self::validate_schema(schema.as_ref())?; @@ -927,9 +1249,11 @@ impl NGramIndexBuilder { let mut partitions_stream = data .and_then(|batch| { let tokenizer = self.tokenizer.clone(); + let tokenization = 
self.options.tokenization; std::future::ready(Ok(tokio::task::spawn(async move { Ok(Self::tokenize_and_partition( &tokenizer, + tokenization, batch, num_workers, )?) @@ -1240,7 +1564,7 @@ impl NGramIndexBuilder { let mut writer = store .new_index_file(POSTINGS_FILENAME, POSTINGS_SCHEMA.clone()) .await?; - return writer.finish().await; + return writer.finish_with_metadata(self.write_metadata()).await; } } @@ -1265,13 +1589,38 @@ impl NGramIndexBuilder { offset += batch_size; } - writer.finish().await + writer.finish_with_metadata(self.write_metadata()).await } } #[derive(Debug, Default)] pub struct NGramIndexPlugin; +// Carries parsed NGram index parameters from request parsing into training. +struct NGramTrainingRequest { + criteria: TrainingCriteria, + params: NGramIndexParams, +} + +impl NGramTrainingRequest { + fn new(params: NGramIndexParams) -> Self { + Self { + criteria: TrainingCriteria::new(TrainingOrdering::None).with_row_id(), + params, + } + } +} + +impl TrainingRequest for NGramTrainingRequest { + fn as_any(&self) -> &dyn Any { + self + } + + fn criteria(&self) -> &TrainingCriteria { + &self.criteria + } +} + impl NGramIndexPlugin { pub async fn train_ngram_index( batches_source: SendableRecordBatchStream, @@ -1293,7 +1642,7 @@ impl ScalarIndexPlugin for NGramIndexPlugin { fn new_training_request( &self, - _params: &str, + params: &str, field: &Field, ) -> Result> { if !matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) { @@ -1303,9 +1652,14 @@ impl ScalarIndexPlugin for NGramIndexPlugin { ) .into())); } - Ok(Box::new(DefaultTrainingRequest::new( - TrainingCriteria::new(TrainingOrdering::None).with_row_id(), - ))) + let parsed_params = if params.trim().is_empty() { + NGramIndexParams::default() + } else { + serde_json::from_str(params).map_err(|e| { + Error::invalid_input_source(format!("Invalid NGram index params: {e}").into()) + })? 
+ }; + Ok(Box::new(NGramTrainingRequest::new(parsed_params))) } fn provides_exact_answer(&self) -> bool { @@ -1313,7 +1667,7 @@ impl ScalarIndexPlugin for NGramIndexPlugin { } fn version(&self) -> u32 { - NGRAM_INDEX_VERSION + NGRAM_SPARSE_INDEX_VERSION } fn new_query_parser( @@ -1335,7 +1689,7 @@ impl ScalarIndexPlugin for NGramIndexPlugin { &self, data: SendableRecordBatchStream, index_store: &dyn IndexStore, - _request: Box, + request: Box, fragment_ids: Option>, _progress: Arc, ) -> Result { @@ -1345,11 +1699,21 @@ impl ScalarIndexPlugin for NGramIndexPlugin { )); } - let file = Self::train_ngram_index(data, index_store).await?; + let request = request + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::internal("Invalid NGram training request type"))?; + let mut builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions { + tokenization: request.params.tokenization, + ..NGramIndexBuilderOptions::default() + })?; + + let spill_files = builder.train(data).await?; + let file = builder.write_index(index_store, spill_files, None).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::NGramIndexDetails::default()) .unwrap(), - index_version: NGRAM_INDEX_VERSION, + index_version: request.params.tokenization.index_version(), files: vec![file], }) } @@ -1373,7 +1737,7 @@ mod tests { }; use arrow::datatypes::UInt64Type; - use arrow_array::{Array, RecordBatch, StringArray, UInt64Array}; + use arrow_array::{Array, BinaryArray, RecordBatch, StringArray, UInt32Array, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion::{ execution::SendableRecordBatchStream, physical_plan::stream::RecordBatchStreamAdapter, @@ -1387,14 +1751,24 @@ mod tests { use lance_select::RowAddrTreeMap; use lance_tokenizer::TextAnalyzer; + use crate::progress::NoopIndexBuildProgress; use crate::scalar::{ - ScalarIndex, SearchResult, TextQuery, + IndexStore, ScalarIndex, SearchResult, TextQuery, lance_format::LanceIndexStore, - ngram::{NGramIndex, 
NGramIndexBuilder, NGramIndexBuilderOptions}, + ngram::{NGramIndex, NGramIndexBuilder, NGramIndexBuilderOptions, NGramIndexPlugin}, + }; + use crate::{ + metrics::NoOpMetricsCollector, + scalar::registry::{ScalarIndexPlugin, VALUE_COLUMN_NAME}, }; - use crate::{metrics::NoOpMetricsCollector, scalar::registry::VALUE_COLUMN_NAME}; - use super::{NGRAM_TOKENIZER, ngram_to_token, tokenize_visitor}; + use super::{ + NGRAM_N, NGRAM_SPARSE_INDEX_VERSION, NGRAM_TOKENIZER, NGRAM_TRIGRAM_INDEX_VERSION, + NGramTokenization, POSTING_LIST_COL, POSTINGS_FILENAME, SPARSE_MAX_NGRAM_LEN, + SPARSE_MIN_NGRAM_LEN, TOKENS_COL, ngram_to_token, query_tokens_for_text, + sparse_index_token_visitor, sparse_ngram_to_token, sparse_spans_for_run, tokenize_visitor, + }; + use roaring::RoaringTreemap; fn collect_tokens(analyzer: &TextAnalyzer, text: &str) -> Vec { let mut tokens = Vec::with_capacity(text.len() * 3); @@ -1440,6 +1814,78 @@ mod tests { ); } + #[test] + fn test_sparse_span_generation_is_bounded() { + let run = "abcdefghijklmnopqrstuvwxyz0123456789".repeat(20); + let spans = sparse_spans_for_run(&run); + + assert!(!spans.is_empty()); + assert!(spans.iter().all(|span| { + let len = span.end - span.start; + (SPARSE_MIN_NGRAM_LEN..=SPARSE_MAX_NGRAM_LEN).contains(&len) + })); + assert!(spans.len() <= 2 * run.len().saturating_sub(1)); + } + + #[test] + fn test_sparse_covering_tokens_for_long_query_is_bounded() { + let literal = "abcdefghijklmnopqrstuvwxyz0123456789".repeat(256); + let spans = sparse_spans_for_run(&literal); + let sparse_tokens = query_tokens_for_text(NGramTokenization::Sparse, &literal); + let mut index_tokens = HashSet::new(); + sparse_index_token_visitor(&literal, |token| { + index_tokens.insert(token); + }); + + assert!(!sparse_tokens.is_empty()); + assert!(sparse_tokens.len() <= spans.len()); + assert!(sparse_tokens.len() <= literal.len() / SPARSE_MIN_NGRAM_LEN + 1); + assert!( + sparse_tokens + .iter() + .all(|token| index_tokens.contains(token)) + ); + } + + #[test] 
+ fn test_sparse_covering_tokens_matches_stack_builder() { + let literal = "sparsemarkerabcdefghijklmnopqrstuvwx"; + let sparse_tokens = query_tokens_for_text(NGramTokenization::Sparse, literal); + let expected = [ + "spa", + "par", + "arsema", + "mark", + "rkerabcdefg", + "fghi", + "hijk", + "jklmno", + "nopq", + "pqr", + "qrstuvw", + "vwx", + ] + .into_iter() + .map(|ngram| sparse_ngram_to_token(ngram.as_bytes())) + .collect::>(); + + assert_eq!(sparse_tokens.into_iter().collect::>(), expected); + } + + #[test] + fn test_ngram_rejects_unknown_params() { + let plugin = NGramIndexPlugin; + let field = Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false); + let Err(err) = plugin.new_training_request(r#"{"tokenisation":"sparse"}"#, &field) else { + panic!("expected unknown ngram params to fail"); + }; + + assert!( + err.to_string().contains("Invalid NGram index params"), + "{err}" + ); + } + async fn do_train( mut builder: NGramIndexBuilder, data: SendableRecordBatchStream, @@ -1687,6 +2133,174 @@ mod tests { ); } + #[test] + fn test_sparse_covering_tokens_are_index_tokens() { + let literal = "commonmarkerabcdefghijklmnopqrstuvwx"; + let trigram_tokens = query_tokens_for_text(NGramTokenization::Trigram, literal); + let sparse_tokens = query_tokens_for_text(NGramTokenization::Sparse, literal); + let mut index_tokens = HashSet::new(); + sparse_index_token_visitor(literal, |token| { + index_tokens.insert(token); + }); + + assert!(!sparse_tokens.is_empty()); + assert!(sparse_tokens.len() <= trigram_tokens.len()); + assert!( + sparse_tokens + .iter() + .all(|token| index_tokens.contains(token)) + ); + } + + #[test_log::test(tokio::test)] + async fn test_sparse_ngram_search_and_metadata() { + let data = StringArray::from_iter_values([ + "alpha commonmarkerabcdefghijklmnopqrstuvwx omega", + "alpha commonmarkerabcdefghijklmno omega", + "totally unrelated row", + ]); + let row_ids = UInt64Array::from_iter_values((0..data.len()).map(|i| i as u64)); + let schema = 
Arc::new(Schema::new(vec![ + Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false), + Field::new(ROW_ID, DataType::UInt64, false), + ])); + let data = + RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap(); + let data = Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(std::future::ready(Ok(data))), + )); + + let builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions { + tokenization: NGramTokenization::Sparse, + ..NGramIndexBuilderOptions::default() + }) + .unwrap(); + let (index, _tmpdir) = do_train(builder, data).await; + + assert_eq!(index.tokenization, NGramTokenization::Sparse); + assert_eq!( + index + .search( + &TextQuery::StringContains("commonmarkerabcdefghijklmnopqrstuvwx".to_string()), + &NoOpMetricsCollector, + ) + .await + .unwrap(), + SearchResult::at_most(RowAddrTreeMap::from_iter([0])) + ); + assert_eq!( + index + .search( + &TextQuery::Regex("commonmarkerabcdefghijklmnopqrstuvwx".to_string()), + &NoOpMetricsCollector, + ) + .await + .unwrap(), + SearchResult::at_most(RowAddrTreeMap::from_iter([0])) + ); + assert_eq!( + index + .search( + &TextQuery::Regex("commonmarker.*omega".to_string()), + &NoOpMetricsCollector, + ) + .await + .unwrap(), + SearchResult::at_most(RowAddrTreeMap::from_iter([0, 1])) + ); + } + + #[test_log::test(tokio::test)] + async fn test_ngram_loads_postings_without_tokenization_metadata() { + let tmpdir = Arc::new(TempDir::default()); + let test_store = LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.obj_path(), + Arc::new(LanceCache::no_cache()), + ); + let old_schema = Arc::new(Schema::new(vec![ + Field::new(TOKENS_COL, DataType::UInt32, true), + Field::new(POSTING_LIST_COL, DataType::Binary, false), + ])); + let token = ngram_to_token("cat", NGRAM_N); + let bitmap = RoaringTreemap::from_iter([42]); + let mut bitmap_bytes = Vec::new(); + bitmap.serialize_into(&mut bitmap_bytes).unwrap(); + let batch = RecordBatch::try_new( + old_schema.clone(), + vec![ + 
Arc::new(UInt32Array::from_iter_values([token])), + Arc::new(BinaryArray::from_iter_values([bitmap_bytes])), + ], + ) + .unwrap(); + let mut writer = test_store + .new_index_file(POSTINGS_FILENAME, old_schema) + .await + .unwrap(); + writer.write_record_batch(batch).await.unwrap(); + writer.finish().await.unwrap(); + + let index = NGramIndex::from_store(Arc::new(test_store), None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!(index.tokenization, NGramTokenization::Trigram); + assert_eq!( + index + .search(&TextQuery::Regex("cat".to_string()), &NoOpMetricsCollector) + .await + .unwrap(), + SearchResult::at_most(RowAddrTreeMap::from_iter([42])) + ); + } + + async fn train_created_index(params: &str) -> u32 { + let data = StringArray::from_iter_values(["alpha sparsemarkerabcdefgh"]); + let row_ids = UInt64Array::from_iter_values([0]); + let schema = Arc::new(Schema::new(vec![ + Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false), + Field::new(ROW_ID, DataType::UInt64, false), + ])); + let data = + RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap(); + let data = Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(std::future::ready(Ok(data))), + )); + + let plugin = NGramIndexPlugin; + let field = Field::new(VALUE_COLUMN_NAME, DataType::Utf8, false); + let request = plugin.new_training_request(params, &field).unwrap(); + let tmpdir = Arc::new(TempDir::default()); + let test_store = LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.obj_path(), + Arc::new(LanceCache::no_cache()), + ); + + plugin + .train_index( + data, + &test_store, + request, + None, + Arc::new(NoopIndexBuildProgress), + ) + .await + .unwrap() + .index_version + } + + #[test_log::test(tokio::test)] + async fn test_ngram_created_index_versions() { + assert_eq!(train_created_index("").await, NGRAM_TRIGRAM_INDEX_VERSION); + assert_eq!( + train_created_index(r#"{"tokenization":"sparse"}"#).await, + NGRAM_SPARSE_INDEX_VERSION + 
); + } + fn test_data_schema() -> Arc { Arc::new(Schema::new(vec![ Field::new(VALUE_COLUMN_NAME, DataType::Utf8, true), @@ -1706,6 +2320,19 @@ mod tests { )) } + fn data_from_values(values: &[Option<&str>], first_row_id: u64) -> SendableRecordBatchStream { + let data = StringArray::from_iter(values.iter().copied()); + let row_ids = + UInt64Array::from_iter_values((0..data.len()).map(|i| first_row_id + i as u64)); + let schema = test_data_schema(); + let data = + RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap(); + Box::pin(RecordBatchStreamAdapter::new( + schema, + stream::once(std::future::ready(Ok(data))), + )) + } + #[test_log::test(tokio::test)] async fn test_ngram_nulls() { let data = simple_data_with_nulls(); @@ -1768,6 +2395,56 @@ mod tests { assert_eq!(index.tokens.len(), 3); } + #[test_log::test(tokio::test)] + async fn test_sparse_update_preserves_tokenization_and_version() { + let literal = "commonmarkerabcdefghijklmnopqrstuvwx"; + let data = data_from_values( + &[ + Some("alpha commonmarkerabcdefghijklmnopqrstuvwx omega"), + Some("totally unrelated row"), + ], + 0, + ); + let builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions { + tokenization: NGramTokenization::Sparse, + ..NGramIndexBuilderOptions::default() + }) + .unwrap(); + let (index, _tmpdir) = do_train(builder, data).await; + + let new_tmpdir = Arc::new(TempDir::default()); + let test_store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + new_tmpdir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + let new_data = data_from_values( + &[Some("updated commonmarkerabcdefghijklmnopqrstuvwx row")], + 100, + ); + + let created = index + .update(new_data, test_store.as_ref(), None) + .await + .unwrap(); + assert_eq!(created.index_version, NGRAM_SPARSE_INDEX_VERSION); + + let index = NGramIndex::from_store(test_store, None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!(index.tokenization, NGramTokenization::Sparse); + 
assert_eq!( + index + .search( + &TextQuery::StringContains(literal.to_string()), + &NoOpMetricsCollector, + ) + .await + .unwrap(), + SearchResult::at_most(RowAddrTreeMap::from_iter([0, 100])) + ); + } + async fn row_ids_in_index(index: &NGramIndex) -> Vec { let mut row_ids = HashSet::new(); for row_offset in index.tokens.values() { @@ -1810,6 +2487,50 @@ mod tests { assert_eq!(null_posting_list, vec![100]); } + #[test_log::test(tokio::test)] + async fn test_sparse_remap_preserves_tokenization_and_version() { + let literal = "commonmarkerabcdefghijklmnopqrstuvwx"; + let data = data_from_values( + &[ + Some("alpha commonmarkerabcdefghijklmnopqrstuvwx omega"), + Some("totally unrelated row"), + ], + 0, + ); + let builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions { + tokenization: NGramTokenization::Sparse, + ..NGramIndexBuilderOptions::default() + }) + .unwrap(); + let (index, _tmpdir) = do_train(builder, data).await; + + let new_tmpdir = Arc::new(TempDir::default()); + let test_store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + new_tmpdir.obj_path(), + Arc::new(LanceCache::no_cache()), + )); + + let remapping = HashMap::from([(0, Some(10)), (1, None)]); + let created = index.remap(&remapping, test_store.as_ref()).await.unwrap(); + assert_eq!(created.index_version, NGRAM_SPARSE_INDEX_VERSION); + + let index = NGramIndex::from_store(test_store, None, &LanceCache::no_cache()) + .await + .unwrap(); + assert_eq!(index.tokenization, NGramTokenization::Sparse); + assert_eq!( + index + .search( + &TextQuery::Regex(literal.to_string()), + &NoOpMetricsCollector, + ) + .await + .unwrap(), + SearchResult::at_most(RowAddrTreeMap::from_iter([10])) + ); + } + #[test_log::test(tokio::test)] async fn test_ngram_index_merge() { let data = simple_data_with_nulls(); @@ -1874,6 +2595,7 @@ mod tests { let builder = NGramIndexBuilder::try_new(NGramIndexBuilderOptions { tokens_per_spill: 100, + ..NGramIndexBuilderOptions::default() }) .unwrap(); diff 
--git a/rust/lance-index/src/scalar/ngram/ngram_regex.rs b/rust/lance-index/src/scalar/ngram/ngram_regex.rs index ee67c479a71..d2f633b7f82 100644 --- a/rust/lance-index/src/scalar/ngram/ngram_regex.rs +++ b/rust/lance-index/src/scalar/ngram/ngram_regex.rs @@ -1,72 +1,75 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -//! Deriving a trigram pre-filter from a regular expression. +//! Deriving an ngram pre-filter from a regular expression. //! //! This is the query-side counterpart of the ngram index that lets us //! accelerate `regexp_like` / `regexp_match` predicates the same way the index //! already accelerates `contains`. The idea (the same one Postgres `pg_trgm` //! and Russ Cox's Google Code Search use) is to derive, from the regex, a -//! boolean condition over trigram presence that is *necessary* for any string +//! boolean condition over token presence that is *necessary* for any string //! to match, evaluate it against the inverted index, and let the scan recheck //! the true regex on the surviving rows. //! -//! The derived condition is a [`TrigramQuery`] -- an AND/OR tree of trigram -//! tokens. `AND` maps onto posting-list intersection and `OR` onto union, which +//! The derived condition is a [`NGramQuery`] -- an AND/OR tree of ngram tokens. +//! `AND` maps onto posting-list intersection and `OR` onto union, which //! is exactly the set algebra the ngram index is built for. //! //! # Soundness //! //! The single invariant that matters is that the condition must never require a -//! trigram that a matching string could lack -- otherwise we would drop real +//! token that a matching string could lack -- otherwise we would drop real //! matches (a false negative, far worse than a false positive, which the recheck //! removes). Everything here is therefore a conservative *over*-approximation: -//! when in doubt we emit [`TrigramQuery::All`] ("no constraint, recheck +//! 
when in doubt we emit [`NGramQuery::All`] ("no constraint, recheck //! everything"). Concretely: //! -//! * Every trigram requirement is produced by [`trigrams_of_string`], which runs -//! the *same* tokenizer the index was built with, so a string shorter than a -//! trigram (or with no alphanumeric run) contributes no requirement. +//! * Every token requirement is produced by [`ngrams_of_string`], which runs the +//! same tokenizer the index was built with, so a string that yields no tokens +//! contributes no requirement. //! * Character classes and case-insensitive folds are treated as a single //! unknown character (`All`), because the index's normalization does not agree //! with Unicode case folding (e.g. `(?i)c` also matches `ℂ`, which the index //! does not fold to `c`). Literal runs -- the common case -- are fully used. //! * When the exact / prefix / suffix string sets grow past a bound we first fold -//! their trigrams into the running condition and only then drop the strings, so -//! collapsing precision never removes a necessary trigram. +//! their tokens into the running condition and only then drop the strings, so +//! collapsing precision never removes a necessary token. use std::collections::{BTreeSet, HashMap, HashSet}; use regex_syntax::hir::{Class, Hir, HirKind}; use roaring::RoaringTreemap; -use super::{NGRAM_N, NGRAM_TOKENIZER, ngram_to_token, tokenize_visitor}; +use super::{ + NGRAM_N, NGRAM_TOKENIZER, NGramTokenization, ngram_to_token, query_tokens_for_text, + tokenize_visitor, +}; /// Maximum number of strings kept in an `exact` / `prefix` / `suffix` set before -/// it is folded into the trigram condition and dropped. +/// it is folded into the ngram condition and dropped. const MAX_SET_SIZE: usize = 16; /// Maximum length (in characters) of a string kept in a set. Longer strings are /// trimmed to a sound shorter affix. 
const MAX_STRING_LEN: usize = 32; -/// A boolean condition over trigram presence that is *necessary* for a regex to +/// A boolean condition over ngram-token presence that is *necessary* for a regex to /// match. `All` means "no constraint" and `None` means "unsatisfiable"; by /// construction these only ever appear at the root of the tree. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub enum TrigramQuery { +pub enum NGramQuery { /// No constraint: every row is a candidate (the scan must recheck all rows). All, /// Unsatisfiable: no row can match. None, - /// The given trigram token must be present. - Trigram(u32), + /// The given ngram token must be present. + Token(u32), /// Every child condition must hold (posting-list intersection). And(Vec), /// At least one child condition must hold (posting-list union). Or(Vec), } -impl TrigramQuery { +impl NGramQuery { /// Build an `AND` of conditions, applying identity (`All`), absorbing /// (`None`), flattening, sorting and de-duplication so the result is /// canonical and free of nested `All`/`None`. @@ -112,7 +115,7 @@ impl TrigramQuery { } /// Information about the set of strings a sub-expression can match, used to -/// build a necessary trigram condition bottom-up. For every string `s` the +/// build a necessary ngram condition bottom-up. For every string `s` the /// sub-expression matches: `s` is in `exact` (when it is `Some`), `s` starts /// with some member of `prefix` and ends with some member of `suffix`, and `s` /// satisfies `match_q`. @@ -126,8 +129,8 @@ struct RegexInfo { prefix: BTreeSet, /// Strings that every match must end with (empty = unknown). suffix: BTreeSet, - /// A necessary trigram condition for the sub-expression. - match_q: TrigramQuery, + /// A necessary ngram condition for the sub-expression. 
+ match_q: NGramQuery, } impl RegexInfo { @@ -139,19 +142,19 @@ impl RegexInfo { exact: Some(empty.clone()), prefix: empty.clone(), suffix: empty, - match_q: TrigramQuery::All, + match_q: NGramQuery::All, } } /// A fixed literal string. - fn literal(s: &str) -> Self { + fn literal(s: &str, tokenization: NGramTokenization) -> Self { let set = BTreeSet::from([s.to_string()]); Self { emptyable: s.is_empty(), exact: Some(set.clone()), prefix: set.clone(), suffix: set, - match_q: trigrams_of_string(s), + match_q: ngrams_of_string(s, tokenization), } } @@ -162,20 +165,20 @@ impl RegexInfo { exact: None, prefix: BTreeSet::new(), suffix: BTreeSet::new(), - match_q: TrigramQuery::All, + match_q: NGramQuery::All, } } /// Enforce the size/length bounds, folding any information about to be /// discarded into `match_q` first so that precision loss never drops a /// necessary trigram. Idempotent. - fn bound(&mut self) { + fn bound(&mut self, tokenization: NGramTokenization) { let oversized_exact = self.exact.as_ref().is_some_and(|exact| { exact.len() > MAX_SET_SIZE || exact.iter().any(|s| s.chars().count() > MAX_STRING_LEN) }); if oversized_exact { let exact = self.exact.take().expect("checked above"); - self.fold_into_match(&exact); + self.fold_into_match(&exact, tokenization); } self.prefix = self @@ -185,7 +188,7 @@ impl RegexInfo { .collect(); if self.prefix.len() > MAX_SET_SIZE { let prefix = std::mem::take(&mut self.prefix); - self.fold_into_match(&prefix); + self.fold_into_match(&prefix, tokenization); } self.suffix = self @@ -195,40 +198,52 @@ impl RegexInfo { .collect(); if self.suffix.len() > MAX_SET_SIZE { let suffix = std::mem::take(&mut self.suffix); - self.fold_into_match(&suffix); + self.fold_into_match(&suffix, tokenization); } } /// AND the trigrams of `set` (a complete set of possible affixes/strings) /// into `match_q`. Sound because the set is exhaustive for its role. 
- fn fold_into_match(&mut self, set: &BTreeSet) { - let folded = trigrams_of_set(set.iter()); - let current = std::mem::replace(&mut self.match_q, TrigramQuery::All); - self.match_q = TrigramQuery::and(vec![current, folded]); + fn fold_into_match(&mut self, set: &BTreeSet, tokenization: NGramTokenization) { + let folded = ngrams_of_set(set.iter(), tokenization); + let current = std::mem::replace(&mut self.match_q, NGramQuery::All); + self.match_q = NGramQuery::and(vec![current, folded]); } } -/// AND together the trigrams of `s`. Reuses the index's own tokenizer so the +/// AND together the ngram tokens of `s`. Reuses the index's own tokenizer so the /// tokens are normalized (lowercase, ASCII-folded, alphanumeric-bounded) -/// exactly as they were stored. Returns `All` if `s` yields no trigram (too -/// short, or no run of three alphanumeric characters). -fn trigrams_of_string(s: &str) -> TrigramQuery { - let mut tokens = Vec::new(); - tokenize_visitor(&NGRAM_TOKENIZER, s, |ngram| { - tokens.push(TrigramQuery::Trigram(ngram_to_token(ngram, NGRAM_N))); - }); - TrigramQuery::and(tokens) +/// exactly as they were stored. Returns `All` if `s` yields no tokens. +fn ngrams_of_string(s: &str, tokenization: NGramTokenization) -> NGramQuery { + let tokens = match tokenization { + NGramTokenization::Trigram => { + let mut tokens = Vec::new(); + tokenize_visitor(&NGRAM_TOKENIZER, s, |ngram| { + tokens.push(ngram_to_token(ngram, NGRAM_N)); + }); + tokens + } + NGramTokenization::Sparse => query_tokens_for_text(tokenization, s), + }; + let tokens = tokens.into_iter().map(NGramQuery::Token).collect(); + NGramQuery::and(tokens) } -/// OR together the trigram conditions of each string in `set`. An empty set +/// OR together the ngram conditions of each string in `set`. An empty set /// means "unknown" and yields `All` (no constraint); if any member yields `All` /// the whole OR is `All`. 
-fn trigrams_of_set<'a>(set: impl IntoIterator) -> TrigramQuery { - let queries: Vec<_> = set.into_iter().map(|s| trigrams_of_string(s)).collect(); +fn ngrams_of_set<'a>( + set: impl IntoIterator, + tokenization: NGramTokenization, +) -> NGramQuery { + let queries: Vec<_> = set + .into_iter() + .map(|s| ngrams_of_string(s, tokenization)) + .collect(); if queries.is_empty() { - return TrigramQuery::All; + return NGramQuery::All; } - TrigramQuery::or(queries) + NGramQuery::or(queries) } /// Concatenate every string in `a` with every string in `b`. @@ -274,21 +289,21 @@ fn singleton_char(class: &Class) -> Option { } /// Compute the [`RegexInfo`] for `hir` bottom-up. -fn analyze(hir: &Hir) -> RegexInfo { +fn analyze(hir: &Hir, tokenization: NGramTokenization) -> RegexInfo { let mut info = match hir.kind() { // Zero-width: the empty match. Anchors (^, $, \b) carry no trigram. HirKind::Empty | HirKind::Look(_) => RegexInfo::empty_string(), HirKind::Literal(lit) => match std::str::from_utf8(&lit.0) { - Ok(s) => RegexInfo::literal(s), + Ok(s) => RegexInfo::literal(s, tokenization), // A literal that is not valid UTF-8 cannot be reasoned about here. 
Err(_) => RegexInfo::any_char(), }, HirKind::Class(class) => match singleton_char(class) { - Some(ch) => RegexInfo::literal(ch.encode_utf8(&mut [0u8; 4])), + Some(ch) => RegexInfo::literal(ch.encode_utf8(&mut [0u8; 4]), tokenization), None => RegexInfo::any_char(), }, HirKind::Repetition(rep) => { - let inner = analyze(&rep.sub); + let inner = analyze(&rep.sub, tokenization); let at_least_one = rep.min >= 1; RegexInfo { emptyable: !at_least_one || inner.emptyable, @@ -310,22 +325,22 @@ fn analyze(hir: &Hir) -> RegexInfo { match_q: if at_least_one { inner.match_q } else { - TrigramQuery::All + NGramQuery::All }, } } - HirKind::Capture(cap) => analyze(&cap.sub), - HirKind::Concat(subs) => analyze_concat(subs), - HirKind::Alternation(subs) => analyze_alternation(subs), + HirKind::Capture(cap) => analyze(&cap.sub, tokenization), + HirKind::Concat(subs) => analyze_concat(subs, tokenization), + HirKind::Alternation(subs) => analyze_alternation(subs, tokenization), }; - info.bound(); + info.bound(tokenization); info } -fn analyze_concat(subs: &[Hir]) -> RegexInfo { +fn analyze_concat(subs: &[Hir], tokenization: NGramTokenization) -> RegexInfo { let mut acc = RegexInfo::empty_string(); for sub in subs { - acc = concat_info(acc, analyze(sub)); + acc = concat_info(acc, analyze(sub, tokenization), tokenization); } acc } @@ -333,14 +348,14 @@ fn analyze_concat(subs: &[Hir]) -> RegexInfo { /// Combine two adjacent sub-expressions. This is the subtle part: it recovers /// trigrams that straddle the junction via the cross product of `acc.suffix` and /// `next.prefix`. -fn concat_info(acc: RegexInfo, next: RegexInfo) -> RegexInfo { +fn concat_info(acc: RegexInfo, next: RegexInfo, tokenization: NGramTokenization) -> RegexInfo { let emptyable = acc.emptyable && next.emptyable; // Trigrams spanning the junction (computed from the pre-merge affixes). 
let boundary = if acc.suffix.is_empty() || next.prefix.is_empty() { - TrigramQuery::All + NGramQuery::All } else { - trigrams_of_set(cross_concat(&acc.suffix, &next.prefix).iter()) + ngrams_of_set(cross_concat(&acc.suffix, &next.prefix).iter(), tokenization) }; // exact = acc.exact x next.exact, only while both are finite and small. @@ -366,7 +381,7 @@ fn concat_info(acc: RegexInfo, next: RegexInfo) -> RegexInfo { None => next.suffix.clone(), }; - let match_q = TrigramQuery::and(vec![acc.match_q, next.match_q, boundary]); + let match_q = NGramQuery::and(vec![acc.match_q, next.match_q, boundary]); let mut info = RegexInfo { emptyable, @@ -375,12 +390,12 @@ fn concat_info(acc: RegexInfo, next: RegexInfo) -> RegexInfo { suffix, match_q, }; - info.bound(); + info.bound(tokenization); info } -fn analyze_alternation(subs: &[Hir]) -> RegexInfo { - let infos: Vec = subs.iter().map(analyze).collect(); +fn analyze_alternation(subs: &[Hir], tokenization: NGramTokenization) -> RegexInfo { + let infos: Vec = subs.iter().map(|sub| analyze(sub, tokenization)).collect(); let emptyable = infos.iter().any(|i| i.emptyable); @@ -413,7 +428,7 @@ fn analyze_alternation(subs: &[Hir]) -> RegexInfo { BTreeSet::new() }; - let match_q = TrigramQuery::or(infos.into_iter().map(|i| i.match_q).collect()); + let match_q = NGramQuery::or(infos.into_iter().map(|i| i.match_q).collect()); RegexInfo { emptyable, @@ -424,87 +439,87 @@ fn analyze_alternation(subs: &[Hir]) -> RegexInfo { } } -/// Derive a necessary trigram condition from a regular expression pattern. +/// Derive a necessary ngram condition from a regular expression pattern. 
/// -/// Returns [`TrigramQuery::All`] when no useful condition can be derived (an -/// unparsable pattern, or one with no trigram-able literal structure such as +/// Returns [`NGramQuery::All`] when no useful condition can be derived (an +/// unparsable pattern, or one with no usable literal structure such as /// `a.b` or `.*`); callers must treat that as "recheck everything". -pub fn regex_to_trigram_query(pattern: &str) -> TrigramQuery { +pub fn regex_to_ngram_query(pattern: &str, tokenization: NGramTokenization) -> NGramQuery { // An unparsable pattern cannot be accelerated; rechecking is still safe. let Ok(hir) = regex_syntax::parse(pattern) else { - return TrigramQuery::All; + return NGramQuery::All; }; - let info = analyze(&hir); + let info = analyze(&hir, tokenization); let mut conditions = vec![info.match_q]; if let Some(exact) = &info.exact { if exact.is_empty() { // The expression matches nothing. - return TrigramQuery::None; + return NGramQuery::None; } - conditions.push(trigrams_of_set(exact.iter())); + conditions.push(ngrams_of_set(exact.iter(), tokenization)); } - conditions.push(trigrams_of_set(info.prefix.iter())); - conditions.push(trigrams_of_set(info.suffix.iter())); - TrigramQuery::and(conditions) + conditions.push(ngrams_of_set(info.prefix.iter(), tokenization)); + conditions.push(ngrams_of_set(info.suffix.iter(), tokenization)); + NGramQuery::and(conditions) } -/// Whether a regular expression yields any trigram condition the index can use +/// Whether a regular expression yields any ngram condition the index can use /// to prune candidates. When it does not (e.g. `a.b`, `.*`, or a case-insensitive /// pattern), callers should leave the predicate to a full scan rather than route /// it to the index, which would otherwise have to ask the scan to recheck every /// row -- a path the index result type (`AtLeast`) does not support. 
pub fn regex_can_use_index(pattern: &str) -> bool { - regex_to_trigram_query(pattern) != TrigramQuery::All + regex_to_ngram_query(pattern, NGramTokenization::Trigram) != NGramQuery::All } -/// Collect the distinct trigram tokens referenced anywhere in the tree. -pub fn collect_tokens(query: &TrigramQuery, out: &mut HashSet) { +/// Collect the distinct ngram tokens referenced anywhere in the tree. +pub fn collect_tokens(query: &NGramQuery, out: &mut HashSet) { match query { - TrigramQuery::Trigram(token) => { + NGramQuery::Token(token) => { out.insert(*token); } - TrigramQuery::And(items) | TrigramQuery::Or(items) => { + NGramQuery::And(items) | NGramQuery::Or(items) => { for item in items { collect_tokens(item, out); } } - TrigramQuery::All | TrigramQuery::None => {} + NGramQuery::All | NGramQuery::None => {} } } -/// Evaluate the tree against a map of `trigram token -> posting list`. A token -/// missing from the map contributes an empty set (sound: a required trigram that +/// Evaluate the tree against a map of `ngram token -> posting list`. A token +/// missing from the map contributes an empty set (sound: a required token that /// is absent everywhere yields no rows; an absent OR branch contributes /// nothing). `All` / `None` are handled by the caller before evaluation. 
-pub fn eval_trigram_query( - query: &TrigramQuery, +pub fn eval_ngram_query( + query: &NGramQuery, bitmaps: &HashMap, ) -> RoaringTreemap { match query { - TrigramQuery::Trigram(token) => bitmaps.get(token).cloned().unwrap_or_default(), - TrigramQuery::And(items) => { + NGramQuery::Token(token) => bitmaps.get(token).cloned().unwrap_or_default(), + NGramQuery::And(items) => { let mut iter = items.iter(); let mut acc = match iter.next() { - Some(first) => eval_trigram_query(first, bitmaps), + Some(first) => eval_ngram_query(first, bitmaps), None => return RoaringTreemap::new(), }; for item in iter { if acc.is_empty() { break; } - acc &= &eval_trigram_query(item, bitmaps); + acc &= &eval_ngram_query(item, bitmaps); } acc } - TrigramQuery::Or(items) => { + NGramQuery::Or(items) => { let mut acc = RoaringTreemap::new(); for item in items { - acc |= &eval_trigram_query(item, bitmaps); + acc |= &eval_ngram_query(item, bitmaps); } acc } - TrigramQuery::All | TrigramQuery::None => RoaringTreemap::new(), + NGramQuery::All | NGramQuery::None => RoaringTreemap::new(), } } @@ -512,13 +527,13 @@ pub fn eval_trigram_query( mod tests { use super::*; - /// A single trigram condition, hashed the same way the index hashes it. - fn tri(trigram: &str) -> TrigramQuery { - TrigramQuery::Trigram(ngram_to_token(trigram, NGRAM_N)) + /// A single fixed-trigram token, hashed the same way the index hashes it. 
+ fn tri(trigram: &str) -> NGramQuery { + NGramQuery::Token(ngram_to_token(trigram, NGRAM_N)) } - fn q(pattern: &str) -> TrigramQuery { - regex_to_trigram_query(pattern) + fn q(pattern: &str) -> NGramQuery { + regex_to_ngram_query(pattern, NGramTokenization::Trigram) } #[test] @@ -530,34 +545,28 @@ mod tests { fn test_multi_trigram_literal() { assert_eq!( q("foobar"), - TrigramQuery::and(vec![tri("foo"), tri("oob"), tri("oba"), tri("bar")]) + NGramQuery::and(vec![tri("foo"), tri("oob"), tri("oba"), tri("bar")]) ); } #[test] fn test_wildcard_splits_into_and() { // `.*` breaks the literal run; both sides are required. - assert_eq!( - q("foo.*bar"), - TrigramQuery::and(vec![tri("foo"), tri("bar")]) - ); + assert_eq!(q("foo.*bar"), NGramQuery::and(vec![tri("foo"), tri("bar")])); } #[test] fn test_alternation_is_or() { - assert_eq!( - q("(cat|dog)"), - TrigramQuery::or(vec![tri("cat"), tri("dog")]) - ); + assert_eq!(q("(cat|dog)"), NGramQuery::or(vec![tri("cat"), tri("dog")])); } #[test] fn test_anchors_are_transparent() { assert_eq!( q("^rhino"), - TrigramQuery::and(vec![tri("rhi"), tri("hin"), tri("ino")]) + NGramQuery::and(vec![tri("rhi"), tri("hin"), tri("ino")]) ); - assert_eq!(q("nose$"), TrigramQuery::and(vec![tri("nos"), tri("ose")])); + assert_eq!(q("nose$"), NGramQuery::and(vec![tri("nos"), tri("ose")])); } #[test] @@ -567,18 +576,18 @@ mod tests { // trigram straddling the `(o)` group boundary in "foobar". assert_eq!( q("fo(o)bar"), // spellchecker:disable-line - TrigramQuery::and(vec![tri("foo"), tri("oob"), tri("oba"), tri("bar")]) + NGramQuery::and(vec![tri("foo"), tri("oob"), tri("oba"), tri("bar")]) ); } #[test] fn test_no_trigram_yields_all() { // No run of three literal characters anywhere. 
- assert_eq!(q("a.b"), TrigramQuery::All); - assert_eq!(q(".*"), TrigramQuery::All); + assert_eq!(q("a.b"), NGramQuery::All); + assert_eq!(q(".*"), NGramQuery::All); // Every alternation branch is shorter than a trigram, so we must not // require either two-character branch as a (non-existent) trigram. - assert_eq!(q("fo|ba"), TrigramQuery::All); // spellchecker:disable-line + assert_eq!(q("fo|ba"), NGramQuery::All); // spellchecker:disable-line } #[test] @@ -586,12 +595,12 @@ mod tests { // Unicode case folding (e.g. `(?i)c` also matches U+2102) does not agree // with the index's normalization, so case-insensitive patterns are left // unaccelerated (correct via recheck) rather than risk a false negative. - assert_eq!(q("(?i)Cat"), TrigramQuery::All); + assert_eq!(q("(?i)Cat"), NGramQuery::All); } #[test] fn test_unparsable_pattern_yields_all() { - assert_eq!(q("("), TrigramQuery::All); + assert_eq!(q("("), NGramQuery::All); } #[test] @@ -605,7 +614,7 @@ mod tests { let result = q(&pattern); // Each branch shares the trigram `aa0`/`aa1`/... and `zz`-ish endings; // the important property is that it is a sound non-empty condition. - assert_ne!(result, TrigramQuery::None); + assert_ne!(result, NGramQuery::None); } #[test] @@ -630,34 +639,34 @@ mod tests { // `baz` is absent from the index. // AND intersects. - let and = TrigramQuery::and(vec![tri("foo"), tri("bar")]); + let and = NGramQuery::and(vec![tri("foo"), tri("bar")]); assert_eq!( - eval_trigram_query(&and, &bitmaps), + eval_ngram_query(&and, &bitmaps), RoaringTreemap::from_iter([2u64, 3]) ); // OR unions. - let or = TrigramQuery::or(vec![tri("foo"), tri("bar")]); + let or = NGramQuery::or(vec![tri("foo"), tri("bar")]); assert_eq!( - eval_trigram_query(&or, &bitmaps), + eval_ngram_query(&or, &bitmaps), RoaringTreemap::from_iter([1u64, 2, 3, 4]) ); // A missing token is empty: it zeroes an AND but is harmless in an OR. 
- let and_missing = TrigramQuery::and(vec![tri("foo"), tri("baz")]); - assert!(eval_trigram_query(&and_missing, &bitmaps).is_empty()); - let or_missing = TrigramQuery::or(vec![tri("foo"), tri("baz")]); + let and_missing = NGramQuery::and(vec![tri("foo"), tri("baz")]); + assert!(eval_ngram_query(&and_missing, &bitmaps).is_empty()); + let or_missing = NGramQuery::or(vec![tri("foo"), tri("baz")]); assert_eq!( - eval_trigram_query(&or_missing, &bitmaps), + eval_ngram_query(&or_missing, &bitmaps), RoaringTreemap::from_iter([1u64, 2, 3]) ); } #[test] fn test_collect_tokens() { - let query = TrigramQuery::and(vec![ + let query = NGramQuery::and(vec![ tri("foo"), - TrigramQuery::or(vec![tri("bar"), tri("baz")]), + NGramQuery::or(vec![tri("bar"), tri("baz")]), ]); let mut tokens = HashSet::new(); collect_tokens(&query, &mut tokens); diff --git a/rust/lance/benches/regex_ngram.rs b/rust/lance/benches/regex_ngram.rs index 76f597ad9cb..6463e8dac0f 100644 --- a/rust/lance/benches/regex_ngram.rs +++ b/rust/lance/benches/regex_ngram.rs @@ -4,19 +4,12 @@ //! Benchmark: regex predicate scans over an ngram-indexed string column. //! //! Each query is a `regexp_match(doc, '...')` filter against a dataset that has -//! an NGram index on `doc`. The query set spans a selective AND pattern, an -//! alternation, a plain literal (rewritten to an infix LIKE before it reaches -//! the index), and a deliberately non-accelerable pattern (`a.b`, which yields -//! no trigram) that serves as a regression guard. -//! -//! On `main` none of these use the index (regex falls through to a full scan + -//! recheck); with the ngram-regex acceleration the index prunes candidates for -//! the first three while `a.b` stays a full scan. Capture a baseline on `main` -//! with `--save-baseline before_7130`, then compare after the change with -//! `--baseline before_7130`. +//! an NGram index on `doc`. The benchmark builds both the default fixed-trigram +//! 
index and the experimental sparse n-gram index so the same workload can +//! compare query-time pruning cost directly. use std::hint::black_box; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use std::time::Duration; use arrow::array::AsArray; @@ -29,16 +22,27 @@ use lance::index::DatasetIndexExt; use lance_core::utils::tempfile::TempStrDir; use lance_datagen::{RowCount, array}; use lance_index::IndexType; -use lance_index::scalar::ScalarIndexParams; +use lance_index::scalar::{BuiltinIndexType, ScalarIndexParams}; #[cfg(target_os = "linux")] use lance_testing::pprof::{Output, PProfProfiler}; const TOTAL: usize = 200_000; +const SPARSE_DECOY_LITERAL: &str = "sparsemarkerabcdefghijklmnopqrstuvwx"; + +static SPARSE_DECOY_TRIGRAMS: LazyLock = LazyLock::new(|| { + let mut trigrams = String::new(); + for trigram in SPARSE_DECOY_LITERAL.as_bytes().windows(3) { + if !trigrams.is_empty() { + trigrams.push(' '); + } + trigrams.push_str(std::str::from_utf8(trigram).unwrap()); + } + trigrams +}); /// Build the `doc` column: random sentences with rare markers injected into a /// small fraction of rows so the regex queries have controlled selectivity. -/// The markers (`zqxwvu`, `needlexyz`, `qwerasdf`) are unlikely to appear in -/// the generated English-word sentences. +/// The markers are unlikely to appear in the generated English-word sentences. fn build_docs() -> StringArray { let mut sentence_gen = array::random_sentence(1, 30, false); let base = sentence_gen @@ -47,7 +51,7 @@ fn build_docs() -> StringArray { let base = base.as_string::(); let docs = (0..TOTAL).map(|i| { let sentence = base.value(i); - if i % 200 == 0 { + let mut doc = if i % 200 == 0 { // ~0.5% of rows match `zqxwvu.*needlexyz` and `zqxwvu`. 
format!("{sentence} zqxwvu needlexyz") } else if i % 211 == 0 { @@ -55,12 +59,48 @@ fn build_docs() -> StringArray { format!("{sentence} qwerasdf") } else { sentence.to_string() + }; + // Every row contains all trigrams from SPARSE_DECOY_LITERAL, but only a + // small fraction contains the full literal. This creates a workload where + // fixed trigrams produce a broad candidate set while sparse longer n-grams + // can stay selective. + doc.push(' '); + doc.push_str(&SPARSE_DECOY_TRIGRAMS); + if i % 997 == 0 { + doc.push(' '); + doc.push_str(SPARSE_DECOY_LITERAL); } + doc }); StringArray::from_iter_values(docs) } -async fn build_dataset(tempdir: &TempStrDir) -> Arc { +#[derive(Clone, Copy)] +enum BenchTokenization { + Trigram, + Sparse, +} + +impl BenchTokenization { + fn name(self) -> &'static str { + match self { + Self::Trigram => "trigram", + Self::Sparse => "sparse", + } + } + + fn params(self) -> ScalarIndexParams { + let params = ScalarIndexParams::for_builtin(BuiltinIndexType::NGram); + match self { + Self::Trigram => params, + Self::Sparse => params.with_params(&serde_json::json!({ + "tokenization": "sparse", + })), + } + } +} + +async fn build_dataset(tempdir: &TempStrDir, tokenization: BenchTokenization) -> Arc { let schema = Arc::new(Schema::new(vec![Field::new("doc", DataType::Utf8, false)])); let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(build_docs())]).unwrap(); let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); @@ -73,7 +113,7 @@ async fn build_dataset(tempdir: &TempStrDir) -> Arc { &["doc"], IndexType::NGram, None, - &ScalarIndexParams::default(), + &tokenization.params(), true, ) .await @@ -91,8 +131,10 @@ async fn scan_filter(dataset: &Dataset, filter: &str) -> usize { fn bench_regex_ngram(c: &mut Criterion) { let rt = tokio::runtime::Runtime::new().unwrap(); - let tempdir = TempStrDir::default(); - let dataset = rt.block_on(build_dataset(&tempdir)); + let trigram_tempdir = TempStrDir::default(); + let sparse_tempdir 
= TempStrDir::default(); + let trigram_dataset = rt.block_on(build_dataset(&trigram_tempdir, BenchTokenization::Trigram)); + let sparse_dataset = rt.block_on(build_dataset(&sparse_tempdir, BenchTokenization::Sparse)); let queries = [ ("selective_and", "regexp_match(doc, 'zqxwvu.*needlexyz')"), @@ -101,6 +143,10 @@ fn bench_regex_ngram(c: &mut Criterion) { "regexp_match(doc, '(zqxwvu|qwerasdf|needlexyz)')", ), ("plain_literal", "regexp_match(doc, 'zqxwvu')"), + ( + "sparse_decoy_literal", + "regexp_match(doc, 'sparsemarkerabcdefghijklmnopqrstuvwx')", + ), ("non_accelerable_a_dot_b", "regexp_match(doc, 'a.b')"), ]; @@ -108,10 +154,15 @@ fn bench_regex_ngram(c: &mut Criterion) { group .sample_size(10) .measurement_time(Duration::from_secs(15)); - for (name, filter) in queries { - group.bench_function(name, |b| { - b.iter(|| black_box(rt.block_on(scan_filter(&dataset, filter)))); - }); + for (tokenization, dataset) in [ + (BenchTokenization::Trigram, trigram_dataset.as_ref()), + (BenchTokenization::Sparse, sparse_dataset.as_ref()), + ] { + for (name, filter) in queries { + group.bench_function(format!("{}/{name}", tokenization.name()), |b| { + b.iter(|| black_box(rt.block_on(scan_filter(dataset, filter)))); + }); + } } group.finish(); }