Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/src/format/index/scalar/fts.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ An FTS index may contain multiple partitions. Each partition has its own set of
| `_length` | UInt32 | false | Number of documents containing the token |
| `_compressed_position` | List<List<LargeBinary>> | true | Optional compressed position lists for phrase queries |

The posting-list file schema metadata includes `posting_block_size`, the number of documents encoded per compressed posting block. Older indexes that do not have this metadata use the legacy block size `128`.

### Metadata File Schema

The metadata file contains JSON-serialized configuration and partition information:
Expand All @@ -67,6 +69,7 @@ The metadata file contains JSON-serialized configuration and partition informati
| `min_gram` | UInt32 | 2 | Minimum n-gram length (only for ngram tokenizer) |
| `max_gram` | UInt32 | 15 | Maximum n-gram length (only for ngram tokenizer) |
| `prefix_only` | Boolean | false | Generate only prefix n-grams (only for ngram tokenizer) |
| `block_size` | UInt32 | 256 | Documents per compressed posting block. Must be 128 or 256. Missing values from older indexes read as 128. |

## Tokenizers

Expand Down
1 change: 1 addition & 0 deletions docs/src/quickstart/full-text-search.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ds.create_scalar_index(
remove_stop_words=True, # Remove stop words (language-dependent)
custom_stop_words=None, # Optional additional stop words (only used if remove_stop_words=True)
ascii_folding=True, # Fold accents to ASCII when possible (e.g., "é" -> "e")
block_size=256, # Posting block size: 128 or 256
)
```

Expand Down
22 changes: 22 additions & 0 deletions java/src/main/java/org/lance/index/scalar/InvertedIndexParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public static final class Builder {
private Integer minNgramLength;
private Integer maxNgramLength;
private Boolean prefixOnly;
private Integer blockSize = 256;
private Boolean skipMerge;

/**
Expand Down Expand Up @@ -225,6 +226,24 @@ public Builder prefixOnly(boolean prefixOnly) {
return this;
}

/**
* Configure the number of documents in each compressed posting block.
*
* <p>Supported values are {@code 128} and {@code 256}. New indexes default to {@code 256} when
* this is not set.
*
* @param blockSize posting block size
* @return this builder
* @throws IllegalArgumentException if {@code blockSize} is unsupported
*/
public Builder blockSize(int blockSize) {
if (blockSize != 128 && blockSize != 256) {
throw new IllegalArgumentException("blockSize must be one of 128 or 256");
}
this.blockSize = blockSize;
return this;
}

/**
* Configure whether to skip the partition merge stage after indexing. If true, skip the
* partition merge stage after indexing. This can be useful for distributed indexing where merge
Expand Down Expand Up @@ -282,6 +301,9 @@ public ScalarIndexParams build() {
if (prefixOnly != null) {
params.put("prefix_only", prefixOnly);
}
if (blockSize != null) {
params.put("block_size", blockSize);
}
if (skipMerge != null) {
params.put("skip_merge", skipMerge);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,49 @@
*/
package org.lance.index.scalar;

import org.lance.util.JsonUtils;

import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class InvertedIndexParamsTest {
class InvertedIndexParamsTest {

@Test
public void testIcuSplitTokenizerVariant() {
void testIcuSplitTokenizerVariant() {
ScalarIndexParams params = InvertedIndexParams.builder().baseTokenizer("icu/split").build();

assertEquals("inverted", params.getIndexType());
String jsonParams = params.getJsonParams().orElseThrow(AssertionError::new);
assertTrue(jsonParams.contains("\"base_tokenizer\":\"icu/split\""));
}

@Test
void defaultBlockSizeIsSerialized() {
ScalarIndexParams params = InvertedIndexParams.builder().build();

Map<String, Object> json = JsonUtils.fromJson(params.getJsonParams().orElseThrow());
assertEquals(256, ((Number) json.get("block_size")).intValue());
}

@Test
void blockSizeIsSerialized() {
ScalarIndexParams params = InvertedIndexParams.builder().blockSize(128).build();

assertEquals("inverted", params.getIndexType());
Map<String, Object> json = JsonUtils.fromJson(params.getJsonParams().orElseThrow());
assertEquals(128, ((Number) json.get("block_size")).intValue());
}

@Test
void invalidBlockSizeIsRejected() {
assertThrows(
IllegalArgumentException.class, () -> InvertedIndexParams.builder().blockSize(129));
assertThrows(
IllegalArgumentException.class, () -> InvertedIndexParams.builder().blockSize(512));
}
}
1 change: 1 addition & 0 deletions protos/index_old.proto
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ message InvertedIndexDetails {
uint32 min_ngram_length = 9;
uint32 max_ngram_length = 10;
bool prefix_only = 11;
optional uint32 block_size = 12;
}
3 changes: 3 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3315,6 +3315,9 @@ def create_scalar_index(
query. This will significantly increase the index size.
It won't impact the performance of non-phrase queries even if it is set to
True.
block_size: int, default 256
This is for the ``INVERTED`` index. Number of documents per compressed
posting block. Must be one of ``128`` or ``256``.
memory_limit: int, optional
This is for the ``INVERTED`` index. Total build-time memory limit in MiB.
If set, Lance divides this budget evenly across the workers. If unset,
Expand Down
13 changes: 11 additions & 2 deletions python/python/tests/compat/test_scalar_indices.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
and written by other versions.
"""

import os
import shutil
from pathlib import Path

Expand Down Expand Up @@ -320,7 +321,12 @@ def create(self):
max_rows_per_file=100,
data_storage_version=safe_data_storage_version(self.compat_version),
)
dataset.create_scalar_index("text", "INVERTED", with_position=True)
kwargs = {"with_position": True}
# Downgrade reads use older wheels, so current-created FTS indexes must
# stay on the legacy posting block layout.
if os.environ.get("LANCE_COMPAT_FTS_LEGACY_BLOCK_SIZE") == "1":
kwargs["block_size"] = 128
dataset.create_scalar_index("text", "INVERTED", **kwargs)

def check_read(self):
"""Verify FTS index can be queried."""
Expand Down Expand Up @@ -351,7 +357,10 @@ def skip_downgrade(self, version: str) -> bool:

def current_env(self, method_name: str) -> dict[str, str]:
if method_name == "create":
return {"LANCE_FTS_FORMAT_VERSION": "1"}
return {
"LANCE_COMPAT_FTS_LEGACY_BLOCK_SIZE": "1",
"LANCE_FTS_FORMAT_VERSION": "1",
}
if method_name == "check_write":
return {"LANCE_FTS_FORMAT_VERSION": "2"}
return {}
20 changes: 20 additions & 0 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,26 @@ def test_create_scalar_index_fts_alias(dataset):
assert any(idx.index_type == "Inverted" for idx in dataset.describe_indices())


def test_create_scalar_index_fts_block_size(dataset):
dataset.create_scalar_index(
"doc", index_type="INVERTED", with_position=False, block_size=256
)
row = dataset.take(indices=[0], columns=["doc"])
query = row.column(0)[0].as_py().split(" ")[0]
results = dataset.scanner(columns=["doc"], full_text_query=query).to_table()
assert results.num_rows > 0

with pytest.raises(ValueError, match="block_size"):
dataset.create_scalar_index(
"doc", index_type="INVERTED", name="doc_invalid_129", block_size=129
)

with pytest.raises(ValueError, match="block_size"):
dataset.create_scalar_index(
"doc", index_type="INVERTED", name="doc_invalid_512", block_size=512
)


def test_multi_index_create(tmp_path):
dataset = lance.write_dataset(
pa.table({"ints": range(1024)}), tmp_path, max_rows_per_file=100
Expand Down
5 changes: 5 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2429,6 +2429,11 @@ impl Dataset {
if let Some(prefix_only) = kwargs.get_item("prefix_only")? {
params = params.ngram_prefix_only(prefix_only.extract()?);
}
if let Some(block_size) = kwargs.get_item("block_size")? {
params = params
.block_size(block_size.extract()?)
.map_err(|e| PyValueError::new_err(e.to_string()))?;
}
if let Some(memory_limit) = kwargs.get_item("memory_limit")? {
params = params.memory_limit_mb(memory_limit.extract()?);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,11 @@ enum InstructionSet {
Scalar,
}

/// Internal 8-wide bitpacker implementation.
/// 8-wide bitpacker implementation.
///
/// One block contains 256 integers. This stays private to avoid exposing a new
/// block-size choice through the public Lance bitpacking API.
/// One block contains 256 integers.
#[derive(Clone, Copy)]
pub(crate) struct BitPacker8x(InstructionSet);
pub struct BitPacker8x(InstructionSet);

impl BitPacker8x {
#[cfg(target_arch = "x86_64")]
Expand Down
1 change: 1 addition & 0 deletions rust/compression/bitpacking/src/bitpacker_internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod bitpacker4x;
mod bitpacker8x;

pub use bitpacker4x::BitPacker4x;
pub use bitpacker8x::BitPacker8x;

pub(crate) trait Available {
fn available() -> bool;
Expand Down
2 changes: 1 addition & 1 deletion rust/compression/bitpacking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use core::mem::size_of;

mod bitpacker_internal;

pub use bitpacker_internal::{BitPacker, BitPacker4x};
pub use bitpacker_internal::{BitPacker, BitPacker4x, BitPacker8x};

pub const FL_ORDER: [usize; 8] = [0, 4, 2, 6, 1, 5, 3, 7];

Expand Down
3 changes: 3 additions & 0 deletions rust/lance-index/protos-cache/cache.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ message CompressedPostingHeader {
PositionStorage position_storage = 4;
// Only meaningful when position_storage == POSITION_STORAGE_SHARED.
PositionStreamCodec position_stream_codec = 5;
// Number of documents in each compressed posting block. Older cache entries
// omit this field and decode as the legacy 128-doc block size.
uint32 block_size = 6;
}

// Header for a serialized `PlainPostingList` cache entry. Followed by an Arrow
Expand Down
35 changes: 34 additions & 1 deletion rust/lance-index/src/scalar/inverted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl ScalarIndexPlugin for InvertedIndexPlugin {
.into()))
}

let params = serde_json::from_str::<InvertedIndexParams>(params)?;
let params = InvertedIndexParams::from_training_json(params)?;
Ok(Box::new(InvertedIndexTrainingRequest::new(params)))
}

Expand Down Expand Up @@ -308,6 +308,7 @@ impl ScalarIndexPlugin for InvertedIndexPlugin {
#[cfg(test)]
mod tests {
use super::*;
use crate::scalar::{BuiltinIndexType, ScalarIndexParams};

#[test]
fn test_plugin_version_tracks_max_supported_format() {
Expand All @@ -317,4 +318,36 @@ mod tests {
max_supported_fts_format_version().index_version()
);
}

#[test]
fn test_new_training_request_defaults_missing_block_size_to_256() {
let plugin = InvertedIndexPlugin;
let field = Field::new("text", DataType::Utf8, true);

let cases = [
(
ScalarIndexParams::for_builtin(BuiltinIndexType::Inverted),
false,
),
(ScalarIndexParams::new("inverted".to_string()), false),
(
ScalarIndexParams::new("inverted".to_string())
.with_params(&serde_json::json!({ "with_position": true })),
true,
),
];

for (params, expected_with_position) in cases {
let request = plugin
.new_training_request(params.params.as_deref().unwrap_or("{}"), &field)
.unwrap();
let request = request
.as_any()
.downcast_ref::<InvertedIndexTrainingRequest>()
.unwrap();

assert_eq!(request.parameters.posting_block_size(), DEFAULT_BLOCK_SIZE);
assert_eq!(request.parameters.has_positions(), expected_with_position);
}
}
}
Loading
Loading