diff --git a/docs/content/pypaimon/ray-data.md b/docs/content/pypaimon/ray-data.md index 4e049248dee1..b0ea958849be 100644 --- a/docs/content/pypaimon/ray-data.md +++ b/docs/content/pypaimon/ray-data.md @@ -90,6 +90,26 @@ ray_dataset = read_paimon( ) ``` +**Time travel:** + +```python +# Read a specific snapshot. +ray_dataset = read_paimon( + "database_name.table_name", + catalog_options={"warehouse": "/path/to/warehouse"}, + snapshot_id=42, +) + +# Read a tagged snapshot. +ray_dataset = read_paimon( + "database_name.table_name", + catalog_options={"warehouse": "/path/to/warehouse"}, + tag_name="release-2026-04", +) +``` + +`snapshot_id` and `tag_name` are mutually exclusive. + **Parameters:** - `table_identifier`: full table name, e.g. `"db_name.table_name"`. - `catalog_options`: kwargs forwarded to `CatalogFactory.create()`, @@ -97,6 +117,10 @@ ray_dataset = read_paimon( - `filter`: optional `Predicate` to push down into the scan. - `projection`: optional list of column names to read. - `limit`: optional row limit applied at scan planning time. +- `snapshot_id`: optional snapshot id to time-travel to. Mutually + exclusive with `tag_name`. +- `tag_name`: optional tag name to time-travel to. Mutually + exclusive with `snapshot_id`. - `override_num_blocks`: optional override for the number of output blocks. Must be `>= 1`. - `ray_remote_args`: optional kwargs passed to `ray.remote()` in read tasks diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index 3fe2e7945578..1e3f789376a5 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -261,6 +261,16 @@ class CoreOptions: .with_description("Optional tag name used in case of 'from-snapshot' scan mode.") ) + SCAN_SNAPSHOT_ID: ConfigOption[int] = ( + ConfigOptions.key("scan.snapshot-id") + .long_type() + .no_default_value() + .with_description( + "Optional snapshot id used in case of 'from-snapshot' or " + "'from-snapshot-full' scan mode." + ) + ) + SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = ( ConfigOptions.key("source.split.target-size") .memory_type() @@ -568,6 +578,9 @@ def incremental_between_timestamp(self, default=None): def scan_tag_name(self, default=None): return self.options.get(CoreOptions.SCAN_TAG_NAME, default) + def scan_snapshot_id(self, default=None): + return self.options.get(CoreOptions.SCAN_SNAPSHOT_ID, default) + def source_split_target_size(self, default=None): return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, default).get_bytes() diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py index 5ea2d21096f4..c2fcd30c41d0 100644 --- a/paimon-python/pypaimon/ray/ray_paimon.py +++ b/paimon-python/pypaimon/ray/ray_paimon.py @@ -40,6 +40,8 @@ def read_paimon( filter: Optional[Predicate] = None, projection: Optional[List[str]] = None, limit: Optional[int] = None, + snapshot_id: Optional[int] = None, + tag_name: Optional[str] = None, ray_remote_args: Optional[Dict[str, Any]] = None, concurrency: Optional[int] = None, override_num_blocks: Optional[int] = None, @@ -54,6 +56,10 @@ def read_paimon( filter: Optional predicate to push down into the scan. projection: Optional list of column names to read. limit: Optional row limit for the scan. + snapshot_id: Optional snapshot id to time-travel to. Mutually + exclusive with ``tag_name``. + tag_name: Optional tag name to time-travel to. Mutually + exclusive with ``snapshot_id``. ray_remote_args: Optional kwargs passed to ``ray.remote`` in read tasks. concurrency: Optional max number of Ray read tasks to run concurrently. override_num_blocks: Optional override for the number of output blocks. @@ -65,6 +71,11 @@ def read_paimon( from pypaimon.read.datasource.ray_datasource import RayDatasource from pypaimon.read.datasource.split_provider import CatalogSplitProvider + if snapshot_id is not None and tag_name is not None: + raise ValueError( + "snapshot_id and tag_name cannot be set at the same time" + ) + if override_num_blocks is not None and override_num_blocks < 1: raise ValueError( "override_num_blocks must be at least 1, got {}".format(override_num_blocks) @@ -77,6 +88,8 @@ def read_paimon( predicate=filter, projection=projection, limit=limit, + snapshot_id=snapshot_id, + tag_name=tag_name, ) ) return ray.data.read_datasource( diff --git a/paimon-python/pypaimon/read/datasource/split_provider.py b/paimon-python/pypaimon/read/datasource/split_provider.py index 491e8127d2f0..22297cc89a1c 100644 --- a/paimon-python/pypaimon/read/datasource/split_provider.py +++ b/paimon-python/pypaimon/read/datasource/split_provider.py @@ -74,16 +74,24 @@ def __init__( predicate=None, projection: Optional[List[str]] = None, limit: Optional[int] = None, + snapshot_id: Optional[int] = None, + tag_name: Optional[str] = None, ): if not table_identifier: raise ValueError("table_identifier is required") if catalog_options is None: raise ValueError("catalog_options is required") + if snapshot_id is not None and tag_name is not None: + raise ValueError( + "snapshot_id and tag_name cannot be set at the same time" + ) self._table_identifier = table_identifier self._catalog_options = catalog_options self._predicate = predicate self._projection = projection self._limit = limit + self._snapshot_id = snapshot_id + self._tag_name = tag_name self._table_cached = None self._splits_cached = None self._read_type_cached = None @@ -92,7 +100,15 @@ def _ensure_table(self): if self._table_cached is None: from pypaimon.catalog.catalog_factory import CatalogFactory catalog = CatalogFactory.create(self._catalog_options) - self._table_cached = catalog.get_table(self._table_identifier) + table = catalog.get_table(self._table_identifier) + travel_options = {} + if self._snapshot_id is not None: + travel_options["scan.snapshot-id"] = str(self._snapshot_id) + if self._tag_name is not None: + travel_options["scan.tag-name"] = self._tag_name + if travel_options: + table = table.copy(travel_options) + self._table_cached = table return self._table_cached def _ensure_planned(self): diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 562bea26f596..9b661b295a63 100755 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -113,6 +113,23 @@ def tag_manifest_scanner(): self.predicate, self.limit ) + elif options.contains(CoreOptions.SCAN_SNAPSHOT_ID): # Handle snapshot-id-based reading + snapshot_id = int(options.get(CoreOptions.SCAN_SNAPSHOT_ID)) + + def snapshot_id_manifest_scanner(): + snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id) + if snapshot is None: + raise ValueError( + "Snapshot id %d does not exist" % snapshot_id + ) + return manifest_list_manager.read_all(snapshot), snapshot + + return FileScanner( + self.table, + snapshot_id_manifest_scanner, + self.predicate, + self.limit + ) def all_manifests(): snapshot = snapshot_manager.get_latest_snapshot() diff --git a/paimon-python/pypaimon/snapshot/time_travel_util.py b/paimon-python/pypaimon/snapshot/time_travel_util.py index df3eeb11cb88..4f9e4aa0657d 100644 --- a/paimon-python/pypaimon/snapshot/time_travel_util.py +++ b/paimon-python/pypaimon/snapshot/time_travel_util.py @@ -25,7 +25,8 @@ from pypaimon.tag.tag_manager import TagManager SCAN_KEYS = [ - CoreOptions.SCAN_TAG_NAME.key() + CoreOptions.SCAN_TAG_NAME.key(), + CoreOptions.SCAN_SNAPSHOT_ID.key(), ] @@ -35,21 +36,25 @@ class TimeTravelUtil: @staticmethod def try_travel_to_snapshot( options: Options, - tag_manager: TagManager + tag_manager: TagManager, + snapshot_manager=None, ) -> Optional[Snapshot]: """ Try to travel to a snapshot based on the options. - + Supports the following time travel options: - scan.tag-name: Travel to a specific tag - + - scan.snapshot-id: Travel to a specific snapshot id + Args: options: The options containing time travel parameters tag_manager: The tag manager - + snapshot_manager: The snapshot manager, required when + ``scan.snapshot-id`` is set + Returns: The Snapshot to travel to, or None if no time travel option is set. - + Raises: ValueError: If more than one time travel option is set """ @@ -71,5 +76,15 @@ def try_travel_to_snapshot( if tag is None: raise ValueError(f"Tag '{tag_name}' doesn't exist.") return tag.trim_to_snapshot() + elif key == CoreOptions.SCAN_SNAPSHOT_ID.key(): + if snapshot_manager is None: + raise ValueError( + "snapshot_manager is required to resolve scan.snapshot-id" + ) + snapshot_id = int(core_options.scan_snapshot_id()) + snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id) + if snapshot is None: + raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.") + return snapshot else: raise ValueError(f"Unsupported time travel mode: {key}") diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py index 35addad2518d..d7c80b016d33 100644 --- a/paimon-python/pypaimon/table/file_store_table.py +++ b/paimon-python/pypaimon/table/file_store_table.py @@ -435,6 +435,7 @@ def _try_time_travel(self, options: Options) -> Optional[TableSchema]: Supports the following time travel options: - scan.tag-name: Travel to a specific tag + - scan.snapshot-id: Travel to a specific snapshot id Returns: The TableSchema at the time travel point, or None if no time travel option is set. @@ -442,7 +443,9 @@ def _try_time_travel(self, options: Options) -> Optional[TableSchema]: try: from pypaimon.snapshot.time_travel_util import TimeTravelUtil - snapshot = TimeTravelUtil.try_travel_to_snapshot(options, self.tag_manager()) + snapshot = TimeTravelUtil.try_travel_to_snapshot( + options, self.tag_manager(), self.snapshot_manager() + ) if snapshot is None: return None return self.schema_manager.get_schema(snapshot.schema_id).copy(new_options=options.to_map()) diff --git a/paimon-python/pypaimon/table/source/full_text_scan.py b/paimon-python/pypaimon/table/source/full_text_scan.py index 50b2381d2778..e3cf312bf47e 100644 --- a/paimon-python/pypaimon/table/source/full_text_scan.py +++ b/paimon-python/pypaimon/table/source/full_text_scan.py @@ -61,7 +61,8 @@ def scan(self) -> FullTextScanPlan: from pypaimon.common.options.options import Options travel_snapshot = TimeTravelUtil.try_travel_to_snapshot( Options(self._table.table_schema.options), - self._table.tag_manager() + self._table.tag_manager(), + self._table.snapshot_manager(), ) if travel_snapshot is not None: snapshot = travel_snapshot diff --git a/paimon-python/pypaimon/table/source/vector_search_scan.py b/paimon-python/pypaimon/table/source/vector_search_scan.py index 035e51a846cd..0285e3374e80 100644 --- a/paimon-python/pypaimon/table/source/vector_search_scan.py +++ b/paimon-python/pypaimon/table/source/vector_search_scan.py @@ -77,7 +77,8 @@ def scan(self): snapshot = TimeTravelUtil.try_travel_to_snapshot( Options(self._table.table_schema.options), - self._table.tag_manager() + self._table.tag_manager(), + self._table.snapshot_manager(), ) if snapshot is None: snapshot = self._table.snapshot_manager().get_latest_snapshot() diff --git a/paimon-python/pypaimon/tests/ray_integration_test.py b/paimon-python/pypaimon/tests/ray_integration_test.py index 1b8e2df5057d..4d9613c4994d 100644 --- a/paimon-python/pypaimon/tests/ray_integration_test.py +++ b/paimon-python/pypaimon/tests/ray_integration_test.py @@ -196,6 +196,65 @@ def test_read_paimon_empty_table(self): ds = read_paimon(identifier, self.catalog_options) self.assertEqual(ds.count(), 0) + def test_read_paimon_with_snapshot_id(self): + """read_paimon(snapshot_id=N) time-travels to that snapshot.""" + from pypaimon.ray import read_paimon + + pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())]) + identifier = 'default.test_read_snap_id' + catalog = CatalogFactory.create(self.catalog_options) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table(identifier, schema, False) + table = catalog.get_table(identifier) + for batch in [{'id': [1], 'name': ['a']}, {'id': [2], 'name': ['b']}]: + wb = table.new_batch_write_builder() + writer = wb.new_write() + writer.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema)) + wb.new_commit().commit(writer.prepare_commit()) + writer.close() + + ds_latest = read_paimon(identifier, self.catalog_options) + self.assertEqual(ds_latest.count(), 2) + + ds_snap1 = read_paimon(identifier, self.catalog_options, snapshot_id=1) + self.assertEqual(ds_snap1.count(), 1) + self.assertEqual(ds_snap1.to_pandas()['id'].tolist(), [1]) + + def test_read_paimon_with_tag_name(self): + """read_paimon(tag_name=...) time-travels to a tagged snapshot.""" + from pypaimon.ray import read_paimon + + pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())]) + identifier = 'default.test_read_tag_name' + catalog = CatalogFactory.create(self.catalog_options) + schema = Schema.from_pyarrow_schema(pa_schema) + catalog.create_table(identifier, schema, False) + table = catalog.get_table(identifier) + wb = table.new_batch_write_builder() + writer = wb.new_write() + writer.write_arrow(pa.Table.from_pydict({'id': [1], 'name': ['a']}, schema=pa_schema)) + wb.new_commit().commit(writer.prepare_commit()) + writer.close() + table.create_tag('v1') + wb = table.new_batch_write_builder() + writer = wb.new_write() + writer.write_arrow(pa.Table.from_pydict({'id': [2], 'name': ['b']}, schema=pa_schema)) + wb.new_commit().commit(writer.prepare_commit()) + writer.close() + + ds_tag = read_paimon(identifier, self.catalog_options, tag_name='v1') + self.assertEqual(ds_tag.count(), 1) + self.assertEqual(ds_tag.to_pandas()['id'].tolist(), [1]) + + def test_read_paimon_rejects_snapshot_id_and_tag_name_together(self): + from pypaimon.ray import read_paimon + + with self.assertRaises(ValueError): + read_paimon( + 'default.dummy', self.catalog_options, + snapshot_id=1, tag_name='v1', + ) + def test_write_paimon_basic(self): """write_paimon() writes data that read_paimon() can round-trip.""" from pypaimon.ray import read_paimon, write_paimon diff --git a/paimon-python/pypaimon/tests/split_provider_test.py b/paimon-python/pypaimon/tests/split_provider_test.py index 31152f28a6d1..05bedd2f4173 100644 --- a/paimon-python/pypaimon/tests/split_provider_test.py +++ b/paimon-python/pypaimon/tests/split_provider_test.py @@ -156,6 +156,78 @@ def test_catalog_provider_requires_identifier_and_options(self): table_identifier=self.identifier, catalog_options=None ) + def test_catalog_provider_rejects_snapshot_id_and_tag_name_together(self): + with self.assertRaises(ValueError): + CatalogSplitProvider( + table_identifier=self.identifier, + catalog_options=self.catalog_options, + snapshot_id=1, + tag_name='v1', + ) + + def test_catalog_provider_time_travel_by_snapshot_id(self): + """Two commits → snapshot_id=1 sees only the first commit's rows.""" + pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())]) + identifier = 'default.split_provider_snap_id' + schema = Schema.from_pyarrow_schema(pa_schema) + catalog = CatalogFactory.create(self.catalog_options) + catalog.create_table(identifier, schema, False) + table = catalog.get_table(identifier) + for batch in [{'id': [10], 'name': ['a']}, {'id': [20], 'name': ['b']}]: + wb = table.new_batch_write_builder() + writer = wb.new_write() + writer.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema)) + wb.new_commit().commit(writer.prepare_commit()) + writer.close() + + provider = CatalogSplitProvider( + table_identifier=identifier, + catalog_options=self.catalog_options, + snapshot_id=1, + ) + from pypaimon.read.table_read import TableRead + tr = TableRead( + provider.table(), + predicate=None, + read_type=provider.read_type(), + ) + rows = tr.to_arrow(provider.splits()).to_pylist() + self.assertEqual([r['id'] for r in rows], [10]) + + def test_catalog_provider_time_travel_by_tag_name(self): + """Tag captures snapshot 1; reading via tag returns only that snapshot's rows.""" + pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())]) + identifier = 'default.split_provider_tag_name' + schema = Schema.from_pyarrow_schema(pa_schema) + catalog = CatalogFactory.create(self.catalog_options) + catalog.create_table(identifier, schema, False) + table = catalog.get_table(identifier) + wb = table.new_batch_write_builder() + writer = wb.new_write() + writer.write_arrow(pa.Table.from_pydict({'id': [11], 'name': ['x']}, schema=pa_schema)) + wb.new_commit().commit(writer.prepare_commit()) + writer.close() + table.create_tag('v1') + wb = table.new_batch_write_builder() + writer = wb.new_write() + writer.write_arrow(pa.Table.from_pydict({'id': [22], 'name': ['y']}, schema=pa_schema)) + wb.new_commit().commit(writer.prepare_commit()) + writer.close() + + provider = CatalogSplitProvider( + table_identifier=identifier, + catalog_options=self.catalog_options, + tag_name='v1', + ) + from pypaimon.read.table_read import TableRead + tr = TableRead( + provider.table(), + predicate=None, + read_type=provider.read_type(), + ) + rows = tr.to_arrow(provider.splits()).to_pylist() + self.assertEqual([r['id'] for r in rows], [11]) + def test_pre_resolved_provider_returns_inputs(self): """PreResolvedSplitProvider just hands back what it was given.""" catalog = CatalogFactory.create(self.catalog_options) diff --git a/paimon-python/pypaimon/tests/time_travel_util_test.py b/paimon-python/pypaimon/tests/time_travel_util_test.py new file mode 100644 index 000000000000..55bbd18ce5a3 --- /dev/null +++ b/paimon-python/pypaimon/tests/time_travel_util_test.py @@ -0,0 +1,98 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import unittest + +from pypaimon.common.options.options import Options +from pypaimon.snapshot.time_travel_util import SCAN_KEYS, TimeTravelUtil + + +class _StubSnapshot: + def __init__(self, snapshot_id, schema_id=0): + self.id = snapshot_id + self.schema_id = schema_id + + +class _StubSnapshotManager: + def __init__(self, snapshots): + self._snapshots = {s.id: s for s in snapshots} + + def get_snapshot_by_id(self, snapshot_id): + return self._snapshots.get(snapshot_id) + + +class _StubTagManager: + def __init__(self, tags): + self._tags = tags + + def get(self, name): + return self._tags.get(name) + + +class TimeTravelUtilTest(unittest.TestCase): + + def test_returns_none_when_no_scan_option_set(self): + result = TimeTravelUtil.try_travel_to_snapshot( + Options({}), _StubTagManager({}), _StubSnapshotManager([]) + ) + self.assertIsNone(result) + + def test_resolves_snapshot_id(self): + snap2 = _StubSnapshot(2) + result = TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.snapshot-id': '2'}), + _StubTagManager({}), + _StubSnapshotManager([_StubSnapshot(1), snap2]), + ) + self.assertIs(result, snap2) + + def test_unknown_snapshot_id_raises(self): + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.snapshot-id': '99'}), + _StubTagManager({}), + _StubSnapshotManager([_StubSnapshot(1)]), + ) + + def test_snapshot_id_without_snapshot_manager_raises(self): + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.snapshot-id': '1'}), + _StubTagManager({}), + None, + ) + + def test_rejects_setting_snapshot_id_and_tag_name_together(self): + # Defence-in-depth: even if a caller bypasses read_paimon's check, + # the util-level guard still complains. + with self.assertRaises(ValueError): + TimeTravelUtil.try_travel_to_snapshot( + Options({'scan.snapshot-id': '1', 'scan.tag-name': 'v1'}), + _StubTagManager({}), + _StubSnapshotManager([_StubSnapshot(1)]), + ) + + def test_scan_keys_contains_both_options(self): + # Sanity check: SCAN_KEYS must enumerate both time-travel modes, + # otherwise the mutual-exclusion guard above would not trigger. + self.assertIn('scan.snapshot-id', SCAN_KEYS) + self.assertIn('scan.tag-name', SCAN_KEYS) + + +if __name__ == '__main__': + unittest.main()