Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions protos/transaction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,12 @@ message Transaction {
// Only tracks keys from INSERT operations during merge insert, not updates.
optional KeyExistenceFilter inserted_rows = 8;
// Per-fragment physical row offsets that matched an update_columns hash join (RewriteColumns).
// Deprecated: use updated_fragment_offset_bitmaps (field 10) instead.
map<uint64, UInt32List> updated_fragment_offsets = 9;
// Per-fragment matched offsets as portable RoaringBitmap bytes (replaces field 9).
// Writers emit field 10 only. Readers prefer field 10; fall back to field 9 for
// manifests written before this change.
map<uint64, bytes> updated_fragment_offset_bitmaps = 10;
}

// The mode of update operation
Expand Down
183 changes: 175 additions & 8 deletions rust/lance/src/dataset/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3146,6 +3146,7 @@ impl TryFrom<pb::Transaction> for Transaction {
update_mode,
inserted_rows,
updated_fragment_offsets,
updated_fragment_offset_bitmaps,
})) => Operation::Update {
removed_fragment_ids,
updated_fragments: updated_fragments
Expand All @@ -3171,11 +3172,31 @@ impl TryFrom<pb::Transaction> for Transaction {
.map(|ik| KeyExistenceFilter::try_from(&ik))
.transpose()?,
updated_fragment_offsets: {
let m: HashMap<u64, RoaringBitmap> = updated_fragment_offsets
.into_iter()
.filter(|(_, list)| !list.values.is_empty())
.map(|(id, list)| (id, RoaringBitmap::from_iter(list.values)))
.collect();
// Prefer field 10 (RoaringBitmap bytes); fall back to field 9 (UInt32List)
// for manifests written before this change.
let m: HashMap<u64, RoaringBitmap> =
if !updated_fragment_offset_bitmaps.is_empty() {
updated_fragment_offset_bitmaps
.into_iter()
.filter(|(_, bytes)| !bytes.is_empty())
.map(|(id, bytes)| {
let bitmap = RoaringBitmap::deserialize_from(bytes.as_slice())
.map_err(|e| {
Error::invalid_input(format!(
"invalid updated_fragment_offset_bitmaps \
for fragment {id}: {e}"
))
})?;
Ok((id, bitmap))
})
.collect::<Result<HashMap<_, _>>>()?
} else {
updated_fragment_offsets
.into_iter()
.filter(|(_, list)| !list.values.is_empty())
.map(|(id, list)| (id, RoaringBitmap::from_iter(list.values)))
.collect()
};
if m.is_empty() {
None
} else {
Expand Down Expand Up @@ -3506,14 +3527,19 @@ impl From<&Transaction> for pb::Transaction {
})
.unwrap_or(0),
inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.into()),
updated_fragment_offsets: updated_fragment_offsets
// Field 9: no longer written; kept empty for forward compat.
updated_fragment_offsets: HashMap::new(),
// Field 10: RoaringBitmap bytes.
updated_fragment_offset_bitmaps: updated_fragment_offsets
.as_ref()
.map(|UpdatedFragmentOffsets(m)| {
m.iter()
.filter(|(_, b)| !b.is_empty())
.map(|(frag_id, b)| {
let values: Vec<u32> = b.iter().collect();
(*frag_id, pb::transaction::UInt32List { values })
let mut buf = Vec::new();
b.serialize_into(&mut buf)
.expect("RoaringBitmap serialization cannot fail");
(*frag_id, buf)
})
.collect::<HashMap<_, _>>()
})
Expand Down Expand Up @@ -4877,6 +4903,147 @@ mod tests {
);
}

/// Encodes an `Operation::Update` that carries per-fragment offsets into a
/// `pb::Transaction`, checks that only proto field 10
/// (`updated_fragment_offset_bitmaps`) is populated, then decodes it again and
/// checks that every bitmap comes back unchanged.
#[test]
fn test_proto_round_trip_field_10() {
    // Two fragments with distinct offset sets.
    let mut expected = HashMap::new();
    expected.insert(1u64, RoaringBitmap::from_iter([1u32, 3, 5]));
    expected.insert(2u64, RoaringBitmap::from_iter([0u32, 2, 4, 6]));

    let operation = Operation::Update {
        removed_fragment_ids: vec![],
        updated_fragments: vec![],
        new_fragments: vec![],
        fields_modified: vec![],
        merged_generations: vec![],
        fields_for_preserving_frag_bitmap: vec![],
        update_mode: Some(UpdateMode::RewriteColumns),
        inserted_rows_filter: None,
        updated_fragment_offsets: Some(UpdatedFragmentOffsets(expected.clone())),
    };
    let original = Transaction::new(1, operation, None);

    let encoded: pb::Transaction = pb::Transaction::from(&original);

    // Write side: field 9 stays empty, field 10 holds one entry per fragment.
    match &encoded.operation {
        Some(pb::transaction::Operation::Update(update)) => {
            assert!(
                update.updated_fragment_offsets.is_empty(),
                "field 9 should be empty"
            );
            assert_eq!(update.updated_fragment_offset_bitmaps.len(), 2);
        }
        _ => panic!("expected Update operation"),
    }

    // Read side: decoding must reproduce the original bitmaps.
    let decoded = Transaction::try_from(encoded).unwrap();
    match &decoded.operation {
        Operation::Update {
            updated_fragment_offsets: Some(UpdatedFragmentOffsets(actual)),
            ..
        } => {
            assert_eq!(actual.len(), 2);
            for (fragment_id, bitmap) in &expected {
                assert_eq!(actual.get(fragment_id).unwrap(), bitmap);
            }
        }
        _ => panic!("expected Update with offsets"),
    }
}

/// Decodes a `pb::Transaction` whose update carries offsets only in the
/// legacy field 9 (`updated_fragment_offsets`) and checks that they still
/// surface as a bitmap on the resulting `Operation::Update`.
#[test]
fn test_proto_legacy_field_9_read() {
    // Simulate a manifest written by old Lance: only field 9, no field 10.
    let legacy_offsets = HashMap::from([(
        1u64,
        pb::transaction::UInt32List {
            values: vec![1, 3, 5],
        },
    )]);

    let update = pb::transaction::Update {
        removed_fragment_ids: vec![],
        updated_fragments: vec![],
        new_fragments: vec![],
        fields_modified: vec![],
        merged_generations: vec![],
        fields_for_preserving_frag_bitmap: vec![],
        update_mode: 1,
        inserted_rows: None,
        updated_fragment_offsets: legacy_offsets,
        updated_fragment_offset_bitmaps: HashMap::new(),
    };

    let encoded = pb::Transaction {
        read_version: 1,
        uuid: String::from("test"),
        tag: String::new(),
        transaction_properties: HashMap::new(),
        operation: Some(pb::transaction::Operation::Update(update)),
    };

    let decoded = Transaction::try_from(encoded).unwrap();
    match &decoded.operation {
        Operation::Update {
            updated_fragment_offsets: Some(UpdatedFragmentOffsets(by_fragment)),
            ..
        } => {
            assert_eq!(by_fragment.len(), 1);
            let recovered: Vec<u32> = by_fragment.get(&1).unwrap().iter().collect();
            assert_eq!(recovered, vec![1, 3, 5]);
        }
        _ => panic!("expected Update with offsets from legacy field 9"),
    }
}

/// Decodes a `pb::Transaction` in which fields 9 and 10 disagree for the same
/// fragment and checks that the decoded offsets come from field 10.
#[test]
fn test_proto_field_10_takes_precedence_over_field_9() {
    // Field 10 payload: serialized bitmap {10, 20, 30}.
    let preferred = RoaringBitmap::from_iter([10u32, 20, 30]);
    let mut preferred_bytes = Vec::new();
    preferred.serialize_into(&mut preferred_bytes).unwrap();

    // Field 9 payload: deliberately different values for the same fragment.
    let legacy_offsets = HashMap::from([(
        1u64,
        pb::transaction::UInt32List {
            values: vec![99, 100],
        },
    )]);

    let update = pb::transaction::Update {
        removed_fragment_ids: vec![],
        updated_fragments: vec![],
        new_fragments: vec![],
        fields_modified: vec![],
        merged_generations: vec![],
        fields_for_preserving_frag_bitmap: vec![],
        update_mode: 1,
        inserted_rows: None,
        updated_fragment_offsets: legacy_offsets,
        updated_fragment_offset_bitmaps: HashMap::from([(1u64, preferred_bytes)]),
    };

    let encoded = pb::Transaction {
        read_version: 1,
        uuid: String::from("test"),
        tag: String::new(),
        transaction_properties: HashMap::new(),
        operation: Some(pb::transaction::Operation::Update(update)),
    };

    // When both fields are present, field 10 wins.
    let decoded = Transaction::try_from(encoded).unwrap();
    match &decoded.operation {
        Operation::Update {
            updated_fragment_offsets: Some(UpdatedFragmentOffsets(by_fragment)),
            ..
        } => {
            let recovered: Vec<u32> = by_fragment.get(&1).unwrap().iter().collect();
            assert_eq!(recovered, vec![10, 20, 30], "field 10 should take precedence");
        }
        _ => panic!("expected Update with offsets from field 10"),
    }
}

/// Partial RewriteColumns refresh in `build_manifest`: only matched physical
/// rows get `last_updated_at_version` bumped; same-fragment unmatched rows and
/// untouched fragments keep both version sequences.
Expand Down
Loading