diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 0c7eadb7ae62..4b6b6b7ec04c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -35,6 +35,7 @@ import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.LeafPredicateExtractor; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -73,11 +74,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.Set; import java.util.TreeMap; import java.util.function.Function; import java.util.function.Supplier; @@ -280,12 +283,24 @@ public OptionalLong mergedRowCount() { private static class FilesRead implements InnerTableRead { + private static final Set SCAN_PUSHDOWN_FIELDS = scanPushdownFields(); + + private static Set scanPushdownFields() { + Set fields = new HashSet<>(); + fields.add("partition"); + fields.add("bucket"); + fields.add("level"); + return Collections.unmodifiableSet(fields); + } + private final SchemaManager schemaManager; private final FileStoreTable storeTable; private RowType readType; + @Nullable private Predicate predicate; + private FilesRead(SchemaManager schemaManager, FileStoreTable fileStoreTable) { this.schemaManager = schemaManager; this.storeTable = fileStoreTable; @@ -293,7 +308,14 @@ private FilesRead(SchemaManager schemaManager, FileStoreTable fileStoreTable) { @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + if (predicate == null) { + this.predicate = null; + return this; + } + List remaining = + PredicateBuilder.excludePredicateWithFields( + PredicateBuilder.splitAnd(predicate), SCAN_PUSHDOWN_FIELDS); + this.predicate = remaining.isEmpty() ? null : PredicateBuilder.and(remaining); return this; } @@ -366,6 +388,11 @@ public RowDataToObjectArrayConverter apply(Long schemaId) { simpleStatsEvolutions))); } Iterator rows = Iterators.concat(iteratorList.iterator()); + + if (predicate != null) { + rows = Iterators.filter(rows, predicate::test); + } + if (readType != null) { rows = Iterators.transform( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java index a165eb5185c6..1291308e5ac8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java @@ -61,6 +61,8 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; +import javax.annotation.Nullable; + import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; @@ -182,13 +184,15 @@ private static class PartitionsRead implements InnerTableRead { private RowType readType; + @Nullable private Predicate predicate; + public PartitionsRead(FileStoreTable table) { this.fileStoreTable = table; } @Override public InnerTableRead withFilter(Predicate predicate) { - // TODO + this.predicate = predicate; return this; } @@ -236,6 +240,10 @@ public RecordReader createReader(Split split) throws IOException { .sorted(Comparator.comparing(row -> row.getString(0))) .iterator(); + if (predicate != null) { + iterator = Iterators.filter(iterator, predicate::test); + } + if (readType != null) { iterator = Iterators.transform( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java index 2a8818d4989f..e2ed0e8162ab 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FilesTableTest.java @@ -202,6 +202,43 @@ public void testReadWithNotFullPartitionKey() throws Exception { assertThat(readPartBucketLevel(builder.equal(0, "[2]"))).isEmpty(); } + @Test + public void testReadWithSchemaIdFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE); + + assertThat(readPartBucketLevel(builder.equal(4, 0L))) + .containsExactlyInAnyOrder( + "{1, 10}-0-0", "{1, 10}-0-0", "{2, 20}-0-0", "{2, 20}-0-0"); + assertThat(readPartBucketLevel(builder.equal(4, 999L))).isEmpty(); + } + + @Test + public void testReadWithRecordCountFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE); + + assertThat(readPartBucketLevel(builder.greaterThan(6, 0L))) + .containsExactlyInAnyOrder( + "{1, 10}-0-0", "{1, 10}-0-0", "{2, 20}-0-0", "{2, 20}-0-0"); + assertThat(readPartBucketLevel(builder.greaterThan(6, 100L))).isEmpty(); + } + + @Test + public void testReadWithCombinedPushdownAndPostFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE); + + Predicate combined = + PredicateBuilder.and( + builder.equal(0, BinaryString.fromString("{1, 10}")), builder.equal(4, 0L)); + assertThat(readPartBucketLevel(combined)) + .containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0"); + + Predicate combinedMiss = + PredicateBuilder.and( + builder.equal(0, BinaryString.fromString("{1, 10}")), + builder.equal(4, 999L)); + assertThat(readPartBucketLevel(combinedMiss)).isEmpty(); + } + @Test public void testReadFilesFromSpecifiedSnapshot() throws Exception { List expectedRow = getExpectedResult(1L); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java index d70fe0d6662e..f79c7811b519 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java @@ -26,6 +26,9 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; @@ -33,12 +36,15 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.DataTypes; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -180,4 +186,64 @@ void testPartitionWithLegacyPartitionName() throws Exception { List result = read(testPartitionsTable, new int[] {0, 1}); assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow); } + + @Test + public void testReadWithPartitionEqualFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + assertThat(readPartitionAndRecordCount(builder.equal(0, BinaryString.fromString("pt=2")))) + .containsExactlyInAnyOrder("pt=2-1"); + + assertThat(readPartitionAndRecordCount(builder.equal(0, BinaryString.fromString("pt=99")))) + .isEmpty(); + } + + @Test + public void testReadWithPartitionInFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + assertThat( + readPartitionAndRecordCount( + builder.in( + 0, + Arrays.asList( + (Object) BinaryString.fromString("pt=1"), + BinaryString.fromString("pt=3"))))) + .containsExactlyInAnyOrder("pt=1-2", "pt=3-1"); + } + + @Test + public void testReadWithRecordCountFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + assertThat(readPartitionAndRecordCount(builder.greaterThan(1, 1L))) + .containsExactlyInAnyOrder("pt=1-2"); + } + + @Test + public void testReadWithFileCountFilter() throws Exception { + PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE); + + assertThat(readPartitionAndRecordCount(builder.equal(3, 1L))) + .containsExactlyInAnyOrder("pt=2-1", "pt=3-1"); + assertThat(readPartitionAndRecordCount(builder.greaterOrEqual(3, 2L))) + .containsExactlyInAnyOrder("pt=1-2"); + } + + @Test + public void testReadWithNullFilterReturnsAll() throws Exception { + assertThat(readPartitionAndRecordCount(null)) + .containsExactlyInAnyOrder("pt=1-2", "pt=2-1", "pt=3-1"); + } + + private List readPartitionAndRecordCount(Predicate predicate) throws IOException { + ReadBuilder readBuilder = partitionsTable.newReadBuilder().withFilter(predicate); + List rows = new ArrayList<>(); + try (RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan())) { + reader.forEachRemaining( + row -> rows.add(row.getString(0).toString() + "-" + row.getLong(1))); + } + return rows; + } }