diff --git a/docs/src/format/table/.pages b/docs/src/format/table/.pages index 16c20058608..5b0cb0e95e6 100644 --- a/docs/src/format/table/.pages +++ b/docs/src/format/table/.pages @@ -6,4 +6,5 @@ nav: - Layout: layout.md - Branch & Tag: branch_tag.md - Row ID & Lineage: row_id_lineage.md + - Data Overlay Files: data_overlay_file.md - MemTable & WAL: mem_wal.md diff --git a/docs/src/format/table/data_overlay_file.md b/docs/src/format/table/data_overlay_file.md new file mode 100644 index 00000000000..5540f860018 --- /dev/null +++ b/docs/src/format/table/data_overlay_file.md @@ -0,0 +1,390 @@ +# Data Overlay Files + +!!! note "Overlay files require feature flag 64 (data overlay files)" + + A reader or writer that does not understand overlay files must refuse a + dataset that uses them. Silently ignoring an overlay would return stale base + values, which is a correctness bug rather than a degraded experience. + +Overlay files supply new values for a subset of `(row offset, field)` cells +within a fragment **without rewriting the fragment's base data files**. They make +updates cheap when only a small fraction of rows and/or columns change: instead +of rewriting whole columns or moving rows to a new fragment, a writer appends a +small file carrying just the changed cells. + +This is Lance's third mechanism for changing data in place, alongside +[deletion files](index.md#deletion-files) (which remove rows) and +[data evolution](index.md#data-evolution) (which adds or rewrites whole columns). +An overlay changes individual cells. + +## Concepts + +### Coverage and resolution + +Each overlay declares which cells it provides through a **coverage** bitmap (or, +for sparse overlays, one bitmap per field). The bitmaps index **physical row +offsets** — positions in the base data files, counting deleted rows — so they are +stable across deletions, exactly like deletion vectors. 
+ +To resolve a cell `(offset, field)` on read, walk the fragment's overlays from +**newest to oldest**. The first overlay that covers `(offset, field)` wins; its +value is used. If no overlay covers the cell, the value falls through to the base +data file (or is `NULL` if no base data file holds that field). + +Precedence among overlays is determined by: + +1. `committed_version` — higher wins (see [Versioning](#versioning-and-ordering)). +2. Position in `DataFragment.overlays` as a tiebreaker — a later entry is newer. + +A covered offset whose value is `NULL` overrides the cell **to** `NULL`. This is +distinct from an offset that is simply absent from the bitmap, which falls +through to the base. Coverage, not value-nullness, decides whether an overlay +applies. + +### Interaction with deletions + +Deletions take precedence over overlays. If a row offset is marked deleted in the +fragment's deletion file, any overlay value for that offset is dead and is +ignored, regardless of commit order. This keeps the invariant simple: a deletion +is the final word on a row, so a concurrent overlay against a row that was +deleted needs no special conflict handling — its values are merely inert. + +### Physical layout + +An overlay's data file stores **one value column per field**, in the order of +`data_file.fields`. It does **not** store a row-offset key column. The position of +a covered offset's value within its column is the **rank** of that offset in the +field's coverage bitmap — the number of set bits below it. For a Roaring bitmap +this is a cheap operation, though not strictly O(1): it sums the cardinalities of +the preceding containers and counts within one container. Random access to any +cell is therefore a rank computation +followed by a single value fetch, with no offset column to read and no binary +search. + +Because different fields may cover different offset sets, the value columns of a +single sparse overlay may have **different lengths**. The Lance file format +permits columns of differing item counts within one file, so a sparse overlay is +representable as a single file. 
(See [Writer support](#writer-support) for the +current implementation status.) + +### Dense vs. sparse overlays + +A single overlay is one of two shapes: + +- **Dense (rectangular).** One `shared_offset_bitmap` applies to every field. Every + covered offset has a value for every field. This is the common case for a plain + `UPDATE`, where one `SET` list is applied to one set of rows. +- **Sparse.** A `FieldCoverage` carries one bitmap per field, used when different + fields cover different offset sets — for example a `MERGE` with multiple + `WHEN MATCHED` branches, where different rows update different columns. A dense + overlay would have to widen to the bounding rectangle and fill the untouched + cells with their current values (post-images), which for wide columns such as + embeddings means re-storing data that did not change. A sparse overlay stores + exactly the changed cells. + +A writer may always express a non-rectangular update as **multiple dense overlays +in one transaction** (one per coverage group) instead of a single sparse overlay. + +## Protobuf + +
+DataOverlayFile protobuf message + +```protobuf +%%% proto.message.DataOverlayFile %%% +``` + +
+ +
+FieldCoverage protobuf message + +```protobuf +%%% proto.message.FieldCoverage %%% +``` + +
+ +## Versioning and ordering + +Overlays reuse the dataset version as their ordering clock rather than +introducing a separate generation counter. + +`committed_version` is the dataset version at which an overlay **became +effective** — the version of the commit that introduced it, **not** the version +it was read from. It is stamped at commit time and re-stamped if the commit is +retried, in the same way as the created-at / last-updated-at version sequences. + +This single value drives every ordering decision: + +- **Overlay vs. overlay** (read precedence): higher `committed_version` wins. +- **Overlay vs. index** (query correctness): an index records the + `dataset_version` it was built from. An index whose `dataset_version >= + committed_version` already incorporates the overlay. An overlay whose + `committed_version > index.dataset_version` is newer than the index and its + cells must be excluded from index results and re-evaluated. +- **Scheduler signal**: the gap between an overlay's `committed_version` and an + index's `dataset_version`, or between an overlay and the base, is a staleness + measure the compaction scheduler can use. + +!!! note "Why effective version, not read version" + + Suppose an overlay reads version 5 and commits at version 6, while an index + is built reading version 5 (before the overlay) and commits at version 7 with + `dataset_version = 5`. If the overlay stored its *read* version (5), the test + `5 > 5` is false, the row would not be excluded, and the index — which never + saw the overlay — would return a stale result. Storing the *effective* + version (6) makes `6 > 5` true, the cell is excluded and re-evaluated, and the + result is correct. + +## Index integration + +Building an index over a fragment that has overlays does **not** require dropping +the fragment from the index's coverage. The fragment stays indexed, and the query +path reconciles overlays at query time using an **exclusion set**. 
+ +The exclusion set for an index on field `F` is the union of the coverage bitmaps, +restricted to field `F`, of every overlay whose `committed_version > +index.dataset_version`. The exclusion is **field-aware**: an overlay that touches +only unrelated columns does not exclude anything from the index on `F`. + +The query then proceeds as: + +1. Run the index search as usual, producing candidate rows. +2. Remove any candidate in the exclusion set. (Its indexed value may be stale.) +3. **Re-evaluate** the excluded rows against their current values — the same flat + path already used for the unindexed tail of fragments. For a scalar predicate + this re-applies the filter; for a vector query it re-scores the row's current + vector. Rows that still match are added back to the result. + +Step 3 is what makes exclusion correct rather than merely safe: removing a row +from index candidates without re-evaluating it would silently drop a row that +should match under its new value. + +### Correctness invariant + +> For every indexed field `F` and every row offset `o` in a fragment the index +> covers, the index's entry for `(o, F)` is trusted unless `o` is excluded. +> `o` is excluded iff some overlay with `committed_version > index.dataset_version` +> covers `(o, F)`. + +The write and compaction paths together preserve this: + +- **Writes** change a cell only by adding an overlay, and that overlay's + `committed_version` exceeds the version of any pre-existing index — so the + change is always covered by an exclusion. +- **Compaction** may remove an overlay only if the index no longer relies on it + (see below). + +## Compaction + +Overlays accumulate read cost — every overlay is a bitmap to test and a possible +file to open. Compaction bounds that cost in two modes: + +- **Overlay → overlay.** Merge several overlays into fewer, computing the + post-image per `(offset, field)` by walking the merged overlays newest-first. 
+ The merged overlay takes the **maximum** `committed_version` of its inputs, so + the exclusion semantics are preserved. Indexes are unaffected. This is cheap and + does not touch the base. +- **Overlay → base.** Fold overlays into a fresh base data file, computing the + post-image for every covered cell, then clear the overlays. The base is + complete, so every post-image is well defined. Overlay offsets are physical, so + they cannot survive a rewrite that reorders rows; folding therefore materializes + values rather than carrying overlays forward. + +!!! warning "Folding an indexed field must update its index" + + An overlay→base fold removes the overlay, which removes the exclusion signal + that kept an index correct. Folding an overlay that covers an indexed field + `F` is therefore equivalent to a column rewrite of `F` and must, in the same + commit, either rebuild the index to a `dataset_version` at least the folded + overlay's `committed_version`, or remove the fragment from the index's + coverage so the rows fall to the flat path. Otherwise the index would serve + stale values with no overlay to exclude them. This is the same rule that + already governs rewriting a column that an index is built on. + +When a fragment with overlays is compacted by a row-rewriting operation +(`RewriteRows`, which produces new fragments with new row addresses), the +overlays are folded into the new base as part of the rewrite, and existing +[fragment-reuse remapping](row_id_lineage.md) handles the row-address changes as +it does today. + +## Row lineage + +An overlay write updates the `last_updated_at_version` of every covered row, so +change-data-feed and time-travel queries observe the update. Because overlays are +addressed by physical offset, they do **not** require stable row IDs to be +enabled; lineage updates apply only when those features are on. 
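The exclusion-set rule from the Index integration section (union of field-restricted coverage for overlays newer than the index, then re-evaluation of the excluded rows) can be sketched in a few lines. This is an illustrative sketch only, not the Lance implementation: `OverlayCoverage`, `exclusion_set`, and `query` are hypothetical names, and a `BTreeSet<u32>` stands in for a Roaring bitmap.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified view of one overlay: its version and, per field id,
/// the physical row offsets it covers (a BTreeSet stands in for a Roaring bitmap).
struct OverlayCoverage {
    committed_version: u64,
    fields: Vec<(u32, BTreeSet<u32>)>,
}

/// Union of the coverage for `field` over every overlay that is newer than the
/// index, i.e. whose `committed_version > index_version`.
fn exclusion_set(overlays: &[OverlayCoverage], field: u32, index_version: u64) -> BTreeSet<u32> {
    overlays
        .iter()
        .filter(|o| o.committed_version > index_version)
        .flat_map(|o| o.fields.iter())
        .filter(|(f, _)| *f == field)
        .flat_map(|(_, cov)| cov.iter().copied())
        .collect()
}

/// Drop excluded candidates from the index hits, then re-evaluate every excluded
/// offset against its current value and add back the ones that still match.
fn query(
    index_hits: &BTreeSet<u32>,
    excluded: &BTreeSet<u32>,
    current: &[i32],
    predicate: impl Fn(i32) -> bool,
) -> BTreeSet<u32> {
    let trusted = index_hits.difference(excluded).copied();
    let rechecked = excluded
        .iter()
        .copied()
        .filter(|&o| predicate(current[o as usize]));
    trusted.chain(rechecked).collect()
}
```

In this sketch `current` holds the already-resolved column values (base plus overlays). An overlay that touches only other fields contributes nothing to the set, which is what makes the exclusion field-aware.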
+ +## Worked example + +A table `users` with stable row IDs enabled and these fields: + +| field id | name | type | +|----------|-----------|-------------------------| +| 1 | id | `int32` (primary key) | +| 2 | name | `utf8` | +| 3 | age | `int32` | +| 4 | embedding | `fixed_size_list`| + +Created at version 1 as a single fragment `0` with one base data file +`data/file0.lance` holding all four columns. `physical_rows = 4`: + +| offset | id | name | age | embedding | +|--------|----|-------|-----|------------------| +| 0 | 1 | Alice | 30 | … | +| 1 | 2 | Bob | 25 | … | +| 2 | 3 | Carol | 40 | … | +| 3 | 4 | Dave | 22 | … | + +A BTree scalar index on `age` is built at version 1, covering fragment `0` +(`dataset_version = 1`). + +### Step 1 — write an overlay + +```sql +UPDATE users SET age = 26 WHERE id = 2; -- Bob, offset 1 +``` + +This touches one field (`age`) for one row, so the writer emits a dense overlay +and commits it as version 2. Fragment `0` gains: + +```text +DataOverlayFile { + data_file: { path: "data/overlay-.lance", fields: [3], column_indices: [0] } + coverage: shared_offset_bitmap = {1} + committed_version: 2 +} +``` + +The overlay file stores a single `age` column with one value, `[26]`, at +rank `{1}.rank(1) = 0`. `last_updated_at_version[1]` is set to 2. + +### Step 2 — read + +`SELECT id, age FROM users` reads base ages `[30, 25, 40, 22]`. For `age` +(field 3), the overlay covers offset 1, so `age[1]` is replaced with the overlay +value at position `{1}.rank(1) = 0` → `26`. Result ages: `[30, 26, 40, 22]`. + +### Step 3 — index query + +```sql +SELECT * FROM users WHERE age = 26; +``` + +The `age` index was built at `dataset_version = 1`; the overlay's +`committed_version` is 2. Since `2 > 1`, the overlay's coverage for `age`, `{1}`, +is the exclusion set for this query. + +- The index (built at v1) holds Bob's *old* `age = 25`, so a lookup for `26` + returns nothing from the index. 
+- Offset 1 is in the exclusion set, so it is re-evaluated on the flat path. Its + current `age` (26, via the overlay) matches `age = 26`, so Bob is returned. + +The mirror case `WHERE age = 25` shows exclusion preventing a stale hit: the index +returns offset 1 (stale `25`), but offset 1 is excluded, re-evaluated to `26`, and +correctly dropped. + +### Step 4 — a second, non-rectangular write + +```sql +MERGE INTO users USING staged ON users.id = staged.id +WHEN MATCHED AND staged.kind = 'rename' THEN UPDATE SET name = staged.name -- Carol(2), Dave(3) +WHEN MATCHED AND staged.kind = 'revec' THEN UPDATE SET embedding = staged.embedding -- Bob(1) +``` + +`name` is updated for offsets `{2, 3}` and `embedding` for offset `{1}` — different +fields over different rows. This is a sparse overlay, committed as version 3: + +```text +DataOverlayFile { + data_file: { path: "data/overlay-.lance", fields: [2, 4], column_indices: [0, 1] } + coverage: field_coverage { offset_bitmaps: [ {2,3}, {1} ] } + // name (field 2) ^ ^ embedding (field 4) + committed_version: 3 +} +``` + +The file's `name` column has **two** values (`["Caroline", "David"]`, at +ranks 0 and 1 of `{2,3}`) and its `embedding` column has **one** value (at rank 0 +of `{1}`) — columns of different lengths in one file. + +### Step 5 — read after the second write + +`SELECT name, age, embedding FROM users` resolves each field independently, +newest overlay first: + +- `name`: the v3 overlay covers `{2,3}` → `["Alice", "Bob", "Caroline", "David"]`. +- `age`: the v3 overlay does not cover `age`; the v2 overlay still applies at + offset 1 → `[30, 26, 40, 22]`. +- `embedding`: the v3 overlay covers `{1}` → Bob's vector is the new one, others + from base. + +Overlays from different versions coexist and apply per field. 
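The per-field resolution used in Steps 2 and 5 can be sketched as below for a single `int32` column such as `age`. This is a minimal sketch under stated assumptions, not the Lance reader: `FieldOverlay`, `rank`, and `resolve` are hypothetical names, a `BTreeSet<u32>` stands in for the Roaring coverage bitmap, and the value column is held in memory as a `Vec`.

```rust
use std::collections::BTreeSet;

/// Stand-in for a Roaring coverage bitmap: a sorted set of physical row offsets.
type Coverage = BTreeSet<u32>;

/// Hypothetical view of one overlay restricted to one field: its coverage and
/// its value column in rank order. `None` models a cell overridden to NULL.
struct FieldOverlay {
    committed_version: u64,
    coverage: Coverage,
    values: Vec<Option<i32>>,
}

/// Rank of `offset` in `coverage`: the number of covered offsets below it.
fn rank(coverage: &Coverage, offset: u32) -> usize {
    coverage.range(..offset).count()
}

/// Resolve one cell. `overlays` is in manifest order (a later entry is newer).
/// The outer `None` means the row is deleted; the inner `Option` is the cell
/// value, where `None` is NULL.
fn resolve(
    base: Option<i32>,
    overlays: &[FieldOverlay],
    deleted: &BTreeSet<u32>,
    offset: u32,
) -> Option<Option<i32>> {
    // Deletions take precedence: any overlay value for this offset is inert.
    if deleted.contains(&offset) {
        return None;
    }
    // Among overlays covering the offset, the highest `committed_version` wins,
    // with list position as the tiebreaker.
    let winner = overlays
        .iter()
        .enumerate()
        .filter(|(_, o)| o.coverage.contains(&offset))
        .max_by_key(|(pos, o)| (o.committed_version, *pos));
    match winner {
        Some((_, o)) => Some(o.values[rank(&o.coverage, offset)]),
        None => Some(base), // no coverage: fall through to the base data file
    }
}
```

With the v2 overlay from Step 1 (coverage `{1}`, values `[26]`) and base ages `[30, 25, 40, 22]`, resolving offsets 0 through 3 reproduces the ages from Step 2. Picking the maximum `(committed_version, position)` is equivalent to the newest-to-oldest walk; a real reader would likely iterate in that order and stop at the first hit.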
+ +### Step 6 — compaction (overlay → base) + +The scheduler folds both overlays into fragment `0` at version 4, computing +post-images for `age`, `name`, and `embedding`, and writing a new base data file +`data/file1.lance` with those columns. In the old file, fields 2, 3, and 4 are +tombstoned (`-2`); field 1 (`id`) remains. The fragment's `overlays` list is +cleared. Row addresses are preserved (a column rewrite, not a row rewrite), so +stable row IDs and the deletion vector are untouched. + +Because the fold removed the overlay that was excluding offset 1 from the `age` +index, the same commit must reconcile that index: either rebuild it at +`dataset_version >= 2`, or drop fragment `0` from its coverage so `age` queries +fall to the flat path. After a rebuild at version 4, no overlay remains and the +`age` index directly returns `26` for Bob with no exclusion needed. + +## Guidance + +!!! note "This section is a stub." + + The following are implementation considerations, not part of the on-disk + specification. + +### When to overlay vs. rewrite a column vs. move rows + +*(To be expanded.)* The choice between appending an overlay, rewriting a full +column (data evolution), and moving updated rows to a new fragment depends on the +fraction of rows changed, the fraction of columns changed, column width, the +presence of indexes on the changed columns, and the accumulated overlay read +cost. Roughly: few rows changed favors overlays; most rows in a few columns +favors a column rewrite; most columns changed favors moving rows to a new +fragment. + +### Writer support + +*(To be expanded.)* Dense (rectangular) overlays write with the existing +equal-length file writer today. Sparse overlays stored as a **single** file +require the writer to emit columns of independent lengths, which the current v2 +writer does not yet do (it advances all columns from one global row counter). 
+Until that support lands, a writer can express a sparse update as multiple dense +overlays in one transaction. + +### Scheduling compaction + +*(To be expanded.)* The overlay→overlay and overlay→base modes have very +different costs; a cost/benefit scheduler decides when each is worthwhile, using +the version gap as a staleness signal. + +### Open questions + +*(To be resolved.)* + +- **Per-fragment vs. per-table overlays.** Overlays are attached per fragment. + Should there be a table-level overlay concept, and how would it interact with + fragment-level row addressing? +- **Relationship to LSM.** Overlays plus compaction resemble an LSM tree (newest + layer wins, periodic merge). How far should that analogy be taken, and what do + we deliberately do differently given Lance's random-access requirements? +- **Coverage bitmap spill.** Coverage bitmaps live inline in the manifest. Very + large coverage (an overlay touching many rows) may warrant external spill, as + the row-ID and last-updated-at sequences already do above a size threshold. + +## Related specifications + +- [Table format overview](index.md) +- [Transactions](transaction.md) +- [Row ID & Lineage](row_id_lineage.md) +- [Index Formats](../index/index.md) +- [Format Versioning](versioning.md) diff --git a/docs/src/format/table/index.md b/docs/src/format/table/index.md index 94ea4b90dc9..ce4d0b26613 100644 --- a/docs/src/format/table/index.md +++ b/docs/src/format/table/index.md @@ -168,6 +168,35 @@ However, this invalidates row addresses and requires rebuilding indices, which c +## Data Overlay Files + +!!! note "Overlay files require feature flag 64 (data overlay files)" + +Overlay files supply new values for a subset of `(row offset, field)` cells within +a fragment without rewriting the base data files. 
They make updates cheap when only +a small percentage of rows and/or columns change: a writer appends a small file +carrying just the changed cells instead of rewriting whole columns or moving rows +to a new fragment. + +On read, each cell is resolved by consulting the fragment's overlays from newest to +oldest; the first overlay covering that `(offset, field)` wins, otherwise the value +falls through to the base data file. Indices keep covering the fragment and reconcile +overlays at query time through a field-aware exclusion set. + +For the full specification — coverage and resolution rules, dense vs. sparse layout, +versioning, index integration, compaction, and a worked example — see the +[Data Overlay Files Specification](data_overlay_file.md). + +
+DataOverlayFile protobuf message + +```protobuf +%%% proto.message.DataOverlayFile %%% +``` + +
+ + ## Related Specifications ### Storage Layout diff --git a/protos/table.proto b/protos/table.proto index d298809d5d8..cc8b477a6a6 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -113,6 +113,11 @@ message Manifest { // * 2: row ids are stable and stored as part of the fragment metadata. // * 4: use v2 format (deprecated) // * 8: table config is present + // * 16: data files use multiple base paths (shallow clone / multi-base) + // * 32: the transaction file under _transactions is not written (inline only) + // * 64: data overlay files are present (see DataOverlayFile). Readers that do + // not understand overlays must refuse the dataset, since ignoring an overlay + // would silently return stale base values. uint64 reader_feature_flags = 9; // Feature flags for writers. @@ -311,6 +316,15 @@ message DataFragment { repeated DataFile files = 2; + // Optional overlay files for this fragment, which supply new values for a + // subset of cells without rewriting the base data files. This MUST be empty + // if the data overlay files feature flag (64) is not set in the manifest. + // + // Order is significant: a later entry is newer than an earlier one. When two + // overlays cover the same (offset, field) and share a `committed_version`, the + // later entry wins. See DataOverlayFile for the full resolution rules. + repeated DataOverlayFile overlays = 11; + // File that indicates which rows, if any, should be considered deleted. DeletionFile deletion_file = 3; @@ -433,6 +447,66 @@ message DataFile { optional uint32 base_id = 7; } // DataFile +// An overlay file supplies new values for a subset of (row offset, field) cells +// within a fragment, without rewriting the fragment's base data files. It is +// used for efficient updates when only a small fraction of rows and/or columns +// change. 
+// +// On read, a cell is resolved by consulting the fragment's overlays from newest +// to oldest: the first overlay that covers that (offset, field) wins; if none +// cover it, the value falls through to the base data file. Because deletions +// take precedence over overlays, an overlay value for an offset that is also +// marked deleted is dead and is ignored. +// +// The overlay's data file does NOT store a row-offset key column. Within a value +// column, the position of a covered offset's value is the rank (0-based count of +// set bits below it) of that offset within the field's coverage bitmap. Because +// fields may cover different offset sets, the value columns of a single overlay +// data file may have different lengths (which the Lance file format permits). +message DataOverlayFile { + // The data file storing the overlay's new cell values, one value column per + // field in `data_file.fields`. No row-offset key column is stored. + DataFile data_file = 1; + + // Which (offset, field) cells this overlay provides values for. + oneof coverage { + // A single 32-bit Roaring bitmap of physical row offsets that applies to + // every field in `data_file.fields` (a "dense" / rectangular overlay). + // Every covered offset has a value for every field. This is the common case + // for a plain UPDATE, where one SET list is applied to one set of rows. + bytes shared_offset_bitmap = 2; + // Per-field coverage for a "sparse" overlay, used when different fields cover + // different offset sets (e.g. a MERGE with multiple WHEN MATCHED branches). + FieldCoverage field_coverage = 4; + } + + // The dataset version at which this overlay became effective: the version of + // the commit that introduced it, NOT the version it was read from. It is + // stamped at commit time and re-stamped if the commit is retried, in the same + // way as the created-at / last-updated-at version sequences. 
+ // + // This drives two orderings: + // * Versus index builds: an index whose `dataset_version` >= this value + // already incorporates this overlay. Otherwise the overlay's covered cells + // are excluded from index results for the affected fields and re-evaluated + // against their current values (see the Data Overlay Files specification). + // * Versus other overlays: when two overlays cover the same (offset, field), + // the one with the higher `committed_version` wins. Overlays that share a + // `committed_version` are ordered by their position in + // `DataFragment.overlays`, where a later entry is newer and wins. + uint64 committed_version = 3; +} + +// Per-field coverage for a sparse overlay. +message FieldCoverage { + // One entry per field in the overlay's `data_file.fields`, in the same order. + // Each is a 32-bit Roaring bitmap of the physical row offsets covered for that + // field. An offset present in a field's bitmap but mapped to a NULL value + // means the cell is overridden to NULL (distinct from an offset that is absent, + // which falls through to the base data file). + repeated bytes offset_bitmaps = 1; +} + // Deletion File // // The path of the deletion file is constructed as: diff --git a/protos/transaction.proto b/protos/transaction.proto index e72e95025a4..bfc0eee354b 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -315,6 +315,44 @@ message Transaction { repeated DataReplacementGroup replacements = 1; } + // Overlay files to append to a single fragment, in order (the last entry is + // newest). The overlays are appended to the fragment's existing `overlays` + // list; they do not replace it, so overlays written by concurrent commits are + // preserved. 
+ message DataOverlayGroup { + uint64 fragment_id = 1; + // Each DataOverlayFile.committed_version is left 0 by the writer and stamped + // to the new dataset version at commit time (re-stamped on retry), in the + // same way as the created-at / last-updated-at version sequences. The fields + // touched are read from each overlay's `data_file.fields`. + repeated DataOverlayFile overlays = 2; + } + + // Attach overlay files to fragments, supplying new values for a subset of + // (row offset, field) cells without rewriting the fragments' base data files. + // See the DataOverlayFile message in table.proto and the Data Overlay Files + // specification for resolution, coverage, and versioning rules. + // + // Conflict semantics (intentionally permissive, like DataReplacement). Against + // a concurrent operation that touches one of the same fragments: + // * Another DataOverlay (any fields): COMPATIBLE. Overlays stack; when two + // overlays cover the same (offset, field) the one with the higher + // `committed_version` wins, so independent backfills never conflict. + // * Append / new fragments: COMPATIBLE. + // * Delete: COMPATIBLE. A deletion takes precedence over an overlay, so an + // overlay value for a deleted offset is inert (no special handling needed). + // * DataReplacement or column-rewrite (Update with REWRITE_COLUMNS) of the + // same field: COMPATIBLE. Both preserve physical row addresses, so overlay + // offsets stay valid; the overlay is newer and wins its covered cells, and + // the version gate excludes those cells from any rebuilt index. + // * Row-rewrite, compaction, or an overlay->base fold of the fragment: + // CONFLICT. These change physical row addresses or consume the overlays, so + // the overlay's offsets are no longer valid. The writer must re-read the new + // fragment, recompute, and retry. + message DataOverlay { + repeated DataOverlayGroup groups = 1; + } + // Update the merged generations in MemWAL index. 
// This operation is used during merge-insert to atomically record which // generations have been merged to the base table. @@ -346,6 +384,7 @@ message Transaction { UpdateMemWalState update_mem_wal_state = 112; Clone clone = 113; UpdateBases update_bases = 114; + DataOverlay data_overlay = 115; } // Fields 200/202 (`blob_append` / `blob_overwrite`) previously represented blob dataset ops. diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index c454f73819e..45d50541879 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -491,6 +491,21 @@ impl FileReader { self.num_rows } + /// The number of rows stored in a single physical column. + /// + /// For ordinary (rectangular) files every column has the same length, equal + /// to [`num_rows`](Self::num_rows). Files written with + /// [`FileWriter::write_columns`](crate::writer::FileWriter::write_columns) + /// may have columns of differing lengths; this returns the length of one + /// such column, derived by summing its pages' row counts. Returns `None` if + /// `column_index` is out of bounds. + pub fn column_num_rows(&self, column_index: usize) -> Option { + self.metadata + .column_metadatas + .get(column_index) + .map(|col| col.pages.iter().map(|page| page.length).sum()) + } + pub fn metadata(&self) -> &Arc { &self.metadata } diff --git a/rust/lance-file/src/writer.rs b/rust/lance-file/src/writer.rs index 12bd50df6fe..63ff7a95314 100644 --- a/rust/lance-file/src/writer.rs +++ b/rust/lance-file/src/writer.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; -use arrow_array::RecordBatch; +use arrow_array::{ArrayRef, RecordBatch}; use arrow_data::ArrayData; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -221,6 +221,11 @@ pub struct FileWriter { field_id_to_column_indices: Vec<(u32, u32)>, num_columns: u32, rows_written: u64, + // The number of rows written for each top-level field (i.e. 
each entry in + // `column_writers`). With `write_batch` every field advances together and + // these are all equal, but `write_columns` advances fields independently, so + // a single file may end up with columns of differing item counts. + field_rows_written: Vec, global_buffers: Vec<(u64, u64)>, schema_metadata: HashMap, options: FileWriterOptions, @@ -277,6 +282,7 @@ impl FileWriter { column_metadata: Vec::new(), num_columns: 0, rows_written: 0, + field_rows_written: Vec::new(), field_id_to_column_indices: Vec::new(), global_buffers: Vec::new(), schema_metadata: HashMap::new(), @@ -467,6 +473,7 @@ impl FileWriter { BatchEncoder::try_new(&schema, encoding_strategy.as_ref(), &encoding_options)?; self.num_columns = encoder.num_columns(); + self.field_rows_written = vec![0; encoder.field_encoders.len()]; self.column_writers = encoder.field_encoders; self.column_metadata = vec![initial_column_metadata(); self.num_columns as usize]; self.field_id_to_column_indices = encoder.field_id_to_column_index; @@ -490,13 +497,14 @@ impl FileWriter { batch: &RecordBatch, external_buffers: &mut OutOfLineBuffers, ) -> Result>> { - self.schema + let items = self + .schema .as_ref() .unwrap() .fields .iter() - .zip(self.column_writers.iter_mut()) - .map(|(field, column_writer)| { + .enumerate() + .map(|(field_idx, field)| { let array = batch .column_by_name(&field.name) @@ -507,19 +515,53 @@ impl FileWriter { ) .into(), ))?; + Ok((field_idx, array.clone())) + }) + .collect::>>()?; + self.encode_columns(&items, external_buffers) + } + + /// Encode a set of `(field index, array)` pairs, each advancing only its own + /// column. The returned tasks must be written before the per-field row + /// counters are advanced (see `advance_columns`). + fn encode_columns( + &mut self, + items: &[(usize, ArrayRef)], + external_buffers: &mut OutOfLineBuffers, + ) -> Result>> { + // Snapshot the starting row number of each field before borrowing the + // column writers mutably below. 
+ let row_numbers = items + .iter() + .map(|(field_idx, _)| self.field_rows_written[*field_idx]) + .collect::>(); + items + .iter() + .zip(row_numbers) + .map(|((field_idx, array), row_number)| { let repdef = RepDefBuilder::default(); let num_rows = array.len() as u64; - column_writer.maybe_encode( + self.column_writers[*field_idx].maybe_encode( array.clone(), external_buffers, repdef, - self.rows_written, + row_number, num_rows, ) }) .collect::>>() } + /// Advance the per-field row counters after a set of columns has been + /// written, keeping `rows_written` (the file's logical length) in sync as the + /// maximum column length. + fn advance_columns(&mut self, items: &[(usize, ArrayRef)]) { + for (field_idx, array) in items { + self.field_rows_written[*field_idx] += array.len() as u64; + } + self.rows_written = self.field_rows_written.iter().copied().max().unwrap_or(0); + } + /// Schedule a batch of data to be written to the file /// /// Note: the future returned by this method may complete before the data has been fully @@ -557,18 +599,100 @@ impl FileWriter { .flatten() .collect::>(); - self.rows_written = match self.rows_written.checked_add(batch.num_rows() as u64) { - Some(rows_written) => rows_written, - None => { - return Err(Error::invalid_input_source(format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into())); - } - }; + // `write_batch` advances every field by the same amount, keeping all + // columns equal length. Guard against overflowing the row counter. 
+ if self.rows_written.checked_add(num_rows).is_none() { + return Err(Error::invalid_input_source(format!("cannot write batch with {} rows because {} rows have already been written and Lance files cannot contain more than 2^64 rows", num_rows, self.rows_written).into())); + } + for field_rows in self.field_rows_written.iter_mut() { + *field_rows += num_rows; + } + self.rows_written = self.field_rows_written.iter().copied().max().unwrap_or(0); self.write_pages(encoding_tasks).await?; Ok(()) } + /// Write a set of columns whose lengths may differ from one another. + /// + /// Unlike [`write_batch`](Self::write_batch), which advances every column + /// from a single shared row counter, this method advances each column + /// independently. The result is a single file whose columns may have + /// different item counts — the physical layout used by sparse data overlay + /// files, where each field covers a different set of rows. + /// + /// `columns` is a list of `(field index, array)` pairs, where the field + /// index refers to a top-level field in the writer's schema (the same order + /// as the schema's fields). A field may be written across multiple calls; + /// its values are appended. A field that is never written ends up as a + /// zero-length column. The writer must have been created with an explicit + /// schema (via [`try_new`](Self::try_new)); a lazy schema cannot be inferred + /// here because individual calls need not cover every field. + /// + /// ``` + /// # use arrow_array::{ArrayRef, Int32Array}; + /// # use std::sync::Arc; + /// # use lance_file::writer::FileWriter; + /// # async fn example(writer: &mut FileWriter) -> lance_core::Result<()> { + /// // Field 0 gets three values, field 1 gets one — a non-rectangular file. 
+ /// let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + /// let b: ArrayRef = Arc::new(Int32Array::from(vec![10])); + /// writer.write_columns(vec![(0, a), (1, b)]).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn write_columns(&mut self, columns: Vec<(usize, ArrayRef)>) -> Result<()> { + let schema = self.schema.as_ref().ok_or_else(|| { + Error::invalid_input_source( + "write_columns requires the writer to be created with an explicit schema".into(), + ) + })?; + // Validate field indices, lengths, and nullability up front. + for (field_idx, array) in &columns { + let field = schema.fields.get(*field_idx).ok_or_else(|| { + Error::invalid_input_source( + format!( + "write_columns: field index {} is out of bounds (schema has {} fields)", + field_idx, + schema.fields.len() + ) + .into(), + ) + })?; + if array.len() as u64 > u32::MAX as u64 { + return Err(Error::invalid_input_source( + "cannot write Lance files with more than 2^32 rows".into(), + )); + } + Self::verify_field_nullability(&array.to_data(), field)?; + } + // Skip empty arrays: a never-advanced field simply remains a zero-length + // column, which the encoders handle at `finish` time. 
+ let columns = columns + .into_iter() + .filter(|(_, array)| !array.is_empty()) + .collect::>(); + if columns.is_empty() { + return Ok(()); + } + + let mut external_buffers = + OutOfLineBuffers::new(self.tell().await?, PAGE_BUFFER_ALIGNMENT as u64); + let encoding_tasks = self.encode_columns(&columns, &mut external_buffers)?; + for external_buffer in external_buffers.take_buffers() { + Self::do_write_buffer(&mut self.writer, &external_buffer).await?; + } + let encoding_tasks = encoding_tasks + .into_iter() + .flatten() + .collect::>(); + + self.advance_columns(&columns); + self.write_pages(encoding_tasks).await?; + Ok(()) + } + async fn write_column_metadata( &mut self, metadata: pbfile::ColumnMetadata, @@ -974,11 +1098,11 @@ mod tests { use std::collections::HashMap; use std::sync::Arc; - use crate::reader::{FileReader, FileReaderOptions, describe_encoding}; + use crate::reader::{FileReader, FileReaderOptions, ReaderProjection, describe_encoding}; use crate::testing::FsFixture; use crate::writer::{ENV_LANCE_FILE_WRITER_MAX_PAGE_BYTES, FileWriter, FileWriterOptions}; use arrow_array::builder::{Float32Builder, Int32Builder}; - use arrow_array::{Int32Array, RecordBatch, UInt64Array}; + use arrow_array::{ArrayRef, Int32Array, RecordBatch, UInt64Array}; use arrow_array::{RecordBatchReader, StringArray, types::Float64Type}; use arrow_schema::{DataType, Field, Field as ArrowField, Schema, Schema as ArrowSchema}; use lance_core::cache::LanceCache; @@ -990,6 +1114,7 @@ mod tests { use lance_encoding::version::LanceFileVersion; use lance_io::object_store::ObjectStore; use lance_io::utils::CachedFileSize; + use rstest::rstest; #[tokio::test] async fn test_basic_write() { @@ -1040,6 +1165,196 @@ mod tests { file_writer.finish().await.unwrap(); } + /// Read a single column back at an explicit range/index set, returning its + /// `Int32` values. 
Reading one column at a time is how unequal-length files + /// are consumed: a global full-scan would conflate columns of different + /// lengths into one (impossible) rectangular batch. + async fn read_int32_column( + reader: &FileReader, + schema: &LanceSchema, + version: LanceFileVersion, + name: &str, + params: lance_io::ReadBatchParams, + ) -> Vec> { + use futures::TryStreamExt; + use lance_encoding::decoder::FilterExpression; + + let projection = ReaderProjection::from_column_names(version, schema, &[name]).unwrap(); + let batches: Vec = reader + .read_stream_projected(params, 1024, 16, projection, FilterExpression::no_filter()) + .await + .unwrap() + .try_collect() + .await + .unwrap(); + batches + .iter() + .flat_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .collect::>() + }) + .collect() + } + + /// A single file may hold columns of differing item counts (no shared global + /// row counter). This is the physical layout used by sparse data overlay + /// files, where each field covers a different set of rows. + #[rstest] + #[tokio::test] + async fn test_write_columns_unequal_lengths( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { + use lance_io::ReadBatchParams; + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, true), + ArrowField::new("b", DataType::Int32, true), + ArrowField::new("c", DataType::Int32, true), + ])); + let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap(); + + let fs = FsFixture::default(); + let options = FileWriterOptions { + format_version: Some(version), + ..Default::default() + }; + let mut writer = FileWriter::try_new( + fs.object_store.create(&fs.tmp_path).await.unwrap(), + lance_schema.clone(), + options, + ) + .unwrap(); + + // Field "a" gets 5 values across two calls (appending), field "b" gets a + // single value, and field "c" is never written (a zero-length column). 
+ let a1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let b: ArrayRef = Arc::new(Int32Array::from(vec![10])); + writer.write_columns(vec![(0, a1), (1, b)]).await.unwrap(); + let a2: ArrayRef = Arc::new(Int32Array::from(vec![4, 5])); + // A zero-length array for an otherwise-unwritten field is a no-op. + let c_empty: ArrayRef = Arc::new(Int32Array::from(Vec::::new())); + writer + .write_columns(vec![(0, a2), (2, c_empty)]) + .await + .unwrap(); + + let summary = writer.finish().await.unwrap(); + // The file's logical length is the longest column. + assert_eq!(summary.num_rows, 5); + + let file_scheduler = fs + .scheduler + .open_file(&fs.tmp_path, &CachedFileSize::unknown()) + .await + .unwrap(); + let reader = FileReader::try_open( + file_scheduler, + None, + Arc::::default(), + &LanceCache::no_cache(), + FileReaderOptions::default(), + ) + .await + .unwrap(); + + // Per-column row counts are recorded in / derivable from file metadata. + assert_eq!(reader.num_rows(), 5); + assert_eq!(reader.column_num_rows(0), Some(5)); + assert_eq!(reader.column_num_rows(1), Some(1)); + assert_eq!(reader.column_num_rows(2), Some(0)); + assert_eq!(reader.column_num_rows(3), None); + + // Each column reads back independently at its own length. + assert_eq!( + read_int32_column( + &reader, + &lance_schema, + version, + "a", + ReadBatchParams::Range(0..5) + ) + .await, + vec![Some(1), Some(2), Some(3), Some(4), Some(5)], + ); + assert_eq!( + read_int32_column( + &reader, + &lance_schema, + version, + "b", + ReadBatchParams::Range(0..1) + ) + .await, + vec![Some(10)], + ); + + // Random access by position within the longer column returns the right + // value even though other columns are shorter. (The take path requires + // strictly increasing indices.) 
+ assert_eq!( + read_int32_column( + &reader, + &lance_schema, + version, + "a", + ReadBatchParams::Indices(arrow_array::UInt32Array::from(vec![0, 2, 4])), + ) + .await, + vec![Some(1), Some(3), Some(5)], + ); + } + + /// Files written the ordinary (rectangular) way keep equal column lengths, + /// so the unequal-length support is backwards compatible. + #[tokio::test] + async fn test_write_batch_keeps_equal_lengths() { + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, true), + ArrowField::new("b", DataType::Int32, true), + ])); + let lance_schema = LanceSchema::try_from(arrow_schema.as_ref()).unwrap(); + + let fs = FsFixture::default(); + let mut writer = FileWriter::try_new( + fs.object_store.create(&fs.tmp_path).await.unwrap(), + lance_schema, + FileWriterOptions::default(), + ) + .unwrap(); + let batch = RecordBatch::try_new( + arrow_schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + ], + ) + .unwrap(); + writer.write_batch(&batch).await.unwrap(); + let summary = writer.finish().await.unwrap(); + assert_eq!(summary.num_rows, 3); + + let file_scheduler = fs + .scheduler + .open_file(&fs.tmp_path, &CachedFileSize::unknown()) + .await + .unwrap(); + let reader = FileReader::try_open( + file_scheduler, + None, + Arc::::default(), + &LanceCache::no_cache(), + FileReaderOptions::default(), + ) + .await + .unwrap(); + assert_eq!(reader.column_num_rows(0), Some(3)); + assert_eq!(reader.column_num_rows(1), Some(3)); + } + #[tokio::test] async fn test_max_page_bytes_enforced() { let arrow_field = Field::new("data", DataType::UInt64, false); diff --git a/rust/lance-table/benches/manifest_intern.rs b/rust/lance-table/benches/manifest_intern.rs index 78b7e352207..81bd57c1a22 100644 --- a/rust/lance-table/benches/manifest_intern.rs +++ b/rust/lance-table/benches/manifest_intern.rs @@ -59,6 +59,7 @@ fn make_uniform_pb_fragments(n: u64, num_fields: usize) -> Vec 
file_size_bytes: 0, base_id: None, }], + overlays: vec![], deletion_file: None, row_id_sequence: None, physical_rows: 1000, @@ -135,6 +136,7 @@ fn make_diverse_pb_fragments( file_size_bytes: 0, base_id: None, }], + overlays: vec![], deletion_file: None, row_id_sequence: None, physical_rows: 1000, diff --git a/rust/lance-table/src/feature_flags.rs b/rust/lance-table/src/feature_flags.rs index 096f0da79e5..1e0be5a3d06 100644 --- a/rust/lance-table/src/feature_flags.rs +++ b/rust/lance-table/src/feature_flags.rs @@ -20,8 +20,13 @@ pub const FLAG_TABLE_CONFIG: u64 = 8; pub const FLAG_BASE_PATHS: u64 = 16; /// Disable writing transaction file under _transaction/, this flag is set when we only want to write inline transaction in manifest pub const FLAG_DISABLE_TRANSACTION_FILE: u64 = 32; +/// Fragments contain data overlay files, which supply new values for a subset of +/// cells without rewriting base data files. A reader that does not understand +/// overlays must refuse the dataset, since ignoring an overlay would silently +/// return stale base values. +pub const FLAG_DATA_OVERLAY_FILES: u64 = 64; /// The first bit that is unknown as a feature flag -pub const FLAG_UNKNOWN: u64 = 64; +pub const FLAG_UNKNOWN: u64 = 128; /// Set the reader and writer feature flags in the manifest based on the contents of the manifest. pub fn apply_feature_flags( @@ -71,6 +76,18 @@ pub fn apply_feature_flags( manifest.writer_feature_flags |= FLAG_BASE_PATHS; } + // Overlay files change cell values on read, so a reader that ignores them + // would return stale base values. Both readers and writers must understand + // them. 
+ let has_overlays = manifest + .fragments + .iter() + .any(|frag| !frag.overlays.is_empty()); + if has_overlays { + manifest.reader_feature_flags |= FLAG_DATA_OVERLAY_FILES; + manifest.writer_feature_flags |= FLAG_DATA_OVERLAY_FILES; + } + if disable_transaction_file { manifest.writer_feature_flags |= FLAG_DISABLE_TRANSACTION_FILE; } @@ -103,6 +120,7 @@ mod tests { assert!(can_read_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_read_dataset(super::FLAG_BASE_PATHS)); assert!(can_read_dataset(super::FLAG_DISABLE_TRANSACTION_FILE)); + assert!(can_read_dataset(super::FLAG_DATA_OVERLAY_FILES)); assert!(can_read_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS @@ -120,12 +138,14 @@ mod tests { assert!(can_write_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_write_dataset(super::FLAG_BASE_PATHS)); assert!(can_write_dataset(super::FLAG_DISABLE_TRANSACTION_FILE)); + assert!(can_write_dataset(super::FLAG_DATA_OVERLAY_FILES)); assert!(can_write_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS | super::FLAG_USE_V2_FORMAT_DEPRECATED | super::FLAG_TABLE_CONFIG | super::FLAG_BASE_PATHS + | super::FLAG_DATA_OVERLAY_FILES )); assert!(!can_write_dataset(super::FLAG_UNKNOWN)); } diff --git a/rust/lance-table/src/format/fragment.rs b/rust/lance-table/src/format/fragment.rs index 431e466dbd4..6b0721cf75d 100644 --- a/rust/lance-table/src/format/fragment.rs +++ b/rust/lance-table/src/format/fragment.rs @@ -11,6 +11,7 @@ use lance_file::format::{MAJOR_VERSION, MINOR_VERSION}; use lance_file::version::LanceFileVersion; use lance_io::utils::CachedFileSize; use object_store::path::Path; +use roaring::RoaringBitmap; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::format::pb; @@ -233,6 +234,145 @@ impl TryFrom for DataFile { } } +/// Which `(physical offset, field)` cells a [`DataOverlayFile`] provides values +/// for. 
+/// +/// The coverage bitmaps index **physical** row offsets (positions in the base +/// data files, counting deleted rows), so they are stable across deletions, like +/// deletion vectors. Bitmaps are stored as serialized 32-bit Roaring bitmaps; use +/// [`DataOverlayFile::coverage_for_field`] to obtain the parsed bitmap that +/// applies to a given field. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] +pub enum OverlayCoverage { + /// A single bitmap that applies to every field in the overlay's + /// `data_file.fields` (a dense / rectangular overlay): every covered offset + /// has a value for every field. + Shared(Vec), + /// One bitmap per field, in the same order as the overlay's + /// `data_file.fields` (a sparse overlay): different fields may cover + /// different offset sets. + PerField(Vec>), +} + +fn deserialize_roaring(bytes: &[u8]) -> Result { + RoaringBitmap::deserialize_from(bytes).map_err(|e| { + Error::invalid_input(format!( + "failed to deserialize overlay coverage bitmap: {e}" + )) + }) +} + +fn serialize_roaring(bitmap: &RoaringBitmap) -> Vec { + let mut bytes = Vec::with_capacity(bitmap.serialized_size()); + // Writing to a Vec is infallible. + bitmap.serialize_into(&mut bytes).unwrap(); + bytes +} + +impl OverlayCoverage { + /// Build a dense coverage from a single bitmap. + pub fn dense(bitmap: &RoaringBitmap) -> Self { + Self::Shared(serialize_roaring(bitmap)) + } + + /// Build a sparse coverage from one bitmap per field. + pub fn sparse(bitmaps: &[RoaringBitmap]) -> Self { + Self::PerField(bitmaps.iter().map(serialize_roaring).collect()) + } +} + +/// An overlay file supplies new values for a subset of `(physical offset, field)` +/// cells within a fragment, without rewriting the fragment's base data files. See +/// the Data Overlay Files specification for the full resolution, coverage, and +/// versioning rules. 
+/// +/// The overlay's `data_file` stores one value column per field in +/// `data_file.fields`, with **no** row-offset key column. Within a value column, +/// the position of a covered offset's value is the **rank** (0-based count of set +/// bits below it) of that offset in the field's coverage bitmap. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)] +pub struct DataOverlayFile { + /// The data file storing the overlay's new cell values. + pub data_file: DataFile, + /// Which cells this overlay provides values for. + pub coverage: OverlayCoverage, + /// The dataset version at which this overlay became effective (the version of + /// the commit that introduced it, stamped at commit time and re-stamped on + /// retry). Higher wins when two overlays cover the same `(offset, field)`. + pub committed_version: u64, +} + +impl DataOverlayFile { + /// The parsed coverage bitmap that applies to the field stored at + /// `field_pos` within `data_file.fields`. + /// + /// For a dense overlay the same shared bitmap is returned for every field; + /// for a sparse overlay the per-field bitmap at `field_pos` is returned. 
+ pub fn coverage_for_field(&self, field_pos: usize) -> Result { + match &self.coverage { + OverlayCoverage::Shared(bytes) => deserialize_roaring(bytes), + OverlayCoverage::PerField(bitmaps) => { + let bytes = bitmaps.get(field_pos).ok_or_else(|| { + Error::invalid_input(format!( + "overlay field_coverage has {} bitmaps but field position {} was requested", + bitmaps.len(), + field_pos + )) + })?; + deserialize_roaring(bytes) + } + } + } +} + +impl From<&DataOverlayFile> for pb::DataOverlayFile { + fn from(overlay: &DataOverlayFile) -> Self { + let coverage = match &overlay.coverage { + OverlayCoverage::Shared(bytes) => { + pb::data_overlay_file::Coverage::SharedOffsetBitmap(bytes.clone()) + } + OverlayCoverage::PerField(bitmaps) => { + pb::data_overlay_file::Coverage::FieldCoverage(pb::FieldCoverage { + offset_bitmaps: bitmaps.clone(), + }) + } + }; + Self { + data_file: Some(pb::DataFile::from(&overlay.data_file)), + coverage: Some(coverage), + committed_version: overlay.committed_version, + } + } +} + +impl TryFrom for DataOverlayFile { + type Error = Error; + + fn try_from(proto: pb::DataOverlayFile) -> Result { + let data_file = proto + .data_file + .ok_or_else(|| Error::invalid_input("DataOverlayFile is missing its data_file"))?; + let coverage = match proto.coverage { + Some(pb::data_overlay_file::Coverage::SharedOffsetBitmap(bytes)) => { + OverlayCoverage::Shared(bytes) + } + Some(pb::data_overlay_file::Coverage::FieldCoverage(fc)) => { + OverlayCoverage::PerField(fc.offset_bitmaps) + } + None => { + return Err(Error::invalid_input( + "DataOverlayFile is missing its coverage", + )); + } + }; + Ok(Self { + data_file: DataFile::try_from(data_file)?, + coverage, + committed_version: proto.committed_version, + }) + } +} + /// Interns repeated data so that fragments with identical content share a /// single heap allocation via `Arc`. 
/// @@ -375,6 +515,11 @@ impl DataFileFieldInterner { .into_iter() .map(|f| self.intern_data_file(f)) .collect::>()?, + overlays: p + .overlays + .into_iter() + .map(DataOverlayFile::try_from) + .collect::>()?, deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?, row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?, physical_rows, @@ -483,6 +628,12 @@ pub struct Fragment { /// Files within the fragment. pub files: Vec, + /// Overlay files supplying new values for a subset of cells without + /// rewriting the base data files. Order is significant: a later entry is + /// newer than an earlier one. See [`DataOverlayFile`] for resolution rules. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub overlays: Vec, + /// Optional file with deleted local row offsets. #[serde(skip_serializing_if = "Option::is_none")] pub deletion_file: Option, @@ -510,6 +661,7 @@ impl Fragment { Self { id, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: None, @@ -549,6 +701,7 @@ impl Fragment { Self { id, files: vec![DataFile::new_legacy(path, schema, None, None)], + overlays: vec![], deletion_file: None, physical_rows, row_id_meta: None, @@ -669,6 +822,11 @@ impl TryFrom for Fragment { .into_iter() .map(DataFile::try_from) .collect::>()?, + overlays: p + .overlays + .into_iter() + .map(DataOverlayFile::try_from) + .collect::>()?, deletion_file: p.deletion_file.map(DeletionFile::try_from).transpose()?, row_id_meta: p.row_id_sequence.map(RowIdMeta::try_from).transpose()?, physical_rows, @@ -716,6 +874,7 @@ impl From<&Fragment> for pb::DataFragment { Self { id: f.id, files: f.files.iter().map(pb::DataFile::from).collect(), + overlays: f.overlays.iter().map(pb::DataOverlayFile::from).collect(), deletion_file, row_id_sequence, physical_rows: f.physical_rows.unwrap_or_default() as u64, @@ -734,6 +893,65 @@ mod tests { use object_store::path::Path; use serde_json::{Value, json}; + #[test] + fn 
test_data_overlay_roundtrip() { + // A fragment carrying a dense overlay round-trips through protobuf and + // back, and the parsed coverage bitmap is recovered per field. + let mut bitmap = RoaringBitmap::new(); + bitmap.insert(1); + bitmap.insert(3); + + let overlay = DataOverlayFile { + data_file: DataFile::new_legacy_from_fields("overlay-0.lance", vec![3], None), + coverage: OverlayCoverage::dense(&bitmap), + committed_version: 7, + }; + let mut fragment = Fragment::new(0); + fragment.files = vec![DataFile::new_legacy_from_fields( + "base.lance", + vec![1, 3], + None, + )]; + fragment.overlays = vec![overlay]; + + let proto = pb::DataFragment::from(&fragment); + assert_eq!(proto.overlays.len(), 1); + let round_tripped = Fragment::try_from(proto).unwrap(); + assert_eq!(round_tripped, fragment); + + // Dense coverage applies to every field. + let recovered = round_tripped.overlays[0].coverage_for_field(0).unwrap(); + assert_eq!(recovered, bitmap); + assert_eq!( + round_tripped.overlays[0].coverage_for_field(5).unwrap(), + bitmap + ); + } + + #[test] + fn test_data_overlay_sparse_per_field_coverage() { + // A sparse overlay carries one bitmap per field, recovered by position. 
+ let name_coverage = RoaringBitmap::from_iter([2u32, 3]); + let embedding_coverage = RoaringBitmap::from_iter([1u32]); + let overlay = DataOverlayFile { + data_file: DataFile::new_legacy_from_fields("overlay-1.lance", vec![2, 4], None), + coverage: OverlayCoverage::sparse(&[name_coverage.clone(), embedding_coverage.clone()]), + committed_version: 3, + }; + let mut fragment = Fragment::new(1); + fragment.overlays = vec![overlay]; + + let round_tripped = Fragment::try_from(pb::DataFragment::from(&fragment)).unwrap(); + assert_eq!( + round_tripped.overlays[0].coverage_for_field(0).unwrap(), + name_coverage + ); + assert_eq!( + round_tripped.overlays[0].coverage_for_field(1).unwrap(), + embedding_coverage + ); + } + #[test] fn test_new_fragment() { let path = "foobar.lance"; diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 9845061b7e4..cd0a403621f 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -1316,6 +1316,7 @@ mod tests { vec![0, 1, 2], None, )], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: None, @@ -1328,6 +1329,7 @@ mod tests { DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None), DataFile::new_legacy_from_fields("path3", vec![2], None), ], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: None, diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3e0d77704da..448feb961d7 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -79,6 +79,7 @@ pub mod index; pub mod mem_wal; mod metadata; pub mod optimize; +pub(crate) mod overlay; pub mod progress; pub mod refs; pub(crate) mod rowids; diff --git a/rust/lance/src/dataset/files.rs b/rust/lance/src/dataset/files.rs index 848add7e4a8..2214822ed90 100644 --- a/rust/lance/src/dataset/files.rs +++ b/rust/lance/src/dataset/files.rs @@ -1036,6 +1036,7 @@ mod tests { // No base_id -> falls back to the dataset base_uri. 
mk_file("c.lance", None), ], + overlays: vec![], // Deletion files also carry a base_id when they originate from a // shallow clone, and must resolve against base_paths too. deletion_file: Some(DeletionFile { diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index eb165e5f612..02787863c8b 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -15,7 +15,8 @@ use arrow::compute::concat_batches; use arrow_array::cast::as_primitive_array; use arrow_array::types::UInt64Type; use arrow_array::{ - Array, RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array, new_null_array, + Array, ArrayRef, RecordBatch, RecordBatchReader, StructArray, UInt32Array, UInt64Array, + new_null_array, }; use arrow_schema::Schema as ArrowSchema; use datafusion::logical_expr::Expr; @@ -43,7 +44,7 @@ use lance_file::{LanceEncodingsIo, determine_file_version}; use lance_io::ReadBatchParams; use lance_io::scheduler::{FileScheduler, ScanScheduler, SchedulerConfig}; use lance_io::utils::CachedFileSize; -use lance_table::format::{DataFile, DeletionFile, Fragment}; +use lance_table::format::{DataFile, DataOverlayFile, DeletionFile, Fragment}; use lance_table::io::deletion::{deletion_file_path, write_deletion_file}; use lance_table::rowids::RowIdSequence; use lance_table::utils::stream::{ @@ -62,6 +63,9 @@ use super::updater::Updater; use super::{NewColumnTransform, WriteParams, schema_evolution}; use crate::dataset::Dataset; use crate::dataset::fragment::session::FragmentSession; +use crate::dataset::overlay::{ + FieldOverlayPlan, ResolvedFieldOverlay, apply_overlays_to_batch, overlay_indices_newest_first, +}; use crate::io::deletion::read_dataset_deletion_file; /// Result of [`FileFragment::update_columns_with_offsets`]: updated fragment metadata, modified field ids, @@ -938,6 +942,11 @@ impl FileFragment { Arc::new(self.metadata.clone()), )?; + if !self.metadata.overlays.is_empty() { + reader.overlay_plans = + 
Arc::new(self.load_overlay_plans(projection, &read_config).await?); + } + if read_config.with_row_id { reader.with_row_id(); } @@ -1113,6 +1122,92 @@ impl FileFragment { Ok(opened_files) } + /// Load the overlay value columns for the projected fields, ordered + /// newest-first, ready to be merged into base batches on read. + /// + /// For each projected (top-level) field, the fragment's overlays are walked + /// newest-first; an overlay contributes if its `data_file.fields` includes + /// the field, in which case the field's coverage bitmap and full value + /// column are loaded. Overlays on nested (non-top-level) fields are not yet + /// supported and are simply not matched here. + async fn load_overlay_plans( + &self, + projection: &Schema, + read_config: &FragReadConfig, + ) -> Result> { + let order = overlay_indices_newest_first(&self.metadata.overlays); + let mut plans = Vec::new(); + for field in &projection.fields { + let mut resolved = Vec::new(); + for &overlay_idx in &order { + let overlay = &self.metadata.overlays[overlay_idx]; + let Some(field_pos) = overlay + .data_file + .fields + .iter() + .position(|&id| id == field.id) + else { + continue; + }; + let coverage = overlay.coverage_for_field(field_pos)?; + let values = self + .read_overlay_value_column(overlay, field, coverage.len(), read_config) + .await?; + resolved.push(ResolvedFieldOverlay { coverage, values }); + } + if !resolved.is_empty() { + plans.push(FieldOverlayPlan { + field_name: field.name.clone(), + overlays_newest_first: resolved, + }); + } + } + Ok(plans) + } + + /// Read a single field's value column fully from an overlay data file. + /// + /// The value column has `num_values` rows (the popcount of the field's + /// coverage); position `r` holds the value for the offset whose rank is `r`. 
+ async fn read_overlay_value_column( + &self, + overlay: &DataOverlayFile, + field: &lance_core::datatypes::Field, + num_values: u64, + read_config: &FragReadConfig, + ) -> Result { + if num_values == 0 { + return Ok(arrow_array::new_empty_array(&field.data_type())); + } + let single_field = Schema { + fields: vec![field.clone()], + metadata: Default::default(), + }; + let reader = self + .open_reader(&overlay.data_file, Some(&single_field), read_config) + .await? + .ok_or_else(|| { + Error::internal(format!( + "overlay data file {} does not contain field {} (id {})", + overlay.data_file.path, field.name, field.id + )) + })?; + let mut tasks = reader + .read_range_tasks( + 0..num_values, + num_values as u32, + reader.projection().clone(), + ) + .await?; + let mut chunks: Vec = Vec::new(); + while let Some(task) = tasks.next().await { + let batch = task.task.await?; + chunks.push(batch.column(0).clone()); + } + let chunk_refs: Vec<&dyn arrow_array::Array> = chunks.iter().map(|a| a.as_ref()).collect(); + Ok(arrow_select::concat::concat(&chunk_refs)?) + } + /// Count the rows in this fragment. pub async fn count_rows(&self, filter: Option) -> Result { match filter { @@ -2042,6 +2137,11 @@ pub struct FragmentReader { // total number of physical rows in the fragment (all rows, ignoring deletions) num_physical_rows: usize, + + /// Overlay value columns for the projected fields, loaded newest-first. + /// Empty when the fragment has no data overlay files. Merged into base + /// batches (by physical offset) before deletion filtering on every read. 
+ overlay_plans: Arc>, } // Custom clone impl needed because it is not easy to clone Box @@ -2070,6 +2170,7 @@ impl Clone for FragmentReader { created_at_sequence: self.created_at_sequence.clone(), num_rows: self.num_rows, num_physical_rows: self.num_physical_rows, + overlay_plans: self.overlay_plans.clone(), } } } @@ -2137,6 +2238,7 @@ impl FragmentReader { created_at_sequence: None, num_rows, num_physical_rows, + overlay_plans: Arc::new(Vec::new()), }) } @@ -2394,6 +2496,46 @@ impl FragmentReader { Ok(result.project_by_schema(&output_schema)?) } + /// Merge data overlay values into a stream of base batches. + /// + /// Runs on physical rows in read order, *before* deletion filtering, so each + /// row can be addressed by its physical offset (from `params`) and deletions + /// take precedence naturally (an overlay value for a deleted row is dropped + /// with the row downstream). A no-op when the fragment has no overlays. + fn merge_overlays( + &self, + merged: ReadBatchTaskStream, + params: &ReadBatchParams, + total_num_rows: u32, + ) -> ReadBatchTaskStream { + if self.overlay_plans.is_empty() { + return merged; + } + let offsets: Arc> = + Arc::new(params.to_offsets_total(total_num_rows).values().to_vec()); + let plans = self.overlay_plans.clone(); + let mut row_start = 0usize; + merged + .map(move |task| { + let num_rows = task.num_rows; + let start = row_start; + row_start += num_rows as usize; + let offsets = offsets.clone(); + let plans = plans.clone(); + let inner = task.task; + ReadBatchTask { + num_rows, + task: async move { + let batch = inner.await?; + let slice = &offsets[start..start + batch.num_rows()]; + apply_overlays_to_batch(batch, slice, &plans) + } + .boxed(), + } + }) + .boxed() + } + async fn new_read_impl<'a, F>( &'a self, params: ReadBatchParams, @@ -2461,6 +2603,8 @@ impl FragmentReader { lance_table::utils::stream::merge_streams(read_streams) }; + let merged = self.merge_overlays(merged, &params, total_num_rows); + // Add the row id column (if
needed) and delete rows (if a deletion // vector is present). let config = RowIdAndDeletesConfig { @@ -2631,6 +2775,9 @@ impl FragmentReader { lance_table::utils::stream::merge_streams(read_streams) }; + let params = ReadBatchParams::Ranges(ranges); + let merged_stream = self.merge_overlays(merged_stream, &params, total_num_rows); + // Add the row id column (if needed) and delete rows (if a deletion // vector is present). let config = RowIdAndDeletesConfig { @@ -2643,7 +2790,7 @@ impl FragmentReader { with_row_created_at_version: self.with_row_created_at_version, last_updated_at_sequence: self.last_updated_at_sequence.clone(), created_at_sequence: self.created_at_sequence.clone(), - params: ReadBatchParams::Ranges(ranges), + params, total_num_rows, }; let output_schema = Arc::new(self.output_schema.clone()); @@ -2839,6 +2986,350 @@ mod tests { Dataset::open(test_uri).await.unwrap() } + /// End-to-end tests for reading data overlay files (OSS-1324): overlays are + /// written, committed via the `DataOverlay` transaction, and then resolved on + /// the `take` and scan read paths.
+ mod overlay_read { + use std::sync::Arc; + + use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch, RecordBatchIterator}; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use lance_core::datatypes::Schema; + use lance_file::version::LanceFileVersion; + use lance_file::writer::{FileWriter, FileWriterOptions}; + use lance_io::utils::CachedFileSize; + use lance_table::format::{DataFile, DataOverlayFile, OverlayCoverage}; + use object_store::path::Path; + use roaring::RoaringBitmap; + use rstest::rstest; + + use crate::dataset::transaction::{DataOverlayGroup, Operation}; + use crate::dataset::{Dataset, WriteDestination, WriteParams}; + + fn bitmap(offsets: impl IntoIterator) -> RoaringBitmap { + RoaringBitmap::from_iter(offsets) + } + + fn i32_array(values: impl IntoIterator>) -> ArrayRef { + Arc::new(Int32Array::from_iter(values)) + } + + /// Two-fragment Int32 dataset: `id` (field 0) = 0..12 and `val` (field 1) + /// = id * 10, written 6 rows per file (fragments 0 and 1). + /// + /// Uses an in-memory store so the test can write overlay files with a + /// store-relative `data/.lance` path and commit against the returned + /// dataset directly. 
+ async fn create_base_dataset(version: LanceFileVersion) -> Dataset { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", DataType::Int32, true), + ArrowField::new("val", DataType::Int32, true), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..12)), + Arc::new(Int32Array::from_iter_values((0..12).map(|v| v * 10))), + ], + ) + .unwrap(); + let write_params = WriteParams { + max_rows_per_file: 6, + max_rows_per_group: 6, + data_storage_version: Some(version), + ..Default::default() + }; + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + Dataset::write(reader, "memory://", Some(write_params)) + .await + .unwrap() + } + + /// Write an overlay file covering `fields` (dataset field ids) of + /// `fragment_id` with the given coverage and per-field value columns, then + /// commit it as a `DataOverlay` transaction. `name` makes the file unique. + #[allow(clippy::too_many_arguments)] + async fn commit_overlay( + dataset: Dataset, + name: &str, + fragment_id: u64, + fields: &[i32], + coverage: OverlayCoverage, + columns: Vec, + version: LanceFileVersion, + ) -> Dataset { + let read_version = dataset.version().version; + let overlay_schema = dataset.schema().project_by_ids(fields, true); + + let filename = format!("{name}.lance"); + let path = Path::from(format!("data/{filename}")); + let obj_writer = dataset.object_store.create(&path).await.unwrap(); + let mut writer = FileWriter::try_new( + obj_writer, + overlay_schema, + FileWriterOptions { + format_version: Some(version), + ..Default::default() + }, + ) + .unwrap(); + let (major, minor) = writer.version().to_numbers(); + let columns: Vec<(usize, ArrayRef)> = columns.into_iter().enumerate().collect(); + writer.write_columns(columns).await.unwrap(); + let summary = writer.finish().await.unwrap(); + + let mut data_file = DataFile::new_unstarted(filename, major, minor); + data_file.fields = writer + 
.field_id_to_column_indices() + .iter() + .map(|(field_id, _)| *field_id as i32) + .collect::>() + .into(); + data_file.column_indices = writer + .field_id_to_column_indices() + .iter() + .map(|(_, column_index)| *column_index as i32) + .collect::>() + .into(); + data_file.file_size_bytes = CachedFileSize::new(summary.size_bytes); + + let overlay = DataOverlayFile { + data_file, + coverage, + committed_version: 0, + }; + Dataset::commit( + WriteDestination::Dataset(Arc::new(dataset)), + Operation::DataOverlay { + groups: vec![DataOverlayGroup { + fragment_id, + overlays: vec![overlay], + }], + }, + Some(read_version), + None, + None, + Arc::new(Default::default()), + false, + ) + .await + .unwrap() + } + + fn full_schema(dataset: &Dataset) -> Schema { + dataset.schema().clone() + } + + fn col(batch: &RecordBatch, name: &str) -> Int32Array { + let idx = batch.schema().index_of(name).unwrap(); + batch + .column(idx) + .as_any() + .downcast_ref::() + .unwrap() + .clone() + } + + #[rstest] + #[tokio::test] + async fn test_take_covered_and_uncovered( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { + let dataset = create_base_dataset(version).await; + // Overlay fragment 0's `val` at physical offsets {1, 4}. + let dataset = commit_overlay( + dataset, + "ov", + 0, + &[1], + OverlayCoverage::dense(&bitmap([1, 4])), + vec![i32_array([Some(111), Some(444)])], + version, + ) + .await; + + let frag = dataset.get_fragment(0).unwrap(); + let batch = frag + .take(&[0, 1, 2, 4], &full_schema(&dataset)) + .await + .unwrap(); + // Offsets 1 and 4 take overlay values; 0 and 2 fall through to base. + assert_eq!(col(&batch, "val").values(), &[0, 111, 20, 444]); + // The unrelated `id` column is untouched. 
+ assert_eq!(col(&batch, "id").values(), &[0, 1, 2, 4]); + } + + #[rstest] + #[tokio::test] + async fn test_take_newest_overlay_wins( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { + let dataset = create_base_dataset(version).await; + let dataset = commit_overlay( + dataset, + "older", + 0, + &[1], + OverlayCoverage::dense(&bitmap([1, 4])), + vec![i32_array([Some(111), Some(444)])], + version, + ) + .await; + // A newer overlay (later commit -> higher committed_version) re-covers + // offset 1. + let dataset = commit_overlay( + dataset, + "newer", + 0, + &[1], + OverlayCoverage::dense(&bitmap([1])), + vec![i32_array([Some(999)])], + version, + ) + .await; + + let frag = dataset.get_fragment(0).unwrap(); + let batch = frag.take(&[1, 4], &full_schema(&dataset)).await.unwrap(); + // Offset 1 -> newest overlay (999); offset 4 -> only older covers it. + assert_eq!(col(&batch, "val").values(), &[999, 444]); + } + + #[rstest] + #[tokio::test] + async fn test_take_per_field_coverage( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { + let dataset = create_base_dataset(version).await; + // Sparse overlay: `id` covers {2}, `val` covers {2, 3} — different + // offset sets and therefore unequal-length value columns. + let dataset = commit_overlay( + dataset, + "sparse", + 0, + &[0, 1], + OverlayCoverage::sparse(&[bitmap([2]), bitmap([2, 3])]), + vec![i32_array([Some(777)]), i32_array([Some(220), Some(330)])], + version, + ) + .await; + + let frag = dataset.get_fragment(0).unwrap(); + let batch = frag.take(&[2, 3], &full_schema(&dataset)).await.unwrap(); + // id: offset 2 covered (777), offset 3 falls through (3). + assert_eq!(col(&batch, "id").values(), &[777, 3]); + // val: both offsets covered (220, 330). 
+ assert_eq!(col(&batch, "val").values(), &[220, 330]); + } + + #[rstest] + #[tokio::test] + async fn test_take_null_override( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { + let dataset = create_base_dataset(version).await; + let dataset = commit_overlay( + dataset, + "nullov", + 0, + &[1], + OverlayCoverage::dense(&bitmap([0])), + vec![i32_array([None])], + version, + ) + .await; + + let frag = dataset.get_fragment(0).unwrap(); + let batch = frag.take(&[0, 1], &full_schema(&dataset)).await.unwrap(); + let val = col(&batch, "val"); + // Offset 0 is covered with a NULL value -> resolves to NULL; offset 1 + // falls through to the base value. + assert!(val.is_null(0)); + assert_eq!(val.value(1), 10); + } + + #[rstest] + #[tokio::test] + async fn test_overlay_on_deleted_row_is_inert( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { + let mut dataset = create_base_dataset(version).await; + // Delete global row 1 (fragment 0, physical offset 1). + dataset.delete("id = 1").await.unwrap(); + // Overlay covers the deleted offset 1 and the live offset 4. + let dataset = commit_overlay( + dataset, + "delov", + 0, + &[1], + OverlayCoverage::dense(&bitmap([1, 4])), + vec![i32_array([Some(111), Some(444)])], + version, + ) + .await; + + // Scan fragment 0: row 1 is gone, and offset 4's overlay value survives + // even though the deletion shifts logical positions — coverage is keyed + // by physical offset. 
+ let frag = dataset.get_fragment(0).unwrap(); + let mut scanner = frag.scan(); + let batch = scanner + .project(&["id", "val"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(col(&batch, "id").values(), &[0, 2, 3, 4, 5]); + assert_eq!(col(&batch, "val").values(), &[0, 20, 30, 444, 50]); + } + + #[rstest] + #[tokio::test] + async fn test_scan_multi_fragment_overlays( + #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion, + ) { + let dataset = create_base_dataset(version).await; + // Overlay fragment 0 at offset 0 and fragment 1 at offset 0 (global + // row 6). Each fragment's coverage is independent. + let dataset = commit_overlay( + dataset, + "frag0", + 0, + &[1], + OverlayCoverage::dense(&bitmap([0])), + vec![i32_array([Some(1000)])], + version, + ) + .await; + let dataset = commit_overlay( + dataset, + "frag1", + 1, + &[1], + OverlayCoverage::dense(&bitmap([0])), + vec![i32_array([Some(6000)])], + version, + ) + .await; + + let batch = dataset + .scan() + .project(&["id", "val"]) + .unwrap() + .try_into_batch() + .await + .unwrap(); + assert_eq!(batch.num_rows(), 12); + let expected: Vec = (0..12) + .map(|i| match i { + 0 => 1000, + 6 => 6000, + other => other * 10, + }) + .collect(); + assert_eq!(col(&batch, "val").values(), &expected); + } + } + #[rstest] #[tokio::test] async fn test_fragment_scan( diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 87dda8e7e57..25168b54780 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -2125,6 +2125,7 @@ mod tests { let fragment = Fragment { id: 0, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: Some(0), diff --git a/rust/lance/src/dataset/overlay.rs b/rust/lance/src/dataset/overlay.rs new file mode 100644 index 00000000000..907b41f2a14 --- /dev/null +++ b/rust/lance/src/dataset/overlay.rs @@ -0,0 +1,359 @@ +// SPDX-License-Identifier: Apache-2.0 +// 
SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Resolution of data overlay files on read. +//! +//! An overlay supplies new values for a subset of `(physical offset, field)` +//! cells. To resolve a field's values for a set of physical row offsets, the +//! overlays that cover that field are walked **newest to oldest**: the first +//! overlay that covers an offset wins, and its value is taken at the offset's +//! **rank** (the 0-based count of set bits below it) in the field's coverage +//! bitmap. An offset that no overlay covers falls through to the base value. +//! +//! The offsets are supplied explicitly (one per base row), so a single code path +//! serves both the scan (a contiguous physical range) and `take` (arbitrary +//! physical offsets) read paths. +//! +//! Deletions take precedence over overlays, but that is handled downstream: the +//! merge runs on physical rows *before* the deletion filter, so an overlay value +//! for a deleted offset is computed and then dropped with the row — making it +//! inert, exactly as the specification requires, with no special handling here. + +use arrow_array::{Array, ArrayRef, RecordBatch}; +use arrow_select::interleave::interleave; +use lance_core::{Error, Result}; +use roaring::RoaringBitmap; + +use lance_table::format::DataOverlayFile; + +/// One field's contribution from a single overlay: which physical offsets it +/// covers, and the value column holding those offsets' values (indexed by rank). +#[derive(Debug, Clone)] +pub struct ResolvedFieldOverlay { + /// Physical offsets this overlay covers for the field. + pub coverage: RoaringBitmap, + /// The overlay's value column for the field. Its length must equal + /// `coverage.len()`; the value for a covered offset `o` is at `coverage`'s + /// rank of `o`. + pub values: ArrayRef, +} + +/// Order a fragment's overlays from newest to oldest for read resolution. 
+/// +/// Precedence is by `committed_version` (higher is newer); ties are broken by +/// position in the fragment's `overlays` list, where a later entry is newer. +/// Returns indices into `overlays`. +pub fn overlay_indices_newest_first(overlays: &[DataOverlayFile]) -> Vec<usize> { + let mut indices: Vec<usize> = (0..overlays.len()).collect(); + indices.sort_by(|&a, &b| { + overlays[b] + .committed_version + .cmp(&overlays[a].committed_version) + .then(b.cmp(&a)) + }); + indices +} + +/// Resolve a single field's values for the rows whose physical offsets are given +/// by `offsets` (one per base row, in the same order as `base`), merging the +/// overlays that cover the field (which must be supplied newest-first). +/// +/// The result has the same length and data type as `base`. A covered offset +/// whose overlay value is NULL resolves **to** NULL (distinct from an offset no +/// overlay covers, which keeps its base value). +pub fn resolve_overlay_column( + base: &ArrayRef, + offsets: &[u32], + overlays_newest_first: &[ResolvedFieldOverlay], +) -> Result<ArrayRef> { + if offsets.len() != base.len() { + return Err(Error::invalid_input(format!( + "overlay resolution got {} offsets for a base column of {} rows", + offsets.len(), + base.len() + ))); + } + if overlays_newest_first.is_empty() { + return Ok(base.clone()); + } + for (i, overlay) in overlays_newest_first.iter().enumerate() { + if overlay.values.len() as u64 != overlay.coverage.len() { + return Err(Error::invalid_input(format!( + "overlay value column {} has {} values but its coverage has {} offsets", + i, + overlay.values.len(), + overlay.coverage.len() + ))); + } + } + + // Source 0 is the base; source k+1 is overlays_newest_first[k].values. 
+ let mut sources: Vec<&dyn Array> = Vec::with_capacity(overlays_newest_first.len() + 1); + sources.push(base.as_ref()); + for overlay in overlays_newest_first { + sources.push(overlay.values.as_ref()); + } + + let indices: Vec<(usize, usize)> = offsets + .iter() + .enumerate() + .map(|(i, &offset)| { + for (k, overlay) in overlays_newest_first.iter().enumerate() { + if overlay.coverage.contains(offset) { + // 0-based rank: number of set bits strictly below `offset`. + let rank = overlay.coverage.rank(offset) as usize - 1; + return (k + 1, rank); + } + } + (0, i) + }) + .collect(); + + interleave(&sources, &indices).map_err(Error::from) +} + +/// The overlays that apply to a single projected field, resolved (value columns +/// loaded) and ordered newest-first. `field_name` is the top-level read-batch +/// column name the plan applies to. +#[derive(Debug, Clone)] +pub struct FieldOverlayPlan { + pub field_name: String, + pub overlays_newest_first: Vec<ResolvedFieldOverlay>, +} + +/// Merge overlay values into a read batch of base values. +/// +/// `offsets[i]` is the physical row offset of `batch` row `i` (as produced by +/// [`lance_io::ReadBatchParams::to_offsets_total`]). Each plan replaces the +/// batch column whose name equals `plan.field_name`; columns with no plan, and +/// the row-id/row-address system columns, pass through unchanged. +/// +/// This runs on physical rows *before* deletion filtering, so an overlay value +/// computed for a deleted row is dropped with the row downstream — giving +/// deletions precedence with no special handling here. +pub fn apply_overlays_to_batch( + batch: RecordBatch, + offsets: &[u32], + plans: &[FieldOverlayPlan], +) -> Result<RecordBatch> { + if plans.is_empty() { + return Ok(batch); + } + let schema = batch.schema(); + let mut columns = batch.columns().to_vec(); + for plan in plans { + let Some(idx) = schema.index_of(&plan.field_name).ok() else { + // The plan's field is not in this batch's projection; skip it. 
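Reviewer sketch (not part of the patch): the resolution rule in `resolve_overlay_column` is "first covering overlay wins, value taken at the offset's 0-based rank". It can be illustrated with the standard library alone. This is a simplified model under stated assumptions: `BTreeSet<u32>` stands in for the Roaring bitmap, `Vec<Option<i32>>` stands in for an Arrow array, and `FieldOverlay` and `resolve` are hypothetical names, not the patch's API.

```rust
use std::collections::BTreeSet;

/// One overlay's contribution to a field. `BTreeSet` is a stand-in for the
/// Roaring coverage bitmap used by the real code.
struct FieldOverlay {
    coverage: BTreeSet<u32>,
    /// One value per covered offset, stored in ascending offset order.
    values: Vec<Option<i32>>,
}

/// Resolve `base` against `overlays` (which must be ordered newest first) for
/// the rows whose physical offsets are `offsets`.
fn resolve(
    base: &[Option<i32>],
    offsets: &[u32],
    overlays: &[FieldOverlay],
) -> Vec<Option<i32>> {
    assert_eq!(base.len(), offsets.len());
    offsets
        .iter()
        .zip(base)
        .map(|(&offset, &base_value)| {
            for overlay in overlays {
                if overlay.coverage.contains(&offset) {
                    // 0-based rank: covered offsets strictly below `offset`.
                    let rank = overlay.coverage.range(..offset).count();
                    return overlay.values[rank];
                }
            }
            // No overlay covers this offset: fall through to the base value.
            base_value
        })
        .collect()
}

fn main() {
    let newest = FieldOverlay {
        coverage: BTreeSet::from([1]),
        values: vec![None], // covered with NULL: overrides the cell to NULL
    };
    let older = FieldOverlay {
        coverage: BTreeSet::from([1, 2]),
        values: vec![Some(111), Some(222)],
    };
    let base = [Some(0), Some(1), Some(2)];
    let resolved = resolve(&base, &[0, 1, 2], &[newest, older]);
    println!("{:?}", resolved);
}
```

In this model offset 1 resolves to NULL because the newest overlay covers it with a NULL value, while offset 2 falls back to the older overlay at rank 1. Coverage, not value nullness, decides which source wins.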
+ continue; + }; + columns[idx] = resolve_overlay_column(&columns[idx], offsets, &plan.overlays_newest_first)?; + } + Ok(RecordBatch::try_new(schema, columns)?) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use std::sync::Arc; + + fn i32_array(values: impl IntoIterator>) -> ArrayRef { + Arc::new(Int32Array::from_iter(values)) + } + + fn bitmap(offsets: impl IntoIterator) -> RoaringBitmap { + RoaringBitmap::from_iter(offsets) + } + + /// Physical offsets for a contiguous range `[start, start + len)`. + fn offsets(start: u32, len: usize) -> Vec { + (start..start + len as u32).collect() + } + + fn assert_i32_eq(actual: &ArrayRef, expected: impl IntoIterator>) { + let actual = actual.as_any().downcast_ref::().unwrap(); + assert_eq!(actual, &Int32Array::from_iter(expected)); + } + + #[test] + fn test_no_overlays_returns_base() { + let base = i32_array([Some(1), Some(2), Some(3)]); + let resolved = resolve_overlay_column(&base, &offsets(0, 3), &[]).unwrap(); + assert_i32_eq(&resolved, [Some(1), Some(2), Some(3)]); + } + + #[test] + fn test_single_overlay_rank_addressing() { + // Base ages [30, 25, 40, 22]; overlay sets offset 1 -> 26 (rank 0). + let base = i32_array([Some(30), Some(25), Some(40), Some(22)]); + let overlay = ResolvedFieldOverlay { + coverage: bitmap([1]), + values: i32_array([Some(26)]), + }; + let resolved = resolve_overlay_column(&base, &offsets(0, 4), &[overlay]).unwrap(); + assert_i32_eq(&resolved, [Some(30), Some(26), Some(40), Some(22)]); + } + + #[test] + fn test_rank_addressing_multiple_offsets() { + // Coverage {0, 2, 3} -> values at ranks 0,1,2. 
+ let base = i32_array([Some(10), Some(11), Some(12), Some(13)]); + let overlay = ResolvedFieldOverlay { + coverage: bitmap([0, 2, 3]), + values: i32_array([Some(100), Some(120), Some(130)]), + }; + let resolved = resolve_overlay_column(&base, &offsets(0, 4), &[overlay]).unwrap(); + assert_i32_eq(&resolved, [Some(100), Some(11), Some(120), Some(130)]); + } + + #[test] + fn test_newest_overlay_wins() { + // Two overlays both cover offset 1; the newest (first in the slice) wins. + let base = i32_array([Some(0), Some(1), Some(2)]); + let newest = ResolvedFieldOverlay { + coverage: bitmap([1]), + values: i32_array([Some(999)]), + }; + let older = ResolvedFieldOverlay { + coverage: bitmap([1, 2]), + values: i32_array([Some(111), Some(222)]), + }; + let resolved = resolve_overlay_column(&base, &offsets(0, 3), &[newest, older]).unwrap(); + // offset 1 -> newest (999); offset 2 -> only older covers it (222). + assert_i32_eq(&resolved, [Some(0), Some(999), Some(222)]); + } + + #[test] + fn test_null_override_vs_fall_through() { + // A covered offset with a NULL value overrides the cell to NULL; an + // absent offset falls through to the base. + let base = i32_array([Some(1), Some(2), Some(3)]); + let overlay = ResolvedFieldOverlay { + coverage: bitmap([0]), + values: i32_array([None]), + }; + let resolved = resolve_overlay_column(&base, &offsets(0, 3), &[overlay]).unwrap(); + assert_i32_eq(&resolved, [None, Some(2), Some(3)]); + } + + #[test] + fn test_physical_start_offset() { + // The batch covers physical rows [10, 13); the overlay covers offset 11. 
+ let base = i32_array([Some(0), Some(0), Some(0)]); + let overlay = ResolvedFieldOverlay { + coverage: bitmap([11]), + values: i32_array([Some(7)]), + }; + let resolved = resolve_overlay_column(&base, &offsets(10, 3), &[overlay]).unwrap(); + assert_i32_eq(&resolved, [Some(0), Some(7), Some(0)]); + } + + #[test] + fn test_string_column_merge() { + let base: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c"])); + let overlay = ResolvedFieldOverlay { + coverage: bitmap([0, 2]), + values: Arc::new(StringArray::from(vec!["A", "C"])), + }; + let resolved = resolve_overlay_column(&base, &offsets(0, 3), &[overlay]).unwrap(); + let expected: ArrayRef = Arc::new(StringArray::from(vec!["A", "b", "C"])); + assert_eq!(&resolved, &expected); + } + + #[test] + fn test_non_contiguous_offsets() { + // `take` supplies arbitrary, non-contiguous physical offsets. The base + // rows correspond to offsets 5, 1, 8 (in that order); the overlay covers + // offsets {1, 8} with values at ranks 0, 1. + let base = i32_array([Some(50), Some(10), Some(80)]); + let overlay = ResolvedFieldOverlay { + coverage: bitmap([1, 8]), + values: i32_array([Some(11), Some(88)]), + }; + let resolved = resolve_overlay_column(&base, &[5, 1, 8], &[overlay]).unwrap(); + // offset 5 uncovered -> base 50; offset 1 -> rank 0 (11); offset 8 -> rank 1 (88). + assert_i32_eq(&resolved, [Some(50), Some(11), Some(88)]); + } + + #[test] + fn test_offset_count_mismatch_errors() { + let base = i32_array([Some(1), Some(2), Some(3)]); + let overlay = ResolvedFieldOverlay { + coverage: bitmap([0]), + values: i32_array([Some(9)]), + }; + // Two offsets for a three-row base column is a caller bug. 
+ assert!(resolve_overlay_column(&base, &[0, 1], &[overlay]).is_err()); + } + + #[test] + fn test_value_count_mismatch_errors() { + let base = i32_array([Some(1), Some(2)]); + let overlay = ResolvedFieldOverlay { + coverage: bitmap([0, 1]), + values: i32_array([Some(9)]), // only one value for two covered offsets + }; + assert!(resolve_overlay_column(&base, &offsets(0, 2), &[overlay]).is_err()); + } + + #[test] + fn test_apply_overlays_to_batch_per_field() { + // Two columns; only "age" has an overlay. "name" passes through. + let schema = Arc::new(Schema::new(vec![ + Field::new("age", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + i32_array([Some(30), Some(25), Some(40)]), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + ) + .unwrap(); + let plans = vec![FieldOverlayPlan { + field_name: "age".to_string(), + overlays_newest_first: vec![ResolvedFieldOverlay { + coverage: bitmap([2, 5]), + values: i32_array([Some(26), Some(99)]), + }], + }]; + // Batch rows map to physical offsets 4, 5, 6; only offset 5 is covered, + // and offset 5 is at rank 1 in coverage {2, 5}, so its value is 99. 
+ let merged = apply_overlays_to_batch(batch, &[4, 5, 6], &plans).unwrap(); + let ages = merged.column(0); + assert_i32_eq(ages, [Some(30), Some(99), Some(40)]); + let names = merged + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(names, &StringArray::from(vec!["a", "b", "c"])); + } + + #[test] + fn test_apply_overlays_to_batch_empty_plans_is_noop() { + let schema = Arc::new(Schema::new(vec![Field::new("age", DataType::Int32, true)])); + let batch = RecordBatch::try_new(schema, vec![i32_array([Some(1), Some(2)])]).unwrap(); + let merged = apply_overlays_to_batch(batch.clone(), &[0, 1], &[]).unwrap(); + assert_eq!(merged, batch); + } + + #[test] + fn test_overlay_ordering_newest_first() { + use lance_table::format::{DataFile, OverlayCoverage}; + let mk = |version: u64| DataOverlayFile { + data_file: DataFile::new_legacy_from_fields("o.lance", vec![1], None), + coverage: OverlayCoverage::Shared(vec![]), + committed_version: version, + }; + // List order [v2, v5, v3]; newest-first should be v5(idx1), v3(idx2), v2(idx0). + let overlays = vec![mk(2), mk(5), mk(3)]; + assert_eq!(overlay_indices_newest_first(&overlays), vec![1, 2, 0]); + + // Equal versions: later list position is newer. 
+ let overlays = vec![mk(4), mk(4)]; + assert_eq!(overlay_indices_newest_first(&overlays), vec![1, 0]); + } +} diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index 5ef35a33ab7..8c1595c4daa 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -1941,6 +1941,7 @@ mod test { Ok(Some(Fragment { files: vec![], id: 0, + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: Some(50), diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 4555cd7ee6c..32d1833f9bb 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -31,8 +31,9 @@ use lance_table::feature_flags::{FLAG_STABLE_ROW_IDS, apply_feature_flags}; use lance_table::rowids::read_row_ids; use lance_table::{ format::{ - BasePath, DataFile, DataStorageFormat, Fragment, IndexFile, IndexMetadata, Manifest, - RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, RowIdMeta, pb, + BasePath, DataFile, DataOverlayFile, DataStorageFormat, Fragment, IndexFile, IndexMetadata, + Manifest, RowDatasetVersionMeta, RowDatasetVersionRun, RowDatasetVersionSequence, + RowIdMeta, pb, }, io::{ commit::CommitHandler, @@ -258,6 +259,17 @@ pub struct Transaction { #[derive(Debug, Clone, DeepSizeOf, PartialEq)] pub struct DataReplacementGroup(pub u64, pub DataFile); +/// Overlay files to append to a single fragment, in order (the last entry is +/// newest). The overlays are appended to the fragment's existing `overlays` +/// list rather than replacing it, so overlays written by concurrent commits are +/// preserved. Each overlay's `committed_version` is stamped to the new dataset +/// version at commit time (re-stamped on retry). +#[derive(Debug, Clone, DeepSizeOf, PartialEq)] +pub struct DataOverlayGroup { + pub fragment_id: u64, + pub overlays: Vec, +} + /// An entry for a map update. 
If value is None, the key will be removed from the map. #[derive(Debug, Clone, DeepSizeOf, PartialEq)] pub struct UpdateMapEntry { @@ -367,6 +379,11 @@ pub enum Operation { DataReplacement { replacements: Vec, }, + /// Attach overlay files to fragments, supplying new values for a subset of + /// `(row offset, field)` cells without rewriting the fragments' base data + /// files. See [`DataOverlayFile`] and the Data Overlay Files specification + /// for resolution, coverage, and versioning rules. + DataOverlay { groups: Vec }, /// Merge a new column in /// 'fragments' is the final fragments include all data files, the new fragments must align with old ones at rows. /// 'schema' is not forced to include existed columns, which means we could use Merge to drop column data @@ -499,6 +516,7 @@ impl std::fmt::Display for Operation { Self::Project { .. } => write!(f, "Project"), Self::UpdateConfig { .. } => write!(f, "UpdateConfig"), Self::DataReplacement { .. } => write!(f, "DataReplacement"), + Self::DataOverlay { .. } => write!(f, "DataOverlay"), Self::Clone { .. } => write!(f, "Clone"), Self::UpdateMemWalState { .. } => write!(f, "UpdateMemWalState"), Self::UpdateBases { .. } => write!(f, "UpdateBases"), @@ -1345,6 +1363,16 @@ impl PartialEq for Operation { (Self::Clone { .. }, Self::UpdateBases { .. }) => { std::mem::discriminant(self) == std::mem::discriminant(other) } + // Data overlays are intentionally permissive, like DataReplacement. + // Two overlays stack (the higher committed_version wins each covered + // cell), and overlays are compatible with appends, deletes, column + // rewrites, and concurrent overlays. Only an operation that rewrites + // rows or otherwise invalidates physical offsets (Rewrite, which + // covers compaction and overlay->base folds) conflicts, since the + // overlay's physical offsets would no longer be valid. + (Self::DataOverlay { .. }, Self::Rewrite { .. }) + | (Self::Rewrite { .. }, Self::DataOverlay { .. 
}) => true, + (Self::DataOverlay { .. }, _) | (_, Self::DataOverlay { .. }) => false, } } } @@ -1521,6 +1549,7 @@ impl Operation { Self::Project { .. } => "Project", Self::UpdateConfig { .. } => "UpdateConfig", Self::DataReplacement { .. } => "DataReplacement", + Self::DataOverlay { .. } => "DataOverlay", Self::UpdateMemWalState { .. } => "UpdateMemWalState", Self::Clone { .. } => "Clone", Self::UpdateBases { .. } => "UpdateBases", @@ -2300,6 +2329,42 @@ impl Transaction { &replaced_fields, ); } + Operation::DataOverlay { groups } => { + // Stamp each overlay with the version this commit is producing. + // build_manifest re-runs on every retry with an updated + // current_manifest, so this is naturally re-stamped on retry. + let new_version = current_manifest.map_or(1, |m| m.version + 1); + + let existing_fragments = maybe_existing_fragments?; + let overlays_by_fragment: HashMap> = groups + .iter() + .map(|g| (g.fragment_id, &g.overlays)) + .collect(); + + // Every group must target an existing fragment. + for fragment_id in overlays_by_fragment.keys() { + if !existing_fragments.iter().any(|f| f.id == *fragment_id) { + return Err(Error::invalid_input(format!( + "DataOverlay targets fragment {fragment_id}, which does not exist" + ))); + } + } + + for fragment in existing_fragments { + let mut fragment = fragment.clone(); + if let Some(new_overlays) = overlays_by_fragment.get(&fragment.id) { + // Appended (not replaced) so concurrently-written overlays + // survive; later entries are newer. 
+ fragment.overlays.extend(new_overlays.iter().cloned().map( + |mut overlay| { + overlay.committed_version = new_version; + overlay + }, + )); + } + final_fragments.push(fragment); + } + } Operation::UpdateMemWalState { merged_generations } => { update_mem_wal_index_merged_generations( &mut final_indices, @@ -3007,6 +3072,34 @@ impl TryFrom for DataReplacementGroup { } } +impl From<&DataOverlayGroup> for pb::transaction::DataOverlayGroup { + fn from(group: &DataOverlayGroup) -> Self { + Self { + fragment_id: group.fragment_id, + overlays: group + .overlays + .iter() + .map(pb::DataOverlayFile::from) + .collect(), + } + } +} + +impl TryFrom for DataOverlayGroup { + type Error = Error; + + fn try_from(message: pb::transaction::DataOverlayGroup) -> Result { + Ok(Self { + fragment_id: message.fragment_id, + overlays: message + .overlays + .into_iter() + .map(DataOverlayFile::try_from) + .collect::>>()?, + }) + } +} + impl TryFrom for Transaction { type Error = Error; @@ -3289,6 +3382,14 @@ impl TryFrom for Transaction { })) => Operation::UpdateBases { new_bases: new_bases.into_iter().map(BasePath::from).collect(), }, + Some(pb::transaction::Operation::DataOverlay(pb::transaction::DataOverlay { + groups, + })) => Operation::DataOverlay { + groups: groups + .into_iter() + .map(DataOverlayGroup::try_from) + .collect::>>()?, + }, None => { return Err(Error::internal( "Transaction message did not contain an operation".to_string(), @@ -3559,6 +3660,14 @@ impl From<&Transaction> for pb::Transaction { .collect(), }) } + Operation::DataOverlay { groups } => { + pb::transaction::Operation::DataOverlay(pb::transaction::DataOverlay { + groups: groups + .iter() + .map(pb::transaction::DataOverlayGroup::from) + .collect(), + }) + } Operation::UpdateMemWalState { merged_generations } => { pb::transaction::Operation::UpdateMemWalState(pb::transaction::UpdateMemWalState { merged_generations: merged_generations @@ -4148,6 +4257,7 @@ mod tests { physical_rows: Some(100), row_id_meta: 
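Reviewer sketch (not part of the patch): the `Operation::DataOverlay` branch of `build_manifest` appends new overlays to a fragment's existing list and stamps each with the version being produced. The standalone model below shows that behavior in isolation. `Overlay`, `Fragment`, and `append_overlays` are simplified, hypothetical types and names for illustration, not the crate's definitions.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Overlay {
    name: String,
    committed_version: u64,
}

#[derive(Debug)]
struct Fragment {
    id: u64,
    overlays: Vec<Overlay>,
}

/// Append `new_overlays` to `fragment`, stamping each with the version this
/// commit produces. `current_version` is `None` when no manifest exists yet.
fn append_overlays(fragment: &mut Fragment, new_overlays: &[Overlay], current_version: Option<u64>) {
    let new_version = current_version.map_or(1, |v| v + 1);
    // Extend rather than replace, so overlays already on the fragment survive.
    fragment
        .overlays
        .extend(new_overlays.iter().cloned().map(|mut overlay| {
            overlay.committed_version = new_version;
            overlay
        }));
}

fn main() {
    let mut fragment = Fragment {
        id: 0,
        overlays: vec![Overlay { name: "older".into(), committed_version: 2 }],
    };
    // The writer's placeholder version (0) is overwritten at commit time.
    let incoming = [Overlay { name: "newer".into(), committed_version: 0 }];
    append_overlays(&mut fragment, &incoming, Some(2));
    println!("fragment {} -> {:?}", fragment.id, fragment.overlays);
}
```

Because the stamp is computed from the manifest seen at commit time, re-running the same function against a newer manifest on retry yields a higher `committed_version`, which matches the re-stamping behavior described in the patch comments.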
None, files: vec![], + overlays: vec![], deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, @@ -4180,6 +4290,7 @@ mod tests { physical_rows: Some(50), row_id_meta: Some(RowIdMeta::Inline(serialized)), files: vec![], + overlays: vec![], deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, @@ -4212,6 +4323,7 @@ mod tests { physical_rows: Some(50), // More physical rows than existing row IDs row_id_meta: Some(RowIdMeta::Inline(serialized)), files: vec![], + overlays: vec![], deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, @@ -4247,6 +4359,7 @@ mod tests { physical_rows: Some(50), // Less physical rows than existing row IDs row_id_meta: Some(RowIdMeta::Inline(serialized)), files: vec![], + overlays: vec![], deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, @@ -4275,6 +4388,7 @@ mod tests { physical_rows: Some(30), // No existing row IDs row_id_meta: None, files: vec![], + overlays: vec![], deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, @@ -4284,6 +4398,7 @@ mod tests { physical_rows: Some(25), // Partial existing row IDs row_id_meta: Some(RowIdMeta::Inline(serialized)), files: vec![], + overlays: vec![], deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, @@ -4328,6 +4443,7 @@ mod tests { physical_rows: None, row_id_meta: None, files: vec![], + overlays: vec![], deletion_file: None, last_updated_at_version_meta: None, created_at_version_meta: None, @@ -4835,6 +4951,7 @@ mod tests { let fragment = Fragment { id: 1, files: vec![data_file], + overlays: vec![], deletion_file: None, row_id_meta, physical_rows: Some(5), @@ -5104,6 +5221,7 @@ mod tests { None, )], physical_rows: Some(10), + overlays: vec![], deletion_file: None, row_id_meta: None, last_updated_at_version_meta: None, @@ -5195,6 +5313,7 @@ mod tests { let prev_fragment = Fragment { id: 0, 
files: vec![mk_file("before.lance")], + overlays: vec![], deletion_file: None, row_id_meta, physical_rows: Some(5), @@ -5267,6 +5386,7 @@ mod tests { let prev_fragment = Fragment { id: 0, files: vec![data_file.clone()], + overlays: vec![], deletion_file: None, row_id_meta: row_id_meta.clone(), physical_rows: Some(5), @@ -5286,6 +5406,7 @@ mod tests { let merged_fragment = Fragment { id: 0, files: vec![data_file], + overlays: vec![], deletion_file: None, row_id_meta, physical_rows: Some(5), @@ -5335,6 +5456,7 @@ mod tests { let prev_fragment = Fragment { id: 0, files: vec![mk_file("before.lance")], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: Some(5), @@ -5399,6 +5521,7 @@ mod tests { let existing_fragment = Fragment { id: 0, files: vec![mk_file("existing.lance")], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&row_ids_0))), physical_rows: Some(3), @@ -5421,6 +5544,7 @@ mod tests { let new_fragment = Fragment { id: 1, files: vec![mk_file("new.lance")], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&row_ids_1))), physical_rows: Some(4), @@ -5483,6 +5607,7 @@ mod tests { let existing_fragment = Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), physical_rows: Some(3), @@ -5496,6 +5621,7 @@ mod tests { let new_fragment = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(2), @@ -5538,6 +5664,7 @@ mod tests { Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&frag_a_seq))), physical_rows: Some(2), @@ -5549,6 +5676,7 @@ mod tests { Fragment { id: 2, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&frag_b_seq))), physical_rows: 
Some(3), @@ -5564,6 +5692,7 @@ mod tests { let new_fragment = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(2), @@ -5605,6 +5734,7 @@ mod tests { let existing_fragment = Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), physical_rows: Some(2), @@ -5619,6 +5749,7 @@ mod tests { let new_fragment = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(2), @@ -5663,6 +5794,7 @@ mod tests { let existing_fragment = Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), physical_rows: Some(2), @@ -5676,6 +5808,7 @@ mod tests { let new_fragment = Fragment { id: 20, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(4), @@ -5709,6 +5842,7 @@ mod tests { let existing_fragment = Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), physical_rows: Some(2), @@ -5720,6 +5854,7 @@ mod tests { let new_fragment = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(1), @@ -5748,6 +5883,7 @@ mod tests { let existing_fragment = Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), physical_rows: Some(2), @@ -5758,6 +5894,7 @@ mod tests { let new_fragment = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: Some(3), @@ -5788,6 +5925,7 @@ mod tests { let existing_fragment = 
Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&existing_seq))), physical_rows: Some(2), @@ -5801,6 +5939,7 @@ mod tests { let new_fragment = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(1), @@ -5842,6 +5981,7 @@ mod tests { let in_range_frag = Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&in_range_seq))), physical_rows: Some(2), @@ -5862,6 +6002,7 @@ mod tests { let out_of_range_frag = Fragment { id: 2, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&out_of_range_seq))), physical_rows: Some(2), @@ -5876,6 +6017,7 @@ mod tests { let new_frag = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(2), @@ -5914,6 +6056,7 @@ mod tests { let existing = Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&seq))), physical_rows: Some(3), @@ -5926,6 +6069,7 @@ mod tests { let new_frag = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(2), @@ -5974,6 +6118,7 @@ mod tests { let src_frag = Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&src_seq))), physical_rows: Some(100), @@ -5988,6 +6133,7 @@ mod tests { let new_frag = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(100), @@ -6035,6 +6181,7 @@ mod tests { Fragment { id: 1, files: vec![], + overlays: vec![], deletion_file: None, 
row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&seq_a))), physical_rows: Some(3), @@ -6046,6 +6193,7 @@ mod tests { Fragment { id: 2, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&seq_b))), physical_rows: Some(3), @@ -6061,6 +6209,7 @@ mod tests { let new_frag = Fragment { id: 10, files: vec![], + overlays: vec![], deletion_file: None, row_id_meta: Some(RowIdMeta::Inline(write_row_ids(&new_seq))), physical_rows: Some(2), @@ -6127,4 +6276,47 @@ mod tests { assert!(!left.modifies_same_metadata(&different_key)); assert!(left.modifies_same_metadata(&replace)); } + + #[test] + fn test_data_overlay_operation_roundtrips() { + // A DataOverlay operation survives the protobuf round-trip, preserving + // the target fragment, the overlay's coverage, and its committed_version. + use lance_table::format::{DataOverlayFile, OverlayCoverage}; + + let mut bitmap = roaring::RoaringBitmap::new(); + bitmap.insert(1); + bitmap.insert(4); + let overlay = DataOverlayFile { + data_file: DataFile::new_legacy_from_fields("overlay-0.lance", vec![3], None), + coverage: OverlayCoverage::dense(&bitmap), + committed_version: 6, + }; + let pb_overlay = pb::DataOverlayFile::from(&overlay); + + let message = pb::Transaction { + read_version: 1, + uuid: Uuid::new_v4().to_string(), + operation: Some(pb::transaction::Operation::DataOverlay( + pb::transaction::DataOverlay { + groups: vec![pb::transaction::DataOverlayGroup { + fragment_id: 7, + overlays: vec![pb_overlay], + }], + }, + )), + ..Default::default() + }; + + let txn = Transaction::try_from(message).unwrap(); + match txn.operation { + Operation::DataOverlay { groups } => { + assert_eq!(groups.len(), 1); + assert_eq!(groups[0].fragment_id, 7); + assert_eq!(groups[0].overlays.len(), 1); + assert_eq!(groups[0].overlays[0].committed_version, 6); + assert_eq!(groups[0].overlays[0].coverage_for_field(0).unwrap(), bitmap); + } + other => panic!("expected DataOverlay, got {other:?}"), + 
} + } } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index ff0a119158c..47c4add18bd 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -3562,6 +3562,7 @@ mod tests { let fragments = vec![Fragment { id: 0, files: vec![external_file, local_file], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: Some(0), diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index baad71b3e39..7aed72a2d71 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -551,6 +551,7 @@ mod tests { file_size_bytes: CachedFileSize::new(100), base_id: None, }], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: Some(10), diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index ce0d29d550b..8980dd4f1ed 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -1687,6 +1687,7 @@ mod tests { DataFile::new_legacy_from_fields("path1", vec![0, 1, 2], None), DataFile::new_legacy_from_fields("unused", vec![9], None), ], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: None, @@ -1699,6 +1700,7 @@ mod tests { DataFile::new_legacy_from_fields("path2", vec![0, 1, 2], None), DataFile::new_legacy_from_fields("path3", vec![2], None), ], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: None, @@ -1736,6 +1738,7 @@ mod tests { vec![0, 1, 10], None, )], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: None, @@ -1748,6 +1751,7 @@ mod tests { DataFile::new_legacy_from_fields("path2", vec![0, 1, 2], None), DataFile::new_legacy_from_fields("path3", vec![10], None), ], + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: None, @@ -1838,6 +1842,7 @@ mod tests { let fragment = Fragment { id: 0, files: vec![data_file], + overlays: vec![], deletion_file: None, row_id_meta: None, 
physical_rows: Some(100), diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index dc898534c89..1af51506f8e 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -137,6 +137,21 @@ impl<'a> TransactionRebase<'a> { conflicting_mem_wal_merged_gens: Vec::new(), }) } + Operation::DataOverlay { groups } => { + let modified_fragment_ids = + groups.iter().map(|g| g.fragment_id).collect::<HashSet<_>>(); + let initial_fragments = + initial_fragments_for_rebase(dataset, &transaction, &modified_fragment_ids) + .await; + Ok(Self { + transaction, + affected_rows, + initial_fragments, + modified_fragment_ids, + conflicting_frag_reuse_indices: Vec::new(), + conflicting_mem_wal_merged_gens: Vec::new(), + }) + } Operation::Merge { fragments, .. } => { let modified_fragment_ids = fragments.iter().map(|f| f.id).collect::<HashSet<_>>(); let initial_fragments = @@ -203,6 +218,9 @@ impl<'a> TransactionRebase<'a> { Operation::DataReplacement { .. } => { self.check_data_replacement_txn(other_transaction, other_version) } + Operation::DataOverlay { .. } => { + self.check_data_overlay_txn(other_transaction, other_version) + } Operation::Merge { .. } => self.check_merge_txn(other_transaction, other_version), Operation::Restore { .. } => self.check_restore_txn(other_transaction, other_version), Operation::ReserveFragments { .. } => { @@ -235,6 +253,10 @@ impl<'a> TransactionRebase<'a> { | Operation::Project { .. } | Operation::Append { .. } | Operation::UpdateConfig { .. } + // A concurrent overlay is inert against the rows we delete + // (deletions take precedence over overlays) and otherwise + // preserves physical offsets, so it never conflicts. + | Operation::DataOverlay { .. } | Operation::UpdateBases { .. } => Ok(()), Operation::Rewrite { groups, .. } => { if groups @@ -382,6 +404,9 @@ impl<'a> TransactionRebase<'a> { | Operation::Project { .. } | Operation::Clone { ..
} | Operation::UpdateConfig { .. } + // A concurrent overlay preserves physical offsets and is newer + // than this update, so it wins its covered cells without conflict. + | Operation::DataOverlay { .. } | Operation::UpdateBases { .. } => Ok(()), Operation::Append { .. } => { // If current transaction has primary key conflict detection, @@ -498,6 +523,10 @@ impl<'a> TransactionRebase<'a> { match &other_transaction.operation { Operation::Append { .. } | Operation::Clone { .. } + // An overlay committed after this index's version is newer than + // the index; the query path excludes its covered cells via the + // version gate, so the build does not conflict. + | Operation::DataOverlay { .. } | Operation::UpdateBases { .. } => Ok(()), Operation::CreateIndex { new_indices: created_indices, @@ -679,6 +708,20 @@ impl<'a> TransactionRebase<'a> { Ok(()) } } + Operation::DataOverlay { groups } => { + // Rewriting a fragment changes its physical row addresses, so + // an overlay addressed by physical offset on that fragment is + // invalidated and must be re-applied against the new base. + if groups + .iter() + .map(|g| g.fragment_id) + .any(|id| self.modified_fragment_ids.contains(&id)) + { + Err(self.retryable_conflict_err(other_transaction, other_version)) + } else { + Ok(()) + } + } Operation::Rewrite { groups, frag_reuse_index: committed_fri, @@ -858,6 +901,7 @@ impl<'a> TransactionRebase<'a> { | Operation::CreateIndex { .. } | Operation::Rewrite { .. } | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } | Operation::Merge { .. } | Operation::Restore { .. } | Operation::ReserveFragments { .. } @@ -891,7 +935,8 @@ impl<'a> TransactionRebase<'a> { | Operation::Merge { .. } | Operation::UpdateConfig { .. } | Operation::Clone { .. } - | Operation::DataReplacement { .. } => Ok(()), + | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } => Ok(()), } } @@ -907,6 +952,9 @@ impl<'a> TransactionRebase<'a> { | Operation::UpdateConfig { .. 
} | Operation::ReserveFragments { .. } | Operation::Project { .. } + // Both a column replacement and an overlay preserve physical row + // addresses; the overlay is newer and wins its covered cells. + | Operation::DataOverlay { .. } | Operation::UpdateBases { .. } => Ok(()), Operation::Merge { .. } => { // Merge rewrites the whole fragment list; always conflict @@ -1009,6 +1057,57 @@ impl<'a> TransactionRebase<'a> { } } + /// Conflict checks for our DataOverlay transaction against a concurrent one. + /// + /// Overlays are intentionally permissive (see the Data Overlay Files spec): + /// they stack with other overlays and tolerate appends, deletes, column + /// rewrites, and index builds, because overlay coverage is addressed by + /// physical offset and the version gate keeps indexes correct. The only + /// concurrent operations that invalidate an overlay are those that rewrite + /// rows or consume the overlays on one of our fragments (Rewrite / Merge), + /// and the whole-dataset replacements (Overwrite / Restore). + fn check_data_overlay_txn( + &mut self, + other_transaction: &Transaction, + other_version: u64, + ) -> Result<()> { + match &other_transaction.operation { + Operation::Append { .. } + | Operation::Delete { .. } + | Operation::Update { .. } + | Operation::CreateIndex { .. } + | Operation::ReserveFragments { .. } + | Operation::Project { .. } + | Operation::UpdateConfig { .. } + | Operation::UpdateBases { .. } + | Operation::Clone { .. } + | Operation::UpdateMemWalState { .. } + | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } => Ok(()), + Operation::Rewrite { groups, .. } => { + // A rewrite (compaction / fold) of a fragment we are overlaying + // changes its physical row addresses, so our offsets would be + // invalid. Conflict only if it touches one of our fragments. 
+ let touches_our_fragment = groups + .iter() + .flat_map(|g| g.old_fragments.iter()) + .any(|f| self.modified_fragment_ids.contains(&f.id)); + if touches_our_fragment { + Err(self.retryable_conflict_err(other_transaction, other_version)) + } else { + Ok(()) + } + } + Operation::Merge { .. } => { + // Merge rewrites the whole fragment list; always conflict. + Err(self.retryable_conflict_err(other_transaction, other_version)) + } + Operation::Overwrite { .. } | Operation::Restore { .. } => { + Err(self.incompatible_conflict_err(other_transaction, other_version)) + } + } + } + fn check_merge_txn( &mut self, other_transaction: &Transaction, @@ -1026,7 +1125,8 @@ impl<'a> TransactionRebase<'a> { | Operation::Delete { .. } | Operation::Rewrite { .. } | Operation::Merge { .. } - | Operation::DataReplacement { .. } => { + | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } => { Err(self.retryable_conflict_err(other_transaction, other_version)) } Operation::Overwrite { .. } @@ -1050,6 +1150,7 @@ impl<'a> TransactionRebase<'a> { | Operation::CreateIndex { .. } | Operation::Rewrite { .. } | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } | Operation::Merge { .. } | Operation::Restore { .. } | Operation::ReserveFragments { .. } @@ -1078,6 +1179,7 @@ impl<'a> TransactionRebase<'a> { | Operation::CreateIndex { .. } | Operation::Rewrite { .. } | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } | Operation::Merge { .. } | Operation::ReserveFragments { .. } | Operation::Update { .. } @@ -1102,6 +1204,7 @@ impl<'a> TransactionRebase<'a> { | Operation::UpdateConfig { .. } | Operation::CreateIndex { .. } | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } | Operation::Rewrite { .. } | Operation::Clone { .. } | Operation::ReserveFragments { .. } @@ -1166,6 +1269,7 @@ impl<'a> TransactionRebase<'a> { | Operation::CreateIndex { .. } | Operation::Rewrite { .. } | Operation::DataReplacement { .. 
} + | Operation::DataOverlay { .. } | Operation::Merge { .. } | Operation::Restore { .. } | Operation::ReserveFragments { .. } @@ -1238,6 +1342,7 @@ impl<'a> TransactionRebase<'a> { | Operation::Overwrite { .. } | Operation::Delete { .. } | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } | Operation::Merge { .. } | Operation::Restore { .. } | Operation::Clone { .. } @@ -1331,6 +1436,7 @@ impl<'a> TransactionRebase<'a> { Operation::Append { .. } | Operation::Overwrite { .. } | Operation::DataReplacement { .. } + | Operation::DataOverlay { .. } | Operation::Merge { .. } | Operation::Restore { .. } | Operation::ReserveFragments { .. } @@ -3208,6 +3314,7 @@ mod tests { Operation::DataReplacement { replacements } => { Box::new(replacements.iter().map(|r| r.0)) } + Operation::DataOverlay { groups } => Box::new(groups.iter().map(|g| g.fragment_id)), } } diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 3338eee07a8..f804a7cc38a 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -243,6 +243,7 @@ impl TestDatasetGenerator { Fragment { id: 0, files, + overlays: vec![], deletion_file: None, row_id_meta: None, physical_rows: Some(batch.num_rows()),