diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index 682728f9a081..0c8af93713b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -27,11 +27,14 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.Collections.singletonList; import static org.apache.paimon.schema.SchemaEvolutionUtil.createIndexCastMapping; @@ -44,7 +47,7 @@ public class SimpleStatsEvolutions { private final long tableSchemaId; private final List<DataField> tableDataFields; private final AtomicReference<List<DataField>> tableFields; - private final ConcurrentMap<Long, SimpleStatsEvolution> evolutions; + private final ConcurrentMap<EvolutionKey, SimpleStatsEvolution> evolutions; public SimpleStatsEvolutions(Function<Long, List<DataField>> schemaFields, long tableSchemaId) { this.schemaFields = schemaFields; @@ -55,20 +58,37 @@ public SimpleStatsEvolutions(Function<Long, List<DataField>> schemaFields, long } public SimpleStatsEvolution getOrCreate(long dataSchemaId) { + return getOrCreate(dataSchemaId, null); + } + + public SimpleStatsEvolution getOrCreate(long dataSchemaId, @Nullable List<String> writeCols) { + EvolutionKey key = new EvolutionKey(dataSchemaId, writeCols); return evolutions.computeIfAbsent( - dataSchemaId, - id -> { - if (tableSchemaId == id) { + key, + k -> { + if (tableSchemaId == k.schemaId && k.writeCols == null) { return new SimpleStatsEvolution( - new RowType(schemaFields.apply(id)), null, null); + new RowType(schemaFields.apply(k.schemaId)), null, null); } // Get atomic schema fields. List<DataField> schemaTableFields = tableFields.updateAndGet(v -> v == null ? 
tableDataFields : v); - List<DataField> dataFields = schemaFields.apply(id); + List<DataField> dataFields = schemaFields.apply(k.schemaId); + + // Project data fields to write cols for data evolution table + if (k.writeCols != null) { + RowType rowType = new RowType(dataFields); + // writeCols may contain some metadata fields i.e. row_id & max_seq + dataFields = + rowType.project( + k.writeCols.stream() + .filter(rowType::containsField) + .collect(Collectors.toList())) + .getFields(); + } IndexCastMapping indexCastMapping = - createIndexCastMapping(schemaTableFields, schemaFields.apply(id)); + createIndexCastMapping(schemaTableFields, dataFields); @Nullable int[] indexMapping = indexCastMapping.getIndexMapping(); // Create col stats array serializer with schema evolution return new SimpleStatsEvolution( @@ -127,4 +147,36 @@ public Predicate filterUnsafeFilter( public List<DataField> tableDataFields() { return tableDataFields; } + + /** Immutable key for StatsEvolution. */ + private static class EvolutionKey { + + private final long schemaId; + @Nullable private final List<String> writeCols; + + private EvolutionKey(long schemaId, @Nullable List<String> writeCols) { + this.schemaId = schemaId; + this.writeCols = + writeCols == null + ? 
null + : Collections.unmodifiableList(new ArrayList<>(writeCols)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + EvolutionKey that = (EvolutionKey) o; + return schemaId == that.schemaId && Objects.equals(writeCols, that.writeCols); + } + + @Override + public int hashCode() { + return Objects.hash(schemaId, writeCols); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 0c7eadb7ae62..56d3e0836b7e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -462,7 +462,8 @@ private StatsLazyGetter(DataFileMeta file, SimpleStatsEvolutions simpleStatsEvol } private void initialize() { - SimpleStatsEvolution evolution = simpleStatsEvolutions.getOrCreate(file.schemaId()); + SimpleStatsEvolution evolution = + simpleStatsEvolutions.getOrCreate(file.schemaId(), file.writeCols()); // Create value stats SimpleStatsEvolution.Result result = evolution.evolution(file.valueStats(), file.rowCount(), file.valueStatsCols()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java index 2a8818d4989f..7baf4ae61786 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java @@ -35,12 +35,18 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; +import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; import 
org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import org.apache.paimon.table.sink.BatchWriteBuilder; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -54,6 +60,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import static org.apache.paimon.SnapshotTest.newSnapshotManager; import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER; @@ -223,6 +230,92 @@ public void testReadFilesFromNotExistSnapshot() { .satisfies(anyCauseMatches(IllegalArgumentException.class)); } + @Test + public void testReadStatsWithDataEvolutionWriteCols() throws Exception { + String tableName = "DataEvolutionFilesTable"; + Identifier identifier = identifier(tableName); + Schema schema = + Schema.newBuilder() + .column("f0", DataTypes.INT()) + .column("f1", DataTypes.STRING()) + .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true") + .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true") + .build(); + catalog.createTable(identifier, schema, true); + + // Write a data-evolution table. 
+ FileStoreTable dataEvolutionTable = getTable(identifier); + BatchWriteBuilder writeBuilder = dataEvolutionTable.newBatchWriteBuilder(); + try (BatchTableWrite write = + writeBuilder + .newWrite() + .withWriteType( + schema.rowType().project(Collections.singletonList("f1"))); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write(GenericRow.of(BinaryString.fromString("a"))); + commit.commit(write.prepareCommit()); + } + + catalog.alterTable(identifier, SchemaChange.addColumn("f2", DataTypes.INT()), false); + dataEvolutionTable = getTable(identifier); + writeBuilder = dataEvolutionTable.newBatchWriteBuilder(); + try (BatchTableWrite write = + writeBuilder + .newWrite() + .withWriteType( + dataEvolutionTable + .schema() + .logicalRowType() + .project(Collections.singletonList("f2"))); + BatchTableCommit commit = writeBuilder.newCommit()) { + write.write(GenericRow.of(1)); + List<CommitMessage> commitables = write.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + } + + FilesTable dataEvolutionFilesTable = + (FilesTable) + catalog.getTable( + identifier(tableName + SYSTEM_TABLE_SPLITTER + FilesTable.FILES)); + List<InternalRow> result = read(dataEvolutionFilesTable); + + // Each file only contain partial data columns. 
+ assertThat(result).hasSize(2); + assertThat( + result.stream() + .map(row -> row.getString(10).toString()) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("{f0=1, f1=0, f2=1}", "{f0=1, f1=1, f2=0}"); + assertThat( + result.stream() + .map(row -> row.getString(11).toString()) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("{f0=null, f1=a, f2=null}", "{f0=null, f1=null, f2=1}"); + assertThat( + result.stream() + .map(row -> row.getString(12).toString()) + .collect(Collectors.toList())) + .containsExactlyInAnyOrder("{f0=null, f1=a, f2=null}", "{f0=null, f1=null, f2=1}"); + } + + private void setFirstRowId(List<CommitMessage> commitables, long firstRowId) { + commitables.forEach( + c -> { + CommitMessageImpl commitMessage = (CommitMessageImpl) c; + List<DataFileMeta> newFiles = + new ArrayList<>(commitMessage.newFilesIncrement().newFiles()); + commitMessage.newFilesIncrement().newFiles().clear(); + commitMessage + .newFilesIncrement() + .newFiles() + .addAll( + newFiles.stream() + .map(s -> s.assignFirstRowId(firstRowId)) + .collect(Collectors.toList())); + }); + } + private List<InternalRow> getExpectedResult(long snapshotId) { if (!snapshotManager.snapshotExists(snapshotId)) { return Collections.emptyList();