diff --git a/protos/transaction.proto b/protos/transaction.proto index e72e95025a4..f86354980b7 100644 --- a/protos/transaction.proto +++ b/protos/transaction.proto @@ -254,7 +254,12 @@ message Transaction { // Only tracks keys from INSERT operations during merge insert, not updates. optional KeyExistenceFilter inserted_rows = 8; // Per-fragment physical row offsets that matched an update_columns hash join (RewriteColumns). + // Deprecated: use updated_fragment_offset_bitmaps (field 10) instead. map updated_fragment_offsets = 9; + // Per-fragment matched offsets as portable RoaringBitmap bytes (replaces field 9). + // Writers emit field 10 only. Readers prefer field 10; fall back to field 9 for + // manifests written before this change. + map updated_fragment_offset_bitmaps = 10; } // The mode of update operation diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 4555cd7ee6c..ad12d1f5c2b 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -3146,6 +3146,7 @@ impl TryFrom for Transaction { update_mode, inserted_rows, updated_fragment_offsets, + updated_fragment_offset_bitmaps, })) => Operation::Update { removed_fragment_ids, updated_fragments: updated_fragments @@ -3171,11 +3172,31 @@ impl TryFrom for Transaction { .map(|ik| KeyExistenceFilter::try_from(&ik)) .transpose()?, updated_fragment_offsets: { - let m: HashMap = updated_fragment_offsets - .into_iter() - .filter(|(_, list)| !list.values.is_empty()) - .map(|(id, list)| (id, RoaringBitmap::from_iter(list.values))) - .collect(); + // Prefer field 10 (RoaringBitmap bytes); fall back to field 9 (UInt32List) + // for manifests written before this change. + let m: HashMap = + if !updated_fragment_offset_bitmaps.is_empty() { + updated_fragment_offset_bitmaps + .into_iter() + .filter(|(_, bytes)| !bytes.is_empty()) + .map(|(id, bytes)| { + let bitmap = RoaringBitmap::deserialize_from(bytes.as_slice()) + .map_err(|e| { + Error::invalid_input(format!( + "invalid updated_fragment_offset_bitmaps \ + for fragment {id}: {e}" + )) + })?; + Ok((id, bitmap)) + }) + .collect::>>()? + } else { + updated_fragment_offsets + .into_iter() + .filter(|(_, list)| !list.values.is_empty()) + .map(|(id, list)| (id, RoaringBitmap::from_iter(list.values))) + .collect() + }; if m.is_empty() { None } else { @@ -3506,14 +3527,19 @@ impl From<&Transaction> for pb::Transaction { }) .unwrap_or(0), inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.into()), - updated_fragment_offsets: updated_fragment_offsets + // Field 9: no longer written; kept empty for forward compat. + updated_fragment_offsets: HashMap::new(), + // Field 10: RoaringBitmap bytes. + updated_fragment_offset_bitmaps: updated_fragment_offsets .as_ref() .map(|UpdatedFragmentOffsets(m)| { m.iter() .filter(|(_, b)| !b.is_empty()) .map(|(frag_id, b)| { - let values: Vec = b.iter().collect(); - (*frag_id, pb::transaction::UInt32List { values }) + let mut buf = Vec::new(); + b.serialize_into(&mut buf) + .expect("RoaringBitmap serialization cannot fail"); + (*frag_id, buf) }) .collect::>() }) @@ -4877,6 +4903,147 @@ mod tests { ); } + #[test] + fn test_proto_round_trip_field_10() { + let off_map = HashMap::from([ + (1u64, RoaringBitmap::from_iter([1u32, 3, 5])), + (2u64, RoaringBitmap::from_iter([0u32, 2, 4, 6])), + ]); + let tx = Transaction::new( + 1, + Operation::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![], + new_fragments: vec![], + fields_modified: vec![], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: Some(UpdateMode::RewriteColumns), + inserted_rows_filter: None, + updated_fragment_offsets: Some(UpdatedFragmentOffsets(off_map.clone())), + }, + None, + ); + + let pb_tx: pb::Transaction = pb::Transaction::from(&tx); + + // Field 9 must be empty; field 10 must be populated. + if let Some(pb::transaction::Operation::Update(ref update)) = pb_tx.operation { + assert!( + update.updated_fragment_offsets.is_empty(), + "field 9 should be empty" + ); + assert_eq!(update.updated_fragment_offset_bitmaps.len(), 2); + } else { + panic!("expected Update operation"); + } + + let tx2 = Transaction::try_from(pb_tx).unwrap(); + if let Operation::Update { + updated_fragment_offsets: Some(UpdatedFragmentOffsets(m)), + .. + } = &tx2.operation + { + assert_eq!(m.len(), 2); + assert_eq!(*m.get(&1).unwrap(), off_map[&1]); + assert_eq!(*m.get(&2).unwrap(), off_map[&2]); + } else { + panic!("expected Update with offsets"); + } + } + + #[test] + fn test_proto_legacy_field_9_read() { + // Simulate a manifest written by old Lance: only field 9, no field 10. + let pb_tx = pb::Transaction { + read_version: 1, + uuid: "test".to_string(), + tag: String::new(), + transaction_properties: HashMap::new(), + operation: Some(pb::transaction::Operation::Update( + pb::transaction::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![], + new_fragments: vec![], + fields_modified: vec![], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: 1, + inserted_rows: None, + updated_fragment_offsets: HashMap::from([( + 1u64, + pb::transaction::UInt32List { + values: vec![1, 3, 5], + }, + )]), + updated_fragment_offset_bitmaps: HashMap::new(), + }, + )), + }; + + let tx = Transaction::try_from(pb_tx).unwrap(); + if let Operation::Update { + updated_fragment_offsets: Some(UpdatedFragmentOffsets(m)), + .. + } = &tx.operation + { + assert_eq!(m.len(), 1); + let bitmap = m.get(&1).unwrap(); + let offsets: Vec = bitmap.iter().collect(); + assert_eq!(offsets, vec![1, 3, 5]); + } else { + panic!("expected Update with offsets from legacy field 9"); + } + } + + #[test] + fn test_proto_field_10_takes_precedence_over_field_9() { + // When both fields present, field 10 wins. + let mut bitmap_bytes = Vec::new(); + RoaringBitmap::from_iter([10u32, 20, 30]) + .serialize_into(&mut bitmap_bytes) + .unwrap(); + + let pb_tx = pb::Transaction { + read_version: 1, + uuid: "test".to_string(), + tag: String::new(), + transaction_properties: HashMap::new(), + operation: Some(pb::transaction::Operation::Update( + pb::transaction::Update { + removed_fragment_ids: vec![], + updated_fragments: vec![], + new_fragments: vec![], + fields_modified: vec![], + merged_generations: vec![], + fields_for_preserving_frag_bitmap: vec![], + update_mode: 1, + inserted_rows: None, + // Field 9 has different values than field 10. + updated_fragment_offsets: HashMap::from([( + 1u64, + pb::transaction::UInt32List { + values: vec![99, 100], + }, + )]), + updated_fragment_offset_bitmaps: HashMap::from([(1u64, bitmap_bytes)]), + }, + )), + }; + + let tx = Transaction::try_from(pb_tx).unwrap(); + if let Operation::Update { + updated_fragment_offsets: Some(UpdatedFragmentOffsets(m)), + .. + } = &tx.operation + { + let offsets: Vec = m.get(&1).unwrap().iter().collect(); + assert_eq!(offsets, vec![10, 20, 30], "field 10 should take precedence"); + } else { + panic!("expected Update with offsets from field 10"); + } + } + /// Partial RewriteColumns refresh in `build_manifest`: only matched physical /// rows get `last_updated_at_version` bumped; same-fragment unmatched rows and /// untouched fragments keep both version sequences.