diff --git a/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py new file mode 100644 index 000000000000..9f185d52926b --- /dev/null +++ b/paimon-python/pypaimon/read/reader/outer_projection_record_reader.py @@ -0,0 +1,137 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""Outer-projection wrapper for nested-field reads. + +Sits above a reader whose rows still carry full ROW sub-structures, and +emits flat rows whose slots are the values reached by walking each +nested name path. Used on the primary-key merge-read path: the inner +reader hands the merge function complete ROW columns (so deduplicate / +partial-update / aggregation see the original sub-structure), and this +wrapper extracts the user-visible flat columns afterwards. +""" + +from typing import Any, List, Optional + +from pypaimon.read.reader.iface.record_iterator import RecordIterator +from pypaimon.read.reader.iface.record_reader import RecordReader +from pypaimon.table.row.internal_row import InternalRow +from pypaimon.table.row.offset_row import OffsetRow + + +class OuterProjectionRecordReader(RecordReader[InternalRow]): + """Wraps an InternalRow reader and projects nested name paths into flat rows.""" + + def __init__( + self, + inner: RecordReader[InternalRow], + inner_top_names: List[str], + name_paths: List[List[str]], + ): + if not name_paths: + raise ValueError("name_paths must be non-empty") + for path in name_paths: + if not path: + raise ValueError("each name path must contain at least one name") + name_to_top_idx = {name: i for i, name in enumerate(inner_top_names)} + self._specs: List[_PathSpec] = [] + for path in name_paths: + top_name = path[0] + if top_name not in name_to_top_idx: + raise ValueError( + "path top-level field %r not found in inner row schema %r" + % (top_name, inner_top_names)) + self._specs.append(_PathSpec(name_to_top_idx[top_name], list(path[1:]))) + self._inner = inner + self._flat_arity = len(name_paths) + + def read_batch(self) -> Optional[RecordIterator[InternalRow]]: + inner_batch = self._inner.read_batch() + if inner_batch is None: + return None + return _OuterProjectionIterator(inner_batch, self._specs, self._flat_arity) + + def close(self) -> None: + self._inner.close() + + +class _OuterProjectionIterator(RecordIterator[InternalRow]): + """Per-batch iterator that materialises one flat OffsetRow per inner row.""" + + def __init__( + self, + inner: RecordIterator[InternalRow], + specs: List["_PathSpec"], + flat_arity: int, + ): + self._inner = inner + self._specs = specs + self._flat_arity = flat_arity + self._reused_row = OffsetRow(None, 0, flat_arity) + + def next(self) -> Optional[InternalRow]: + inner_row = self._inner.next() + if inner_row is None: + return None + flat = tuple(_extract(inner_row, spec) for spec in self._specs) + self._reused_row.replace(flat) + # Inherit the inner row's RowKind so downstream consumers (e.g. the + # to_arrow path) keep the same +I/-D/-U/+U classification. + self._reused_row.set_row_kind_byte(inner_row.get_row_kind().value) + return self._reused_row + + +class _PathSpec: + """Pre-resolved name path: top-level slot index plus sub-field names.""" + + __slots__ = ("top_idx", "sub_names") + + def __init__(self, top_idx: int, sub_names: List[str]): + self.top_idx = top_idx + self.sub_names = sub_names + + +def _extract(row: InternalRow, spec: _PathSpec) -> Any: + cur = row.get_field(spec.top_idx) + for name in spec.sub_names: + if cur is None: + return None + cur = _step_into(cur, name) + return cur + + +def _step_into(value: Any, name: str) -> Any: + """Take one step into a ROW sub-structure by sub-field name. + + Upstream materialises nested ROW values as plain Python dicts (e.g. + polars row-by-row iteration produces a dict for each struct slot), + so dict access is the only supported form here. Anything else is + rejected loudly to surface schema/wiring mismatches early. + """ + if isinstance(value, dict): + return value.get(name) + if isinstance(value, InternalRow): + # Defensive: if the upstream reader handed us a wrapped sub-row, + # we cannot index it by name without its schema, so fail fast + # rather than guessing the slot. + raise TypeError( + "Cannot step into InternalRow by name %r without sub-schema; " + "expected a dict from the polars row materialisation" % (name,)) + raise TypeError( + "Cannot index nested ROW step %r into value of type %s" + % (name, type(value).__name__)) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index e5903958d198..c5f4082b9d2e 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -189,15 +189,29 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, nested_path_by_name = self._nested_path_by_name() has_nested = nested_path_by_name is not None + # Cover both the merge-internal aliases (``_KEY_id``) and the + # bare user-facing PK name (``id``) the file actually stores. + name_to_field: Dict[str, DataField] = {f.name: f for f in self.read_fields} + _, _trimmed_lookup_fields = self._get_trimmed_fields( + self._get_read_data_fields(), self._get_all_data_fields() + ) + for f in _trimmed_lookup_fields: + name_to_field.setdefault(f.name, f) + format_reader: RecordBatchReader if file_format == CoreOptions.FILE_FORMAT_AVRO: avro_nested_paths = ( [nested_path_by_name[name] for name in read_file_fields] if has_nested else None ) - format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields, - self.read_fields, read_arrow_predicate, batch_size=batch_size, - nested_name_paths=avro_nested_paths) + # Pass the alias-safe union so FormatAvroReader can resolve + # the bare PK name (e.g. ``id``) requested by read_file_fields, + # even when value projection drops it from self.read_fields. + format_reader = FormatAvroReader( + self.table.file_io, file_path, read_file_fields, + list(name_to_field.values()), + read_arrow_predicate, batch_size=batch_size, + nested_name_paths=avro_nested_paths) elif file_format == CoreOptions.FILE_FORMAT_BLOB: if has_nested: raise NotImplementedError( @@ -210,7 +224,6 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, if has_nested: raise NotImplementedError( "Nested-field projection is not supported on Lance files") - name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] format_reader = FormatLanceReader(self.table.file_io, file_path, ordered_read_fields, read_arrow_predicate, batch_size=batch_size, @@ -219,7 +232,6 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, if has_nested: raise NotImplementedError( "Nested-field projection is not supported on Vortex files") - name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] predicate_fields = _get_all_fields(self.push_down_predicate) if self.push_down_predicate else set() format_reader = FormatVortexReader(self.table.file_io, file_path, ordered_read_fields, @@ -227,7 +239,6 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, row_indices=row_indices, predicate_fields=predicate_fields) elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC: - name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] ordered_nested_paths = ( [nested_path_by_name[f.name] for f in ordered_read_fields] @@ -560,6 +571,27 @@ def _get_all_data_fields(self): class MergeFileSplitRead(SplitRead): + def __init__( + self, + table, + predicate: Optional[Predicate], + read_type: List[DataField], + split: Split, + row_tracking_enabled: bool, + outer_extract_name_paths: Optional[List[List[str]]] = None): + # Merge functions need full ROW sub-structures, so nested paths + # are not pushed down here; sub-path extraction happens above + # the merge via OuterProjectionRecordReader. + super().__init__( + table=table, + predicate=predicate, + read_type=read_type, + split=split, + row_tracking_enabled=row_tracking_enabled, + nested_name_paths=None, + ) + self.outer_extract_name_paths = outer_extract_name_paths + def kv_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> RecordReader: file_batch_reader = self.file_reader_supplier(file, True, self._get_final_read_data_fields(), False) dv = dv_factory() if dv_factory else None @@ -591,9 +623,16 @@ def create_reader(self) -> RecordReader: concat_reader = ConcatRecordReader(section_readers) kv_unwrap_reader = KeyValueUnwrapRecordReader(DropDeleteRecordReader(concat_reader)) if self.predicate_for_reader: - return FilterRecordReader(kv_unwrap_reader, self.predicate_for_reader) + reader = FilterRecordReader(kv_unwrap_reader, self.predicate_for_reader) else: - return kv_unwrap_reader + reader = kv_unwrap_reader + if self.outer_extract_name_paths: + from pypaimon.read.reader.outer_projection_record_reader import \ + OuterProjectionRecordReader + inner_top_names = [f.name for f in self.read_fields[-self.value_arity:]] + reader = OuterProjectionRecordReader( + reader, inner_top_names, self.outer_extract_name_paths) + return reader def _get_all_data_fields(self): return self._create_key_value_fields(self.table.fields) diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 488c964bcd00..ce874532b0b7 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -270,17 +270,21 @@ def to_torch( def _create_split_read(self, split: Split) -> SplitRead: if self.table.is_primary_key_table and not split.raw_convertible: + inner_read_type = self.read_type + outer_extract_name_paths: Optional[List[List[str]]] = None if self.nested_name_paths and any( len(p) > 1 for p in self.nested_name_paths): - raise NotImplementedError( - "Nested-field projection on primary-key tables that " - "require a merge read is not yet supported") + # Inner: full ROW for the merge function. Outer: extract + # the requested sub-paths back to the user's flat schema. + inner_read_type = self._widen_to_top_level_for_merge() + outer_extract_name_paths = self.nested_name_paths return MergeFileSplitRead( table=self.table, predicate=self.predicate, - read_type=self.read_type, + read_type=inner_read_type, split=split, - row_tracking_enabled=False + row_tracking_enabled=False, + outer_extract_name_paths=outer_extract_name_paths, ) elif self.table.options.data_evolution_enabled(): if self.nested_name_paths and any( @@ -306,6 +310,24 @@ def _create_split_read(self, split: Split) -> SplitRead: nested_name_paths=self.nested_name_paths, ) + def _widen_to_top_level_for_merge(self) -> List[DataField]: + """Unique top-level fields from ``self.nested_name_paths``, in path order.""" + table_fields_by_name = {f.name: f for f in self.table.fields} + seen = set() + widened: List[DataField] = [] + for path in self.nested_name_paths or []: + top_name = path[0] + if top_name in seen: + continue + seen.add(top_name) + field = table_fields_by_name.get(top_name) + if field is None: + raise ValueError( + "Nested projection top-level field %r not found in " + "table schema" % (top_name,)) + widened.append(field) + return widened + @staticmethod def convert_rows_to_arrow_batch(row_tuples: List[tuple], schema: pyarrow.Schema) -> pyarrow.RecordBatch: columns_data = zip(*row_tuples) diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py index 36ec678c8c15..1a443e375a2e 100644 --- a/paimon-python/pypaimon/tests/reader_primary_key_test.py +++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py @@ -270,6 +270,45 @@ def test_pk_reader_with_projection(self): expected = self.expected.select(['dt', 'user_id', 'behavior']) self.assertEqual(actual, expected) + def _assert_value_only_projection_works(self, file_format: str, table_suffix: str): + # Two commits force the split through the merge path. The merge + # reader still needs the PK column to assemble its key, even + # though the user-visible projection drops it — regress the + # case where narrowing to value-only fields broke the file + # column lookup. + schema = Schema.from_pyarrow_schema( + self.pa_schema, + partition_keys=['dt'], + primary_keys=['user_id', 'dt'], + options={'bucket': '2', 'file.format': file_format}) + self.catalog.create_table( + 'default.test_pk_projection_no_pk_' + table_suffix, schema, False) + table = self.catalog.get_table( + 'default.test_pk_projection_no_pk_' + table_suffix) + self._write_test_table(table) + + read_builder = table.new_read_builder().with_projection(['behavior']) + actual = self._read_test_table(read_builder) + expected = self.expected.select(['behavior']) + # Projection drops PKs so we can only compare bag semantics. + self.assertEqual( + sorted([r['behavior'] for r in actual.to_pylist()], + key=lambda v: '' if v is None else v), + sorted([r['behavior'] for r in expected.to_pylist()], + key=lambda v: '' if v is None else v)) + + def test_pk_reader_with_projection_excluding_pk(self): + self._assert_value_only_projection_works('parquet', 'parquet') + + def test_pk_reader_with_projection_excluding_pk_orc(self): + self._assert_value_only_projection_works('orc', 'orc') + + def test_pk_reader_with_projection_excluding_pk_avro(self): + # Avro path resolves DataField names through ``full_fields_map`` + # built from ``self.read_fields``; the alias-safe lookup must also + # cover the bare PK name (``user_id``) the file actually stores. + self._assert_value_only_projection_works('avro', 'avro') + def test_pk_reader_with_limit(self): schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], diff --git a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py index 87ab3935f435..457f1b588ed0 100644 --- a/paimon-python/pypaimon/tests/test_nested_projection_e2e.py +++ b/paimon-python/pypaimon/tests/test_nested_projection_e2e.py @@ -175,35 +175,76 @@ def test_avro_top_level_projection_unchanged(self): {'val': 'y', 'id': 2}, {'val': 'z', 'id': 3}]) - def test_pk_table_merge_split_with_nested_projection_raises(self): - # Phase 2b lands the append-only path only; PK + nested needs an - # outer-projection wrapper that ships in a follow-up commit. Until - # then, the call must refuse loudly rather than silently corrupt - # the merge function input. Two commits on the same PK force the - # split out of the raw-convertible fast path into the merge - # reader, which is where the guard lives. - identifier = 'default.pk_nested_unsupported' + +class PrimaryKeyNestedTest(_AppendOnlyNestedBase): + """PK tables go through the merge reader once a split is no longer + raw-convertible (multiple overlapping commits on the same key). The + merge function still needs full ROW sub-structures, so the read + splits inner = full-ROW from outer = flat sub-paths via an + OuterProjectionRecordReader.""" + + def _create_pk_table(self, name: str, file_format: str = 'parquet'): + identifier = 'default.{}'.format(name) schema = Schema.from_pyarrow_schema( self.pa_schema, primary_keys=['id'], - options={'bucket': '1', 'file.format': 'parquet'}, + options={'bucket': '1', 'file.format': file_format}, ) self.catalog.create_table(identifier, schema, False) table = self.catalog.get_table(identifier) - for batch in (self.rows, self.rows): # two overlapping commits + # Two overlapping commits force the split off the raw-convertible + # fast path into the merge reader. + for batch in (self.rows, self.rows): wb = table.new_batch_write_builder() w = wb.new_write() w.write_arrow(pa.Table.from_pylist(batch, schema=self.pa_schema)) wb.new_commit().commit(w.prepare_commit()) w.close() + return table - rb = table.new_read_builder().with_projection(['mv.latest_version']) + def _read_arrow(self, table, projection): + rb = table.new_read_builder().with_projection(projection) splits = rb.new_scan().plan().splits() - # ``to_arrow`` materialises the split read; the merge path is what - # raises, so do it eagerly here rather than waiting for the first - # batch. - with self.assertRaises(NotImplementedError): - rb.new_read().to_arrow(splits) + return rb.new_read().to_arrow(splits) + + def test_extracts_single_nested_leaf(self): + table = self._create_pk_table('pk_nested_single') + arrow = self._read_arrow(table, ['mv.latest_version']) + self.assertEqual(arrow.column_names, ['mv_latest_version']) + versions = sorted(arrow.column('mv_latest_version').to_pylist()) + self.assertEqual(versions, [100, 200, 300]) + + def test_multiple_sub_paths_under_same_struct(self): + table = self._create_pk_table('pk_nested_double') + arrow = self._read_arrow( + table, ['mv.latest_version', 'mv.latest_value']) + self.assertEqual(arrow.column_names, ['mv_latest_version', 'mv_latest_value']) + pairs = sorted(zip( + arrow.column('mv_latest_version').to_pylist(), + arrow.column('mv_latest_value').to_pylist())) + self.assertEqual(pairs, [(100, 'a'), (200, 'b'), (300, 'c')]) + + def test_mixed_nested_and_top_level_preserves_order(self): + table = self._create_pk_table('pk_nested_mixed') + arrow = self._read_arrow( + table, ['id', 'mv.latest_version', 'val']) + self.assertEqual( + arrow.column_names, ['id', 'mv_latest_version', 'val']) + rows = sorted(zip( + arrow.column('id').to_pylist(), + arrow.column('mv_latest_version').to_pylist(), + arrow.column('val').to_pylist())) + self.assertEqual(rows, [(1, 100, 'x'), (2, 200, 'y'), (3, 300, 'z')]) + + def test_avro_extracts_single_nested_leaf(self): + # Avro PK reads resolve DataFields through ``full_fields_map`` which + # historically only covered merge-internal aliases; without the + # alias-safe fix, this projection would raise ``KeyError: 'id'``. + table = self._create_pk_table('pk_avro_nested_single', file_format='avro') + arrow = self._read_arrow(table, ['mv.latest_version']) + self.assertEqual(arrow.column_names, ['mv_latest_version']) + versions = sorted(arrow.column('mv_latest_version').to_pylist()) + self.assertEqual(versions, [100, 200, 300]) if __name__ == '__main__': diff --git a/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py b/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py new file mode 100644 index 000000000000..846e90e1b421 --- /dev/null +++ b/paimon-python/pypaimon/tests/test_outer_projection_record_reader.py @@ -0,0 +1,202 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import unittest +from typing import List, Optional + +from pypaimon.read.reader.iface.record_iterator import RecordIterator +from pypaimon.read.reader.iface.record_reader import RecordReader +from pypaimon.read.reader.outer_projection_record_reader import ( + OuterProjectionRecordReader, _extract, _PathSpec, _step_into) +from pypaimon.table.row.internal_row import InternalRow +from pypaimon.table.row.offset_row import OffsetRow + + +class _StaticIterator(RecordIterator[InternalRow]): + """Iterator over a pre-built list of OffsetRows; yields None when drained.""" + + def __init__(self, rows: List[OffsetRow]): + self._rows = list(rows) + self._idx = 0 + + def next(self) -> Optional[InternalRow]: + if self._idx >= len(self._rows): + return None + row = self._rows[self._idx] + self._idx += 1 + return row + + +class _StaticReader(RecordReader[InternalRow]): + """Reader that emits a single batch then EOF, with close-tracking.""" + + def __init__(self, rows: List[OffsetRow]): + self._rows = rows + self._delivered = False + self.closed = False + + def read_batch(self) -> Optional[RecordIterator[InternalRow]]: + if self._delivered: + return None + self._delivered = True + return _StaticIterator(self._rows) + + def close(self) -> None: + self.closed = True + + +def _row(*values) -> OffsetRow: + return OffsetRow(tuple(values), 0, len(values)) + + +class StepIntoTest(unittest.TestCase): + + def test_dict_lookup(self): + self.assertEqual(_step_into({'a': 1, 'b': 2}, 'a'), 1) + self.assertIsNone(_step_into({'a': 1}, 'missing')) + + def test_internal_row_rejected(self): + # Defensive: we never expect a nested InternalRow in the polars path. + row = OffsetRow((10, 20), 0, 2) + with self.assertRaises(TypeError): + _step_into(row, 'whatever') + + def test_unsupported_type_raises(self): + with self.assertRaises(TypeError): + _step_into(42, 'name') + + +class ExtractTest(unittest.TestCase): + + def test_empty_sub_names_returns_top_level_field(self): + # Empty sub_names → return the top-level slot itself (used when + # mixing top-level and nested paths in the same projection). + row = _row(1, {'v': 100, 's': 'a'}) + spec = _PathSpec(top_idx=0, sub_names=[]) + self.assertEqual(_extract(row, spec), 1) + + def test_nested_walk_returns_leaf(self): + row = _row(1, {'v': 100, 's': 'a'}) + spec = _PathSpec(top_idx=1, sub_names=['v']) + self.assertEqual(_extract(row, spec), 100) + + def test_none_at_top_short_circuits(self): + row = _row(1, None) + spec = _PathSpec(top_idx=1, sub_names=['v']) + self.assertIsNone(_extract(row, spec)) + + def test_missing_sub_name_returns_none(self): + row = _row(1, {'v': 100}) + spec = _PathSpec(top_idx=1, sub_names=['s']) + self.assertIsNone(_extract(row, spec)) + + +class OuterProjectionRecordReaderTest(unittest.TestCase): + + def _build_reader(self, rows, top_names, name_paths): + return OuterProjectionRecordReader( + _StaticReader(rows), top_names, name_paths) + + def test_extracts_nested_leaf(self): + rows = [ + _row(1, {'v': 100, 's': 'a'}, 'x'), + _row(2, {'v': 200, 's': 'b'}, 'y'), + ] + reader = self._build_reader( + rows, + top_names=['id', 'mv', 'val'], + name_paths=[['mv', 'v']]) + batch = reader.read_batch() + out = [] + while True: + r = batch.next() + if r is None: + break + out.append(tuple(r.get_field(i) for i in range(len(r)))) + self.assertEqual(out, [(100,), (200,)]) + + def test_mixed_top_level_and_nested_preserves_order(self): + rows = [_row(1, {'v': 100, 's': 'a'}, 'x')] + reader = self._build_reader( + rows, + top_names=['id', 'mv', 'val'], + name_paths=[['val'], ['mv', 'v'], ['id']]) + batch = reader.read_batch() + r = batch.next() + self.assertEqual( + tuple(r.get_field(i) for i in range(len(r))), + ('x', 100, 1)) + + def test_nullable_struct_returns_none(self): + # The wrapper reuses a single OffsetRow per batch, so consumers must + # materialise each row before advancing — same contract as + # InternalRowWrapperIterator. + rows = [_row(1, None, 'x'), _row(2, {'v': 200, 's': 'b'}, 'y')] + reader = self._build_reader( + rows, + top_names=['id', 'mv', 'val'], + name_paths=[['mv', 'v']]) + batch = reader.read_batch() + first_value = batch.next().get_field(0) + second_value = batch.next().get_field(0) + self.assertIsNone(first_value) + self.assertEqual(second_value, 200) + + def test_inherits_row_kind(self): + rows = [_row(1, {'v': 100, 's': 'a'})] + rows[0].set_row_kind_byte(2) # -U + reader = self._build_reader( + rows, + top_names=['id', 'mv'], + name_paths=[['mv', 'v']]) + batch = reader.read_batch() + r = batch.next() + self.assertEqual(r.row_kind_byte, 2) + + def test_eof_returns_none(self): + reader = self._build_reader( + [], top_names=['id'], name_paths=[['id']]) + # First batch is empty (delivered) but the iterator immediately yields None. + first_batch = reader.read_batch() + self.assertIsNone(first_batch.next()) + # Subsequent read_batch returns None once the inner reader is drained. + self.assertIsNone(reader.read_batch()) + + def test_close_propagates(self): + inner = _StaticReader([_row(1, {'v': 100})]) + reader = OuterProjectionRecordReader( + inner, ['id', 'mv'], [['mv', 'v']]) + reader.close() + self.assertTrue(inner.closed) + + def test_unknown_top_name_raises_at_construction(self): + with self.assertRaises(ValueError): + OuterProjectionRecordReader( + _StaticReader([]), ['id', 'mv'], [['nope', 'v']]) + + def test_empty_name_paths_rejected(self): + with self.assertRaises(ValueError): + OuterProjectionRecordReader(_StaticReader([]), ['id'], []) + + def test_empty_individual_path_rejected(self): + with self.assertRaises(ValueError): + OuterProjectionRecordReader(_StaticReader([]), ['id'], [[]]) + + +if __name__ == '__main__': + unittest.main()