Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/content/pypaimon/ray-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,37 @@ ray_dataset = read_paimon(
)
```

**Time travel:**

```python
# Read a specific snapshot.
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
snapshot_id=42,
)

# Read a tagged snapshot.
ray_dataset = read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
tag_name="release-2026-04",
)
```

`snapshot_id` and `tag_name` are mutually exclusive.

**Parameters:**
- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
- `catalog_options`: kwargs forwarded to `CatalogFactory.create()`,
e.g. `{"warehouse": "/path/to/warehouse"}`.
- `filter`: optional `Predicate` to push down into the scan.
- `projection`: optional list of column names to read.
- `limit`: optional row limit applied at scan planning time.
- `snapshot_id`: optional snapshot id to time-travel to. Mutually
exclusive with `tag_name`.
- `tag_name`: optional tag name to time-travel to. Mutually
exclusive with `snapshot_id`.
- `override_num_blocks`: optional override for the number of output blocks.
Must be `>= 1`.
- `ray_remote_args`: optional kwargs passed to `ray.remote()` in read tasks
Expand Down
13 changes: 13 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ class CoreOptions:
.with_description("Optional tag name used in case of 'from-snapshot' scan mode.")
)

SCAN_SNAPSHOT_ID: ConfigOption[int] = (
ConfigOptions.key("scan.snapshot-id")
.long_type()
.no_default_value()
.with_description(
"Optional snapshot id used in case of 'from-snapshot' or "
"'from-snapshot-full' scan mode."
)
)

SOURCE_SPLIT_TARGET_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("source.split.target-size")
.memory_type()
Expand Down Expand Up @@ -568,6 +578,9 @@ def incremental_between_timestamp(self, default=None):
def scan_tag_name(self, default=None):
return self.options.get(CoreOptions.SCAN_TAG_NAME, default)

def scan_snapshot_id(self, default=None):
return self.options.get(CoreOptions.SCAN_SNAPSHOT_ID, default)

def source_split_target_size(self, default=None):
return self.options.get(CoreOptions.SOURCE_SPLIT_TARGET_SIZE, default).get_bytes()

Expand Down
13 changes: 13 additions & 0 deletions paimon-python/pypaimon/ray/ray_paimon.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def read_paimon(
filter: Optional[Predicate] = None,
projection: Optional[List[str]] = None,
limit: Optional[int] = None,
snapshot_id: Optional[int] = None,
tag_name: Optional[str] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
Expand All @@ -54,6 +56,10 @@ def read_paimon(
filter: Optional predicate to push down into the scan.
projection: Optional list of column names to read.
limit: Optional row limit for the scan.
snapshot_id: Optional snapshot id to time-travel to. Mutually
exclusive with ``tag_name``.
tag_name: Optional tag name to time-travel to. Mutually
exclusive with ``snapshot_id``.
ray_remote_args: Optional kwargs passed to ``ray.remote`` in read tasks.
concurrency: Optional max number of Ray read tasks to run concurrently.
override_num_blocks: Optional override for the number of output blocks.
Expand All @@ -65,6 +71,11 @@ def read_paimon(
from pypaimon.read.datasource.ray_datasource import RayDatasource
from pypaimon.read.datasource.split_provider import CatalogSplitProvider

if snapshot_id is not None and tag_name is not None:
raise ValueError(
"snapshot_id and tag_name cannot be set at the same time"
)

if override_num_blocks is not None and override_num_blocks < 1:
raise ValueError(
"override_num_blocks must be at least 1, got {}".format(override_num_blocks)
Expand All @@ -77,6 +88,8 @@ def read_paimon(
predicate=filter,
projection=projection,
limit=limit,
snapshot_id=snapshot_id,
tag_name=tag_name,
)
)
return ray.data.read_datasource(
Expand Down
18 changes: 17 additions & 1 deletion paimon-python/pypaimon/read/datasource/split_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,24 @@ def __init__(
predicate=None,
projection: Optional[List[str]] = None,
limit: Optional[int] = None,
snapshot_id: Optional[int] = None,
tag_name: Optional[str] = None,
):
if not table_identifier:
raise ValueError("table_identifier is required")
if catalog_options is None:
raise ValueError("catalog_options is required")
if snapshot_id is not None and tag_name is not None:
raise ValueError(
"snapshot_id and tag_name cannot be set at the same time"
)
self._table_identifier = table_identifier
self._catalog_options = catalog_options
self._predicate = predicate
self._projection = projection
self._limit = limit
self._snapshot_id = snapshot_id
self._tag_name = tag_name
self._table_cached = None
self._splits_cached = None
self._read_type_cached = None
Expand All @@ -92,7 +100,15 @@ def _ensure_table(self):
if self._table_cached is None:
from pypaimon.catalog.catalog_factory import CatalogFactory
catalog = CatalogFactory.create(self._catalog_options)
self._table_cached = catalog.get_table(self._table_identifier)
table = catalog.get_table(self._table_identifier)
travel_options = {}
if self._snapshot_id is not None:
travel_options["scan.snapshot-id"] = str(self._snapshot_id)
if self._tag_name is not None:
travel_options["scan.tag-name"] = self._tag_name
if travel_options:
table = table.copy(travel_options)
self._table_cached = table
return self._table_cached

def _ensure_planned(self):
Expand Down
17 changes: 17 additions & 0 deletions paimon-python/pypaimon/read/table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ def tag_manifest_scanner():
self.predicate,
self.limit
)
elif options.contains(CoreOptions.SCAN_SNAPSHOT_ID): # Handle snapshot-id-based reading
snapshot_id = int(options.get(CoreOptions.SCAN_SNAPSHOT_ID))

def snapshot_id_manifest_scanner():
snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
if snapshot is None:
raise ValueError(
"Snapshot id %d does not exist" % snapshot_id
)
return manifest_list_manager.read_all(snapshot), snapshot

return FileScanner(
self.table,
snapshot_id_manifest_scanner,
self.predicate,
self.limit
)

def all_manifests():
snapshot = snapshot_manager.get_latest_snapshot()
Expand Down
27 changes: 21 additions & 6 deletions paimon-python/pypaimon/snapshot/time_travel_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from pypaimon.tag.tag_manager import TagManager

SCAN_KEYS = [
CoreOptions.SCAN_TAG_NAME.key()
CoreOptions.SCAN_TAG_NAME.key(),
CoreOptions.SCAN_SNAPSHOT_ID.key(),
]


Expand All @@ -35,21 +36,25 @@ class TimeTravelUtil:
@staticmethod
def try_travel_to_snapshot(
options: Options,
tag_manager: TagManager
tag_manager: TagManager,
snapshot_manager=None,
) -> Optional[Snapshot]:
"""
Try to travel to a snapshot based on the options.

Supports the following time travel options:
- scan.tag-name: Travel to a specific tag

- scan.snapshot-id: Travel to a specific snapshot id

Args:
options: The options containing time travel parameters
tag_manager: The tag manager

snapshot_manager: The snapshot manager, required when
``scan.snapshot-id`` is set

Returns:
The Snapshot to travel to, or None if no time travel option is set.

Raises:
ValueError: If more than one time travel option is set
"""
Expand All @@ -71,5 +76,15 @@ def try_travel_to_snapshot(
if tag is None:
raise ValueError(f"Tag '{tag_name}' doesn't exist.")
return tag.trim_to_snapshot()
elif key == CoreOptions.SCAN_SNAPSHOT_ID.key():
if snapshot_manager is None:
raise ValueError(
"snapshot_manager is required to resolve scan.snapshot-id"
)
snapshot_id = int(core_options.scan_snapshot_id())
snapshot = snapshot_manager.get_snapshot_by_id(snapshot_id)
if snapshot is None:
raise ValueError(f"Snapshot id '{snapshot_id}' doesn't exist.")
return snapshot
else:
raise ValueError(f"Unsupported time travel mode: {key}")
5 changes: 4 additions & 1 deletion paimon-python/pypaimon/table/file_store_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,14 +435,17 @@ def _try_time_travel(self, options: Options) -> Optional[TableSchema]:

Supports the following time travel options:
- scan.tag-name: Travel to a specific tag
- scan.snapshot-id: Travel to a specific snapshot id

Returns:
The TableSchema at the time travel point, or None if no time travel option is set.
"""

try:
from pypaimon.snapshot.time_travel_util import TimeTravelUtil
snapshot = TimeTravelUtil.try_travel_to_snapshot(options, self.tag_manager())
snapshot = TimeTravelUtil.try_travel_to_snapshot(
options, self.tag_manager(), self.snapshot_manager()
)
if snapshot is None:
return None
return self.schema_manager.get_schema(snapshot.schema_id).copy(new_options=options.to_map())
Expand Down
3 changes: 2 additions & 1 deletion paimon-python/pypaimon/table/source/full_text_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def scan(self) -> FullTextScanPlan:
from pypaimon.common.options.options import Options
travel_snapshot = TimeTravelUtil.try_travel_to_snapshot(
Options(self._table.table_schema.options),
self._table.tag_manager()
self._table.tag_manager(),
self._table.snapshot_manager(),
)
if travel_snapshot is not None:
snapshot = travel_snapshot
Expand Down
3 changes: 2 additions & 1 deletion paimon-python/pypaimon/table/source/vector_search_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ def scan(self):

snapshot = TimeTravelUtil.try_travel_to_snapshot(
Options(self._table.table_schema.options),
self._table.tag_manager()
self._table.tag_manager(),
self._table.snapshot_manager(),
)
if snapshot is None:
snapshot = self._table.snapshot_manager().get_latest_snapshot()
Expand Down
59 changes: 59 additions & 0 deletions paimon-python/pypaimon/tests/ray_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,65 @@ def test_read_paimon_empty_table(self):
ds = read_paimon(identifier, self.catalog_options)
self.assertEqual(ds.count(), 0)

def test_read_paimon_with_snapshot_id(self):
"""read_paimon(snapshot_id=N) time-travels to that snapshot."""
from pypaimon.ray import read_paimon

pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
identifier = 'default.test_read_snap_id'
catalog = CatalogFactory.create(self.catalog_options)
schema = Schema.from_pyarrow_schema(pa_schema)
catalog.create_table(identifier, schema, False)
table = catalog.get_table(identifier)
for batch in [{'id': [1], 'name': ['a']}, {'id': [2], 'name': ['b']}]:
wb = table.new_batch_write_builder()
writer = wb.new_write()
writer.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
wb.new_commit().commit(writer.prepare_commit())
writer.close()

ds_latest = read_paimon(identifier, self.catalog_options)
self.assertEqual(ds_latest.count(), 2)

ds_snap1 = read_paimon(identifier, self.catalog_options, snapshot_id=1)
self.assertEqual(ds_snap1.count(), 1)
self.assertEqual(ds_snap1.to_pandas()['id'].tolist(), [1])

def test_read_paimon_with_tag_name(self):
"""read_paimon(tag_name=...) time-travels to a tagged snapshot."""
from pypaimon.ray import read_paimon

pa_schema = pa.schema([('id', pa.int32()), ('name', pa.string())])
identifier = 'default.test_read_tag_name'
catalog = CatalogFactory.create(self.catalog_options)
schema = Schema.from_pyarrow_schema(pa_schema)
catalog.create_table(identifier, schema, False)
table = catalog.get_table(identifier)
wb = table.new_batch_write_builder()
writer = wb.new_write()
writer.write_arrow(pa.Table.from_pydict({'id': [1], 'name': ['a']}, schema=pa_schema))
wb.new_commit().commit(writer.prepare_commit())
writer.close()
table.create_tag('v1')
wb = table.new_batch_write_builder()
writer = wb.new_write()
writer.write_arrow(pa.Table.from_pydict({'id': [2], 'name': ['b']}, schema=pa_schema))
wb.new_commit().commit(writer.prepare_commit())
writer.close()

ds_tag = read_paimon(identifier, self.catalog_options, tag_name='v1')
self.assertEqual(ds_tag.count(), 1)
self.assertEqual(ds_tag.to_pandas()['id'].tolist(), [1])

def test_read_paimon_rejects_snapshot_id_and_tag_name_together(self):
from pypaimon.ray import read_paimon

with self.assertRaises(ValueError):
read_paimon(
'default.dummy', self.catalog_options,
snapshot_id=1, tag_name='v1',
)

def test_write_paimon_basic(self):
"""write_paimon() writes data that read_paimon() can round-trip."""
from pypaimon.ray import read_paimon, write_paimon
Expand Down
Loading
Loading