Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.LeafPredicateExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -73,11 +74,13 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -280,20 +283,39 @@ public OptionalLong mergedRowCount() {

private static class FilesRead implements InnerTableRead {

private static final Set<String> SCAN_PUSHDOWN_FIELDS = scanPushdownFields();

private static Set<String> scanPushdownFields() {
Set<String> fields = new HashSet<>();
fields.add("partition");
fields.add("bucket");
fields.add("level");
return Collections.unmodifiableSet(fields);
}

private final SchemaManager schemaManager;

private final FileStoreTable storeTable;

private RowType readType;

@Nullable private Predicate predicate;

private FilesRead(SchemaManager schemaManager, FileStoreTable fileStoreTable) {
this.schemaManager = schemaManager;
this.storeTable = fileStoreTable;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
// TODO
if (predicate == null) {
this.predicate = null;
return this;
}
List<Predicate> remaining =
PredicateBuilder.excludePredicateWithFields(
PredicateBuilder.splitAnd(predicate), SCAN_PUSHDOWN_FIELDS);
this.predicate = remaining.isEmpty() ? null : PredicateBuilder.and(remaining);
return this;
}

Expand Down Expand Up @@ -366,6 +388,11 @@ public RowDataToObjectArrayConverter apply(Long schemaId) {
simpleStatsEvolutions)));
}
Iterator<InternalRow> rows = Iterators.concat(iteratorList.iterator());

if (predicate != null) {
rows = Iterators.filter(rows, predicate::test);
}

if (readType != null) {
rows =
Iterators.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -182,13 +184,15 @@ private static class PartitionsRead implements InnerTableRead {

private RowType readType;

@Nullable private Predicate predicate;

public PartitionsRead(FileStoreTable table) {
this.fileStoreTable = table;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
// TODO
this.predicate = predicate;
return this;
}

Expand Down Expand Up @@ -236,6 +240,10 @@ public RecordReader<InternalRow> createReader(Split split) throws IOException {
.sorted(Comparator.comparing(row -> row.getString(0)))
.iterator();

if (predicate != null) {
iterator = Iterators.filter(iterator, predicate::test);
}

if (readType != null) {
iterator =
Iterators.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,43 @@ public void testReadWithNotFullPartitionKey() throws Exception {
assertThat(readPartBucketLevel(builder.equal(0, "[2]"))).isEmpty();
}

@Test
public void testReadWithSchemaIdFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);

assertThat(readPartBucketLevel(builder.equal(4, 0L)))
.containsExactlyInAnyOrder(
"{1, 10}-0-0", "{1, 10}-0-0", "{2, 20}-0-0", "{2, 20}-0-0");
assertThat(readPartBucketLevel(builder.equal(4, 999L))).isEmpty();
}

@Test
public void testReadWithRecordCountFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);

assertThat(readPartBucketLevel(builder.greaterThan(6, 0L)))
.containsExactlyInAnyOrder(
"{1, 10}-0-0", "{1, 10}-0-0", "{2, 20}-0-0", "{2, 20}-0-0");
assertThat(readPartBucketLevel(builder.greaterThan(6, 100L))).isEmpty();
}

@Test
public void testReadWithCombinedPushdownAndPostFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);

Predicate combined =
PredicateBuilder.and(
builder.equal(0, BinaryString.fromString("{1, 10}")), builder.equal(4, 0L));
assertThat(readPartBucketLevel(combined))
.containsExactlyInAnyOrder("{1, 10}-0-0", "{1, 10}-0-0");

Predicate combinedMiss =
PredicateBuilder.and(
builder.equal(0, BinaryString.fromString("{1, 10}")),
builder.equal(4, 999L));
assertThat(readPartBucketLevel(combinedMiss)).isEmpty();
}

@Test
public void testReadFilesFromSpecifiedSnapshot() throws Exception {
List<InternalRow> expectedRow = getExpectedResult(1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,25 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -180,4 +186,64 @@ void testPartitionWithLegacyPartitionName() throws Exception {
List<InternalRow> result = read(testPartitionsTable, new int[] {0, 1});
assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
}

@Test
public void testReadWithPartitionEqualFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE);

assertThat(readPartitionAndRecordCount(builder.equal(0, BinaryString.fromString("pt=2"))))
.containsExactlyInAnyOrder("pt=2-1");

assertThat(readPartitionAndRecordCount(builder.equal(0, BinaryString.fromString("pt=99"))))
.isEmpty();
}

@Test
public void testReadWithPartitionInFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE);

assertThat(
readPartitionAndRecordCount(
builder.in(
0,
Arrays.asList(
(Object) BinaryString.fromString("pt=1"),
BinaryString.fromString("pt=3")))))
.containsExactlyInAnyOrder("pt=1-2", "pt=3-1");
}

@Test
public void testReadWithRecordCountFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE);

assertThat(readPartitionAndRecordCount(builder.greaterThan(1, 1L)))
.containsExactlyInAnyOrder("pt=1-2");
}

@Test
public void testReadWithFileCountFilter() throws Exception {
PredicateBuilder builder = new PredicateBuilder(PartitionsTable.TABLE_TYPE);

assertThat(readPartitionAndRecordCount(builder.equal(3, 1L)))
.containsExactlyInAnyOrder("pt=2-1", "pt=3-1");
assertThat(readPartitionAndRecordCount(builder.greaterOrEqual(3, 2L)))
.containsExactlyInAnyOrder("pt=1-2");
}

@Test
public void testReadWithNullFilterReturnsAll() throws Exception {
assertThat(readPartitionAndRecordCount(null))
.containsExactlyInAnyOrder("pt=1-2", "pt=2-1", "pt=3-1");
}

private List<String> readPartitionAndRecordCount(Predicate predicate) throws IOException {
ReadBuilder readBuilder = partitionsTable.newReadBuilder().withFilter(predicate);
List<String> rows = new ArrayList<>();
try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan())) {
reader.forEachRemaining(
row -> rows.add(row.getString(0).toString() + "-" + row.getLong(1)));
}
return rows;
}
}