diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 102100b9fb1d..73f335fc0236 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -189,7 +189,7 @@ public FileStoreCommitImpl(
         this.manifestList = manifestListFactory.create();
         this.indexManifestFile = indexManifestFileFactory.create();
         this.rollback = rollback;
-        this.scanner = new CommitScanner(scanSupplier.get(), indexManifestFile, options);
+        this.scanner = new CommitScanner(scanSupplier, indexManifestFile, options);
         this.commitPreCallbacks = commitPreCallbacks;
         this.commitCallbacks = commitCallbacks;
         this.retryWaiter =
@@ -735,6 +735,46 @@ private int tryCommit(
         return retryCount + 1;
     }
 
+    /**
+     * Verifies that the bucket number of the files to commit matches the bucket number already
+     * recorded in {@code latestSnapshot} for the same partitions.
+     *
+     * @throws RuntimeException if a partition is written with a different bucket number
+     */
+    private void checkSameBucketFromSnapshot(
+            List<ManifestEntry> deltaFiles, @Nullable Snapshot latestSnapshot) {
+        if (latestSnapshot == null) {
+            return;
+        }
+
+        Map<BinaryRow, Integer> expectedTotalBuckets =
+                conflictDetection.collectUncheckedBucketPartitions(deltaFiles);
+        if (expectedTotalBuckets.isEmpty()) {
+            return;
+        }
+
+        Map<BinaryRow, Integer> previousTotalBuckets =
+                scanner.readTotalBuckets(
+                        latestSnapshot, new ArrayList<>(expectedTotalBuckets.keySet()));
+        Optional<RuntimeException> exception =
+                conflictDetection.checkSameBucketByTotalBuckets(
+                        expectedTotalBuckets, previousTotalBuckets);
+        if (exception.isPresent()) {
+            throw exception.get();
+        }
+    }
+
+    /**
+     * Whether the standalone bucket check applies: only for APPEND commits in HASH_FIXED bucket
+     * mode with write-only enabled and bucket-append-ordered disabled.
+     */
+    private boolean shouldCheckSameBucket(CommitKind commitKind) {
+        return commitKind == CommitKind.APPEND
+                && bucketMode == BucketMode.HASH_FIXED
+                && options.writeOnly()
+                && !options.bucketAppendOrdered();
+    }
+
     /**
      * Try to overwrite partition.
      *
@@ -834,7 +874,13 @@ CommitResult tryCommitOnce(
         List baseDataFiles = new ArrayList<>();
         boolean discardDuplicate =
                 options.commitDiscardDuplicateFiles() && commitKind == CommitKind.APPEND;
-        if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
+        boolean checkConflicts = latestSnapshot != null && (discardDuplicate || detectConflicts);
+        // When the conflict check path runs, the extra bucket check is not needed, so the
+        // standalone check only runs when that path is skipped.
+        if (!checkConflicts && shouldCheckSameBucket(commitKind)) {
+            checkSameBucketFromSnapshot(deltaFiles, latestSnapshot);
+        }
+        if (checkConflicts) {
             // latestSnapshotId is different from the snapshot id we've checked for conflicts,
             // so we have to check again
             if (changedPartitions == null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
index 9afe4500a43f..e43f27ed8970 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
@@ -34,7 +34,13 @@
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
 
 import static java.util.Collections.emptyList;
 
@@ -42,14 +48,20 @@
 public class CommitScanner {
 
     private final FileStoreScan scan;
+    private final Supplier<FileStoreScan> scanSupplier;
     private final IndexManifestFile indexManifestFile;
+    private final boolean dropStats;
 
     public CommitScanner(
-            FileStoreScan scan, IndexManifestFile indexManifestFile, CoreOptions options) {
-        this.scan = scan;
+            Supplier<FileStoreScan> scanSupplier,
+            IndexManifestFile indexManifestFile,
+            CoreOptions options) {
+        this.scanSupplier = scanSupplier;
+        this.scan = scanSupplier.get();
         this.indexManifestFile = indexManifestFile;
         // Stats in DELETE Manifest Entries is useless
-        if (options.manifestDeleteFileDropStats()) {
+        this.dropStats = options.manifestDeleteFileDropStats();
+        if (dropStats) {
             this.scan.dropStats();
         }
     }
@@ -89,6 +101,38 @@ public List readAllEntriesFromChangedPartitions(
         }
     }
 
+    /**
+     * Reads, for each given partition, the total bucket number recorded in {@code snapshot}.
+     * Partitions without any entry carrying a positive bucket number are absent from the result.
+     */
+    public Map<BinaryRow, Integer> readTotalBuckets(
+            Snapshot snapshot, List<BinaryRow> changedPartitions) {
+        try {
+            Set<BinaryRow> remainingPartitions = new HashSet<>(changedPartitions);
+            Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+            FileStoreScan freshScan = scanSupplier.get();
+            if (dropStats) {
+                freshScan.dropStats();
+            }
+            Iterator<ManifestEntry> iterator =
+                    freshScan
+                            .withSnapshot(snapshot)
+                            .withKind(ScanMode.ALL)
+                            .withPartitionFilter(changedPartitions)
+                            .readFileIterator();
+            while (iterator.hasNext() && !remainingPartitions.isEmpty()) {
+                ManifestEntry entry = iterator.next();
+                int totalBucket = entry.totalBuckets();
+                if (totalBucket > 0 && remainingPartitions.remove(entry.partition())) {
+                    totalBuckets.put(entry.partition(), totalBucket);
+                }
+            }
+            return totalBuckets;
+        } catch (Throwable e) {
+            throw new RuntimeException("Cannot read total buckets from changed partitions.", e);
+        }
+    }
+
     public CommitChanges readOverwriteChanges(
             int numBucket,
             List changes,
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 5b3f76697c4c..46d5c436192d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -71,6 +71,7 @@
 public class ConflictDetection {
 
     private static final Logger LOG = LoggerFactory.getLogger(ConflictDetection.class);
+    private static final int SAME_BUCKET_CHECK_CACHE_MAX_SIZE = 1000;
 
     private final String tableName;
     private final String commitUser;
@@ -84,6 +85,15 @@ public class ConflictDetection {
     private final IndexFileHandler indexFileHandler;
     private final SnapshotManager snapshotManager;
     private final CommitScanner commitScanner;
+    // Bounded cache of partitions whose bucket number has already been verified. It keeps
+    // insertion order and drops the eldest entry beyond SAME_BUCKET_CHECK_CACHE_MAX_SIZE.
+    private final Map<BinaryRow, Boolean> sameBucketCheckedPartitions =
+            new LinkedHashMap<BinaryRow, Boolean>(SAME_BUCKET_CHECK_CACHE_MAX_SIZE, 0.75f, false) {
+                @Override
+                protected boolean removeEldestEntry(Map.Entry<BinaryRow, Boolean> eldest) {
+                    return size() > SAME_BUCKET_CHECK_CACHE_MAX_SIZE;
+                }
+            };
 
     private @Nullable PartitionExpire partitionExpire;
     private @Nullable Long rowIdCheckFromSnapshot = null;
@@ -223,6 +233,55 @@ public Optional checkConflicts(
         return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, deltaIndexEntries);
     }
 
+    /**
+     * Collects the bucket number of every partition that receives new files in this commit and has
+     * not passed the bucket check yet.
+     *
+     * @throws RuntimeException if one partition is written with two different bucket numbers
+     */
+    public <T extends FileEntry> Map<BinaryRow, Integer> collectUncheckedBucketPartitions(
+            List<T> deltaEntries) {
+        Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+        for (T entry : deltaEntries) {
+            if (entry.kind() != FileKind.ADD
+                    || entry.totalBuckets() <= 0
+                    || sameBucketCheckedPartitions.containsKey(entry.partition())) {
+                continue;
+            }
+
+            Integer previous = totalBuckets.putIfAbsent(entry.partition(), entry.totalBuckets());
+            if (previous != null && previous != entry.totalBuckets()) {
+                throw bucketNumMismatch(entry.partition(), entry.totalBuckets(), previous);
+            }
+        }
+        return totalBuckets;
+    }
+
+    /**
+     * Compares the bucket numbers about to be committed with the ones read from the snapshot.
+     *
+     * @return the mismatch error, or empty if every partition present in both maps agrees
+     */
+    public Optional<RuntimeException> checkSameBucketByTotalBuckets(
+            Map<BinaryRow, Integer> expectedTotalBuckets,
+            Map<BinaryRow, Integer> previousTotalBuckets) {
+        for (Map.Entry<BinaryRow, Integer> entry : expectedTotalBuckets.entrySet()) {
+            Integer previous = previousTotalBuckets.get(entry.getKey());
+            if (previous != null && !Objects.equals(previous, entry.getValue())) {
+                return Optional.of(bucketNumMismatch(entry.getKey(), entry.getValue(), previous));
+            }
+        }
+        // Only remember partitions that were actually compared against the snapshot. A partition
+        // without existing files may still receive files with another bucket number from a
+        // concurrent commit, so it must be checked again on the next attempt.
+        for (BinaryRow partition : expectedTotalBuckets.keySet()) {
+            if (previousTotalBuckets.containsKey(partition)) {
+                sameBucketCheckedPartitions.put(partition, Boolean.TRUE);
+            }
+        }
+        return Optional.empty();
+    }
+
     private Optional checkBucketKeepSame(
             List baseEntries,
             List deltaEntries,
@@ -239,6 +298,9 @@ private Optional checkBucketKeepSame(
             if (entry.totalBuckets() <= 0) {
                 continue;
             }
+            if (sameBucketCheckedPartitions.containsKey(entry.partition())) {
+                continue;
+            }
 
             if (!totalBuckets.containsKey(entry.partition())) {
                 totalBuckets.put(entry.partition(), entry.totalBuckets());
@@ -266,9 +328,26 @@ private Optional checkBucketKeepSame(
             LOG.warn("", conflictException.getLeft());
             return Optional.of(conflictException.getRight());
         }
+        for (BinaryRow partition : totalBuckets.keySet()) {
+            sameBucketCheckedPartitions.put(partition, Boolean.TRUE);
+        }
         return Optional.empty();
     }
 
+    /** Builds the error reported when a partition is written with a different bucket number. */
+    private RuntimeException bucketNumMismatch(
+            BinaryRow partition, int numBuckets, int previousNumBuckets) {
+        String partInfo =
+                partitionType.getFieldCount() > 0
+                        ? "partition {" + pathFactory.getPartitionString(partition) + "}"
+                        : "table";
+        return new RuntimeException(
+                String.format(
+                        "Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
+                                + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
+                        partInfo, numBuckets, previousNumBuckets));
+    }
+
     private Optional checkKeyRange(
             List baseEntries,
             List deltaEntries,
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
index af1becc4652d..556209b2d318 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/BucketedAppendFileStoreWriteTest.java
@@ -45,11 +45,15 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_APPEND_ORDERED;
 import static org.apache.paimon.CoreOptions.WRITE_MAX_WRITERS_TO_SPILL;
+import static org.apache.paimon.CoreOptions.WRITE_ONLY;
 
 /** Tests for {@link BucketedAppendFileStoreWrite}. */
 public class BucketedAppendFileStoreWriteTest {
@@ -172,6 +176,43 @@ protected FileStoreTable createFileStoreTable() throws Exception {
         return (FileStoreTable) catalog.getTable(identifier);
     }
 
+    @Test
+    public void testIgnorePreviousFilesChecksPartitionBucketNumber() throws Exception {
+        FileStoreTable table = createFileStoreTable().copy(bucketOptions(2, false, false));
+        BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite) table.store().newWrite("ss");
+        StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();
+
+        write.write(partition(1), 1, GenericRow.of(1, 1, 0));
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        FileStoreTable rescaledTable = table.copy(bucketOptions(4, false, true));
+        write = (BaseAppendFileStoreWrite) rescaledTable.store().newWrite("ss");
+        write.write(partition(1), 1, GenericRow.of(1, 1, 0));
+        List<CommitMessage> commitMessages = write.prepareCommit(false, 1);
+        Assertions.assertThat(commitMessages).isNotEmpty();
+        Assertions.assertThatThrownBy(
+                        () ->
+                                rescaledTable
+                                        .newStreamWriteBuilder()
+                                        .newCommit()
+                                        .commit(1, commitMessages))
+                .hasMessageContaining("new bucket num 4")
+                .hasMessageContaining("previous bucket num is 2");
+
+        write = (BaseAppendFileStoreWrite) rescaledTable.store().newWrite("ss");
+        write.write(partition(2), 2, GenericRow.of(2, 2, 0));
+        rescaledTable.newStreamWriteBuilder().newCommit().commit(2, write.prepareCommit(false, 2));
+    }
+
+    private Map<String, String> bucketOptions(
+            int bucket, boolean bucketAppendOrdered, boolean writeOnly) {
+        Map<String, String> options = new HashMap<>();
+        options.put(BUCKET.key(), String.valueOf(bucket));
+        options.put(BUCKET_APPEND_ORDERED.key(), String.valueOf(bucketAppendOrdered));
+        options.put(WRITE_ONLY.key(), String.valueOf(writeOnly));
+        return options;
+    }
+
     private BinaryRow partition(int i) {
         BinaryRow binaryRow = new BinaryRow(1);
         BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
index 9c1796fcf4dc..f14c9854df3b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java
@@ -102,6 +102,28 @@ public void testReadWrite() {
         assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
     }
 
+    @Test
+    public void testInsertIntoCheckSameBucketAndInsertOverwriteRescale() {
+        batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')");
+        batchSql(
+                "ALTER TABLE append_table SET ("
+                        + "'bucket' = '2', "
+                        + "'bucket-append-ordered' = 'false', "
+                        + "'write-only' = 'true')");
+
+        assertThatThrownBy(() -> batchSql("INSERT INTO append_table VALUES (3, 'CCC')"))
+                .rootCause()
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage(
+                        "Try to write table with a new bucket num 2, but the previous bucket num is 1. "
+                                + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");
+
+        batchSql("INSERT OVERWRITE append_table VALUES (3, 'CCC'), (4, 'DDD')");
+
+        assertThat(batchSql("SELECT * FROM append_table"))
+                .containsExactlyInAnyOrder(Row.of(3, "CCC"), Row.of(4, "DDD"));
+    }
+
     @Test
     public void testReadWriteWithExternalPathRoundRobinStrategy1() {
         String externalPaths =