137 changes: 137 additions & 0 deletions paimon-python/pypaimon/read/reader/outer_projection_record_reader.py
@@ -0,0 +1,137 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""Outer-projection wrapper for nested-field reads.

Sits above a reader whose rows still carry full ROW sub-structures, and
emits flat rows whose slots are the values reached by walking each
nested name path. Used on the primary-key merge-read path: the inner
reader hands the merge function complete ROW columns (so deduplicate /
partial-update / aggregation see the original sub-structure), and this
wrapper extracts the user-visible flat columns afterwards.
"""

from typing import Any, List, Optional

from pypaimon.read.reader.iface.record_iterator import RecordIterator
from pypaimon.read.reader.iface.record_reader import RecordReader
from pypaimon.table.row.internal_row import InternalRow
from pypaimon.table.row.offset_row import OffsetRow


class OuterProjectionRecordReader(RecordReader[InternalRow]):
"""Wraps an InternalRow reader and projects nested name paths into flat rows."""

def __init__(
self,
inner: RecordReader[InternalRow],
inner_top_names: List[str],
name_paths: List[List[str]],
):
if not name_paths:
raise ValueError("name_paths must be non-empty")
for path in name_paths:
if not path:
raise ValueError("each name path must contain at least one name")
name_to_top_idx = {name: i for i, name in enumerate(inner_top_names)}
self._specs: List[_PathSpec] = []
for path in name_paths:
top_name = path[0]
if top_name not in name_to_top_idx:
raise ValueError(
"path top-level field %r not found in inner row schema %r"
% (top_name, inner_top_names))
self._specs.append(_PathSpec(name_to_top_idx[top_name], list(path[1:])))
self._inner = inner
self._flat_arity = len(name_paths)

def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
inner_batch = self._inner.read_batch()
if inner_batch is None:
return None
return _OuterProjectionIterator(inner_batch, self._specs, self._flat_arity)

def close(self) -> None:
self._inner.close()


class _OuterProjectionIterator(RecordIterator[InternalRow]):
"""Per-batch iterator that materialises one flat OffsetRow per inner row."""

def __init__(
self,
inner: RecordIterator[InternalRow],
specs: List["_PathSpec"],
flat_arity: int,
):
self._inner = inner
self._specs = specs
self._flat_arity = flat_arity
self._reused_row = OffsetRow(None, 0, flat_arity)

def next(self) -> Optional[InternalRow]:
inner_row = self._inner.next()
if inner_row is None:
return None
flat = tuple(_extract(inner_row, spec) for spec in self._specs)
self._reused_row.replace(flat)
# Inherit the inner row's RowKind so downstream consumers (e.g. the
# to_arrow path) keep the same +I/-D/-U/+U classification.
self._reused_row.set_row_kind_byte(inner_row.get_row_kind().value)
return self._reused_row


class _PathSpec:
"""Pre-resolved name path: top-level slot index plus sub-field names."""

__slots__ = ("top_idx", "sub_names")

def __init__(self, top_idx: int, sub_names: List[str]):
self.top_idx = top_idx
self.sub_names = sub_names


def _extract(row: InternalRow, spec: _PathSpec) -> Any:
cur = row.get_field(spec.top_idx)
for name in spec.sub_names:
if cur is None:
return None
cur = _step_into(cur, name)
return cur


def _step_into(value: Any, name: str) -> Any:
"""Take one step into a ROW sub-structure by sub-field name.

Upstream materialises nested ROW values as plain Python dicts (e.g.
polars row-by-row iteration produces a dict for each struct slot),
so dict access is the only supported form here. Anything else is
rejected loudly to surface schema/wiring mismatches early.
"""
if isinstance(value, dict):
return value.get(name)
if isinstance(value, InternalRow):
# Defensive: if the upstream reader handed us a wrapped sub-row,
# we cannot index it by name without its schema, so fail fast
# rather than guessing the slot.
raise TypeError(
"Cannot step into InternalRow by name %r without sub-schema; "
"expected a dict from the polars row materialisation" % (name,))
raise TypeError(
"Cannot index nested ROW step %r into value of type %s"
% (name, type(value).__name__))
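
A minimal usage sketch of the new wrapper in isolation. The stub classes are hypothetical test doubles, not the real pypaimon interfaces; they implement only the methods the wrapper actually calls, and each nested ROW slot is a plain dict, matching the polars materialisation the docstring describes. It also assumes ``OffsetRow.get_field`` reads the replaced tuple and that row-kind byte 0 means +I:

from pypaimon.read.reader.outer_projection_record_reader import \
    OuterProjectionRecordReader


class _StubRow:
    # Stands in for an InternalRow: positional slots plus a row kind.
    def __init__(self, values, kind):
        self._values, self._kind = values, kind

    def get_field(self, i):
        return self._values[i]

    def get_row_kind(self):
        return self._kind


class _StubKind:
    value = 0  # assumed byte for +I / INSERT


class _StubIterator:
    def __init__(self, rows):
        self._it = iter(rows)

    def next(self):
        return next(self._it, None)


class _StubReader:
    # Hands out a single batch, then reports exhaustion.
    def __init__(self, rows):
        self._rows, self._done = rows, False

    def read_batch(self):
        if self._done:
            return None
        self._done = True
        return _StubIterator(self._rows)

    def close(self):
        pass


rows = [_StubRow(({'latest_version': 100, 'latest_value': 'a'}, 1), _StubKind())]
reader = OuterProjectionRecordReader(
    _StubReader(rows),
    inner_top_names=['mv', 'id'],
    name_paths=[['mv', 'latest_version'], ['id']])
batch = reader.read_batch()
row = batch.next()
assert (row.get_field(0), row.get_field(1)) == (100, 1)  # flat slots
reader.close()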
44 changes: 39 additions & 5 deletions paimon-python/pypaimon/read/split_read.py
@@ -189,6 +189,15 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
nested_path_by_name = self._nested_path_by_name()
has_nested = nested_path_by_name is not None

# Cover both the merge-internal aliases (``_KEY_id``) and the
# bare user-facing PK name (``id``) the file actually stores.
name_to_field: Dict[str, DataField] = {f.name: f for f in self.read_fields}
_, _trimmed_lookup_fields = self._get_trimmed_fields(
self._get_read_data_fields(), self._get_all_data_fields()
)
for f in _trimmed_lookup_fields:
name_to_field.setdefault(f.name, f)
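        # e.g. for primary key `id`, the merge read may request the alias
        # `_KEY_id` while the data file's column is named `id`; both must
        # resolve here or the ordered-field lookup below drops the column.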

format_reader: RecordBatchReader
if file_format == CoreOptions.FILE_FORMAT_AVRO:
avro_nested_paths = (
@@ -210,7 +219,6 @@
if has_nested:
raise NotImplementedError(
"Nested-field projection is not supported on Lance files")
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
format_reader = FormatLanceReader(self.table.file_io, file_path, ordered_read_fields,
read_arrow_predicate, batch_size=batch_size,
@@ -219,15 +227,13 @@
if has_nested:
raise NotImplementedError(
"Nested-field projection is not supported on Vortex files")
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
predicate_fields = _get_all_fields(self.push_down_predicate) if self.push_down_predicate else set()
format_reader = FormatVortexReader(self.table.file_io, file_path, ordered_read_fields,
read_arrow_predicate, batch_size=batch_size,
row_indices=row_indices,
predicate_fields=predicate_fields)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
ordered_nested_paths = (
[nested_path_by_name[f.name] for f in ordered_read_fields]
@@ -560,6 +566,27 @@ def _get_all_data_fields(self):


class MergeFileSplitRead(SplitRead):
def __init__(
self,
table,
predicate: Optional[Predicate],
read_type: List[DataField],
split: Split,
row_tracking_enabled: bool,
outer_extract_name_paths: Optional[List[List[str]]] = None):
# Merge functions need full ROW sub-structures, so nested paths
# are not pushed down here; sub-path extraction happens above
# the merge via OuterProjectionRecordReader.
super().__init__(
table=table,
predicate=predicate,
read_type=read_type,
split=split,
row_tracking_enabled=row_tracking_enabled,
nested_name_paths=None,
)
self.outer_extract_name_paths = outer_extract_name_paths

def kv_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> RecordReader:
file_batch_reader = self.file_reader_supplier(file, True, self._get_final_read_data_fields(), False)
dv = dv_factory() if dv_factory else None
@@ -591,9 +618,16 @@ def create_reader(self) -> RecordReader:
concat_reader = ConcatRecordReader(section_readers)
kv_unwrap_reader = KeyValueUnwrapRecordReader(DropDeleteRecordReader(concat_reader))
if self.predicate_for_reader:
return FilterRecordReader(kv_unwrap_reader, self.predicate_for_reader)
reader = FilterRecordReader(kv_unwrap_reader, self.predicate_for_reader)
else:
return kv_unwrap_reader
reader = kv_unwrap_reader
if self.outer_extract_name_paths:
from pypaimon.read.reader.outer_projection_record_reader import \
OuterProjectionRecordReader
inner_top_names = [f.name for f in self.read_fields[-self.value_arity:]]
reader = OuterProjectionRecordReader(
reader, inner_top_names, self.outer_extract_name_paths)
return reader

def _get_all_data_fields(self):
return self._create_key_value_fields(self.table.fields)
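
A small sketch of the ``inner_top_names`` computation in ``create_reader`` above. It assumes the standard Paimon KeyValue layout for a merge read's field list (key fields first, then the ``_SEQUENCE_NUMBER``/``_VALUE_KIND`` system fields, then the value fields); the concrete names are illustrative only:

# Hypothetical merge-read field list for a table with primary key `id`
# and value columns `mv`, `val`.
read_field_names = ['_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', 'mv', 'val']
value_arity = 2

# The last `value_arity` names are the user-visible value columns, which
# become the top-level schema the outer projection walks into.
inner_top_names = read_field_names[-value_arity:]
assert inner_top_names == ['mv', 'val']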
32 changes: 27 additions & 5 deletions paimon-python/pypaimon/read/table_read.py
@@ -270,17 +270,21 @@ def to_torch(

def _create_split_read(self, split: Split) -> SplitRead:
if self.table.is_primary_key_table and not split.raw_convertible:
inner_read_type = self.read_type
outer_extract_name_paths: Optional[List[List[str]]] = None
if self.nested_name_paths and any(
len(p) > 1 for p in self.nested_name_paths):
raise NotImplementedError(
"Nested-field projection on primary-key tables that "
"require a merge read is not yet supported")
# Inner: full ROW for the merge function. Outer: extract
# the requested sub-paths back to the user's flat schema.
inner_read_type = self._widen_to_top_level_for_merge()
outer_extract_name_paths = self.nested_name_paths
return MergeFileSplitRead(
table=self.table,
predicate=self.predicate,
read_type=self.read_type,
read_type=inner_read_type,
split=split,
row_tracking_enabled=False
row_tracking_enabled=False,
outer_extract_name_paths=outer_extract_name_paths,
)
elif self.table.options.data_evolution_enabled():
if self.nested_name_paths and any(
@@ -306,6 +310,24 @@ def _create_split_read(self, split: Split) -> SplitRead:
nested_name_paths=self.nested_name_paths,
)

def _widen_to_top_level_for_merge(self) -> List[DataField]:
"""Unique top-level fields from ``self.nested_name_paths``, in path order."""
table_fields_by_name = {f.name: f for f in self.table.fields}
seen = set()
widened: List[DataField] = []
for path in self.nested_name_paths or []:
top_name = path[0]
if top_name in seen:
continue
seen.add(top_name)
field = table_fields_by_name.get(top_name)
if field is None:
raise ValueError(
"Nested projection top-level field %r not found in "
"table schema" % (top_name,))
widened.append(field)
return widened

@staticmethod
def convert_rows_to_arrow_batch(row_tuples: List[tuple], schema: pyarrow.Schema) -> pyarrow.RecordBatch:
columns_data = zip(*row_tuples)
24 changes: 24 additions & 0 deletions paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -270,6 +270,30 @@ def test_pk_reader_with_projection(self):
expected = self.expected.select(['dt', 'user_id', 'behavior'])
self.assertEqual(actual, expected)

def test_pk_reader_with_projection_excluding_pk(self):
        # Two commits force the split through the merge path. The merge
        # reader still needs the PK column to assemble its key even though
        # the user-visible projection drops it; this is a regression test
        # for the bug where narrowing to value-only fields broke the file
        # column lookup.
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
primary_keys=['user_id', 'dt'],
options={'bucket': '2'})
self.catalog.create_table('default.test_pk_projection_no_pk', schema, False)
table = self.catalog.get_table('default.test_pk_projection_no_pk')
self._write_test_table(table)

read_builder = table.new_read_builder().with_projection(['behavior'])
actual = self._read_test_table(read_builder)
expected = self.expected.select(['behavior'])
# Projection drops PKs so we can only compare bag semantics.
self.assertEqual(
sorted([r['behavior'] for r in actual.to_pylist()],
key=lambda v: '' if v is None else v),
sorted([r['behavior'] for r in expected.to_pylist()],
key=lambda v: '' if v is None else v))

def test_pk_reader_with_limit(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
63 changes: 47 additions & 16 deletions paimon-python/pypaimon/tests/test_nested_projection_e2e.py
@@ -175,35 +175,66 @@ def test_avro_top_level_projection_unchanged(self):
{'val': 'y', 'id': 2},
{'val': 'z', 'id': 3}])

def test_pk_table_merge_split_with_nested_projection_raises(self):
# Phase 2b lands the append-only path only; PK + nested needs an
# outer-projection wrapper that ships in a follow-up commit. Until
# then, the call must refuse loudly rather than silently corrupt
# the merge function input. Two commits on the same PK force the
# split out of the raw-convertible fast path into the merge
# reader, which is where the guard lives.
identifier = 'default.pk_nested_unsupported'

class PrimaryKeyNestedTest(_AppendOnlyNestedBase):
"""PK tables go through the merge reader once a split is no longer
raw-convertible (multiple overlapping commits on the same key). The
    merge function still needs full ROW sub-structures, so the read is
    split into an inner full-ROW read for the merge and an outer flat
    sub-path extraction via OuterProjectionRecordReader."""

def _create_pk_table(self, name: str, file_format: str = 'parquet'):
identifier = 'default.{}'.format(name)
schema = Schema.from_pyarrow_schema(
self.pa_schema,
primary_keys=['id'],
options={'bucket': '1', 'file.format': 'parquet'},
options={'bucket': '1', 'file.format': file_format},
)
self.catalog.create_table(identifier, schema, False)
table = self.catalog.get_table(identifier)
for batch in (self.rows, self.rows): # two overlapping commits
# Two overlapping commits force the split off the raw-convertible
# fast path into the merge reader.
for batch in (self.rows, self.rows):
wb = table.new_batch_write_builder()
w = wb.new_write()
w.write_arrow(pa.Table.from_pylist(batch, schema=self.pa_schema))
wb.new_commit().commit(w.prepare_commit())
w.close()
return table

rb = table.new_read_builder().with_projection(['mv.latest_version'])
def _read_arrow(self, table, projection):
rb = table.new_read_builder().with_projection(projection)
splits = rb.new_scan().plan().splits()
# ``to_arrow`` materialises the split read; the merge path is what
# raises, so do it eagerly here rather than waiting for the first
# batch.
with self.assertRaises(NotImplementedError):
rb.new_read().to_arrow(splits)
return rb.new_read().to_arrow(splits)

def test_extracts_single_nested_leaf(self):
table = self._create_pk_table('pk_nested_single')
arrow = self._read_arrow(table, ['mv.latest_version'])
self.assertEqual(arrow.column_names, ['mv_latest_version'])
versions = sorted(arrow.column('mv_latest_version').to_pylist())
self.assertEqual(versions, [100, 200, 300])

def test_multiple_sub_paths_under_same_struct(self):
table = self._create_pk_table('pk_nested_double')
arrow = self._read_arrow(
table, ['mv.latest_version', 'mv.latest_value'])
self.assertEqual(arrow.column_names, ['mv_latest_version', 'mv_latest_value'])
pairs = sorted(zip(
arrow.column('mv_latest_version').to_pylist(),
arrow.column('mv_latest_value').to_pylist()))
self.assertEqual(pairs, [(100, 'a'), (200, 'b'), (300, 'c')])

def test_mixed_nested_and_top_level_preserves_order(self):
table = self._create_pk_table('pk_nested_mixed')
arrow = self._read_arrow(
table, ['id', 'mv.latest_version', 'val'])
self.assertEqual(
arrow.column_names, ['id', 'mv_latest_version', 'val'])
rows = sorted(zip(
arrow.column('id').to_pylist(),
arrow.column('mv_latest_version').to_pylist(),
arrow.column('val').to_pylist()))
self.assertEqual(rows, [(1, 100, 'x'), (2, 200, 'y'), (3, 300, 'z')])


if __name__ == '__main__':