Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public FileStoreCommitImpl(
this.manifestList = manifestListFactory.create();
this.indexManifestFile = indexManifestFileFactory.create();
this.rollback = rollback;
this.scanner = new CommitScanner(scanSupplier.get(), indexManifestFile, options);
this.scanner = new CommitScanner(scanSupplier, indexManifestFile, options);
this.commitPreCallbacks = commitPreCallbacks;
this.commitCallbacks = commitCallbacks;
this.retryWaiter =
Expand Down Expand Up @@ -521,6 +521,7 @@ public int overwritePartition(
CommitKind.COMPACT,
false,
true,
false,
null);
generatedSnapshot += 1;
}
Expand Down Expand Up @@ -738,6 +739,36 @@ private int tryCommit(
return retryCount + 1;
}

/**
 * Validates that the ADD files of this commit use the same total bucket number as the data
 * already present in the latest snapshot, for every partition they touch. Throws the
 * mismatch exception produced by the conflict detection when a partition disagrees.
 */
private void checkSameBucketFromSnapshot(
        List<ManifestEntry> deltaFiles, @Nullable Snapshot latestSnapshot) {
    if (latestSnapshot == null) {
        // Nothing committed before, so there is no previous layout to conflict with.
        return;
    }

    // Expected bucket count per partition, restricted to partitions not validated yet.
    Map<BinaryRow, Integer> expectedTotalBuckets =
            conflictDetection.collectUncheckedBucketPartitions(deltaFiles);
    if (expectedTotalBuckets.isEmpty()) {
        return;
    }

    List<BinaryRow> partitionsToRead = new ArrayList<>(expectedTotalBuckets.keySet());
    Map<BinaryRow, Integer> previousTotalBuckets =
            scanner.readTotalBuckets(latestSnapshot, partitionsToRead);
    conflictDetection
            .checkSameBucketByTotalBuckets(expectedTotalBuckets, previousTotalBuckets)
            .ifPresent(
                    mismatch -> {
                        throw mismatch;
                    });
}

/**
 * Whether this commit needs the extra bucket-number consistency check against the latest
 * snapshot. Only plain appends to fixed-hash-bucket tables in write-only mode are checked,
 * and only when bucket-append-ordered is disabled.
 */
private boolean shouldCheckSameBucket(CommitKind commitKind) {
    if (commitKind != CommitKind.APPEND) {
        return false;
    }
    if (bucketMode != BucketMode.HASH_FIXED) {
        return false;
    }
    return options.writeOnly() && !options.bucketAppendOrdered();
}

/**
* Try to overwrite partition.
*
Expand Down Expand Up @@ -835,7 +866,13 @@ CommitResult tryCommitOnce(
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
boolean discardDuplicate =
options.commitDiscardDuplicateFiles() && commitKind == CommitKind.APPEND;
if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
boolean checkConflicts = latestSnapshot != null && (discardDuplicate || detectConflicts);
// By default, if checkConflicts is required, we do not have to do the extra check bucket
// here.
if (!checkConflicts && shouldCheckSameBucket(commitKind)) {
checkSameBucketFromSnapshot(deltaFiles, latestSnapshot);
}
if (checkConflicts) {
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
// so we have to check again
List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, indexFiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,34 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;

/** Manifest entries scanner for commit. */
public class CommitScanner {

private final FileStoreScan scan;
private final Supplier<FileStoreScan> scanSupplier;
private final IndexManifestFile indexManifestFile;
private final boolean dropStats;

public CommitScanner(
FileStoreScan scan, IndexManifestFile indexManifestFile, CoreOptions options) {
this.scan = scan;
Supplier<FileStoreScan> scanSupplier,
IndexManifestFile indexManifestFile,
CoreOptions options) {
this.scanSupplier = scanSupplier;
this.scan = scanSupplier.get();
this.indexManifestFile = indexManifestFile;
// Stats in DELETE Manifest Entries is useless
if (options.manifestDeleteFileDropStats()) {
this.dropStats = options.manifestDeleteFileDropStats();
if (dropStats) {
this.scan.dropStats();
}
}
Expand Down Expand Up @@ -89,6 +101,34 @@ public List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
}
}

/**
 * Reads the total bucket number recorded in {@code snapshot} for each of the given
 * partitions. Partitions for which no file entry carries a positive bucket count are absent
 * from the returned map. Scanning stops early once every requested partition is resolved.
 *
 * @throws RuntimeException wrapping any failure while scanning the snapshot
 */
public Map<BinaryRow, Integer> readTotalBuckets(
        Snapshot snapshot, List<BinaryRow> changedPartitions) {
    try {
        Set<BinaryRow> pending = new HashSet<>(changedPartitions);
        Map<BinaryRow, Integer> result = new HashMap<>();
        // Use a fresh scan so this lookup does not disturb the shared scan instance.
        FileStoreScan partitionScan = scanSupplier.get();
        if (dropStats) {
            partitionScan.dropStats();
        }
        Iterator<ManifestEntry> entries =
                partitionScan
                        .withSnapshot(snapshot)
                        .withKind(ScanMode.ALL)
                        .withPartitionFilter(changedPartitions)
                        .readFileIterator();
        // Stop as soon as every requested partition has a known bucket count.
        while (!pending.isEmpty() && entries.hasNext()) {
            ManifestEntry entry = entries.next();
            int buckets = entry.totalBuckets();
            if (buckets > 0 && pending.remove(entry.partition())) {
                result.put(entry.partition(), buckets);
            }
        }
        return result;
    } catch (Throwable e) {
        throw new RuntimeException("Cannot read total buckets from changed partitions.", e);
    }
}

public CommitChanges readOverwriteChanges(
int numBucket,
List<ManifestEntry> changes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
public class ConflictDetection {

private static final Logger LOG = LoggerFactory.getLogger(ConflictDetection.class);
private static final int SAME_BUCKET_CHECK_CACHE_MAX_SIZE = 1000;

private final String tableName;
private final String commitUser;
Expand All @@ -84,6 +85,13 @@ public class ConflictDetection {
private final IndexFileHandler indexFileHandler;
private final SnapshotManager snapshotManager;
private final CommitScanner commitScanner;
// Bounded cache of partitions whose bucket number has already been validated in this
// committer; presence of a key means "checked", so later commits can skip the scan.
// NOTE(review): the third LinkedHashMap argument (accessOrder) is false, so eviction is
// insertion-order (FIFO), not LRU — confirm this is intended; pass true for LRU semantics.
private final Map<BinaryRow, Boolean> sameBucketCheckedPartitions =
new LinkedHashMap<BinaryRow, Boolean>(SAME_BUCKET_CHECK_CACHE_MAX_SIZE, 0.75f, false) {
@Override
protected boolean removeEldestEntry(Map.Entry<BinaryRow, Boolean> eldest) {
// Evict the oldest entry once the cache grows past its maximum size.
return size() > SAME_BUCKET_CHECK_CACHE_MAX_SIZE;
}
};

private @Nullable PartitionExpire partitionExpire;
private @Nullable Long rowIdCheckFromSnapshot = null;
Expand Down Expand Up @@ -223,6 +231,39 @@ public Optional<RuntimeException> checkConflicts(
return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, deltaIndexEntries);
}

/**
 * Collects, from the given delta entries, the expected total bucket number of every
 * partition that has not been validated yet by this instance. Non-ADD entries, entries
 * with a non-positive bucket count, and already-checked partitions are ignored.
 *
 * @throws RuntimeException if two entries of the same partition disagree on the bucket count
 */
public <T extends FileEntry> Map<BinaryRow, Integer> collectUncheckedBucketPartitions(
        List<T> deltaEntries) {
    Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
    for (T entry : deltaEntries) {
        boolean irrelevant =
                entry.kind() != FileKind.ADD
                        || entry.totalBuckets() <= 0
                        || sameBucketCheckedPartitions.containsKey(entry.partition());
        if (irrelevant) {
            continue;
        }

        Integer existing = totalBuckets.get(entry.partition());
        if (existing == null) {
            totalBuckets.put(entry.partition(), entry.totalBuckets());
        } else if (existing != entry.totalBuckets()) {
            // The same commit carries two different bucket counts for one partition.
            throwBucketNumMismatch(entry.partition(), entry.totalBuckets(), existing);
        }
    }
    return totalBuckets;
}

/**
 * Compares the expected bucket number of each partition with the one read from the previous
 * snapshot. When all partitions agree, every compared partition is cached as checked so
 * subsequent commits from this instance can skip it.
 *
 * @return the mismatch exception for the caller to throw, or empty if all partitions agree
 */
public Optional<RuntimeException> checkSameBucketByTotalBuckets(
        Map<BinaryRow, Integer> expectedTotalBuckets,
        Map<BinaryRow, Integer> previousTotalBuckets) {
    for (Map.Entry<BinaryRow, Integer> expected : expectedTotalBuckets.entrySet()) {
        Integer previous = previousTotalBuckets.get(expected.getKey());
        // A partition absent from the snapshot cannot conflict.
        if (previous == null || Objects.equals(previous, expected.getValue())) {
            continue;
        }
        return Optional.of(
                bucketNumMismatch(expected.getKey(), expected.getValue(), previous));
    }
    // Remember every validated partition so it is not re-checked by this committer.
    for (BinaryRow partition : expectedTotalBuckets.keySet()) {
        sameBucketCheckedPartitions.put(partition, Boolean.TRUE);
    }
    return Optional.empty();
}

private Optional<RuntimeException> checkBucketKeepSame(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
Expand All @@ -239,6 +280,9 @@ private Optional<RuntimeException> checkBucketKeepSame(
if (entry.totalBuckets() <= 0) {
continue;
}
if (sameBucketCheckedPartitions.containsKey(entry.partition())) {
continue;
}

if (!totalBuckets.containsKey(entry.partition())) {
totalBuckets.put(entry.partition(), entry.totalBuckets());
Expand All @@ -250,25 +294,47 @@ private Optional<RuntimeException> checkBucketKeepSame(
continue;
}

Pair<RuntimeException, RuntimeException> conflictException =
createConflictException(
"Total buckets of partition "
+ entry.partition()
+ " changed from "
+ old
+ " to "
+ entry.totalBuckets()
+ " without overwrite. Give up committing.",
baseCommitUser,
baseEntries,
deltaEntries,
null);
LOG.warn("", conflictException.getLeft());
return Optional.of(conflictException.getRight());
RuntimeException exception =
totalBucketsChanged(entry.partition(), old, entry.totalBuckets());
LOG.warn("", exception);
return Optional.of(exception);
}
for (BinaryRow partition : totalBuckets.keySet()) {
sameBucketCheckedPartitions.put(partition, Boolean.TRUE);
}
return Optional.empty();
}

/** Immediately fails the caller with a bucket-number mismatch error for the partition. */
private void throwBucketNumMismatch(
BinaryRow partition, int numBuckets, int previousNumBuckets) {
throw bucketNumMismatch(partition, numBuckets, previousNumBuckets);
}

/**
 * Builds the exception reporting that a partition's total bucket number changed between the
 * base snapshot and this commit without an overwrite.
 */
private RuntimeException totalBucketsChanged(
        BinaryRow partition, int oldNumBuckets, int newNumBuckets) {
    StringBuilder message = new StringBuilder();
    message.append("Total buckets of partition ")
            .append(partition)
            .append(" changed from ")
            .append(oldNumBuckets)
            .append(" to ")
            .append(newNumBuckets)
            .append(" without overwrite. Give up committing.");
    return new RuntimeException(message.toString());
}

/**
 * Builds the exception reporting that a write uses a bucket number different from the one
 * already recorded for the partition (or for the whole table when it is unpartitioned).
 */
private RuntimeException bucketNumMismatch(
        BinaryRow partition, int numBuckets, int previousNumBuckets) {
    String partInfo;
    if (partitionType.getFieldCount() > 0) {
        partInfo = "partition {" + pathFactory.getPartitionString(partition) + "}";
    } else {
        // Unpartitioned table: report against the table itself.
        partInfo = "table";
    }
    return new RuntimeException(
            String.format(
                    "Try to write %s with a new bucket num %d, but the previous bucket num is %d. "
                            + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.",
                    partInfo, numBuckets, previousNumBuckets));
}

private Optional<RuntimeException> checkKeyRange(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,15 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.BUCKET_APPEND_ORDERED;
import static org.apache.paimon.CoreOptions.WRITE_MAX_WRITERS_TO_SPILL;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;

/** Tests for {@link BucketedAppendFileStoreWrite}. */
public class BucketedAppendFileStoreWriteTest {
Expand Down Expand Up @@ -172,6 +176,43 @@ protected FileStoreTable createFileStoreTable() throws Exception {
return (FileStoreTable) catalog.getTable(identifier);
}

@Test
public void testIgnorePreviousFilesChecksPartitionBucketNumber() throws Exception {
// Table starts with bucket = 2, ordered appends, not write-only.
FileStoreTable table = createFileStoreTable().copy(bucketOptions(2, false, false));
BaseAppendFileStoreWrite write = (BaseAppendFileStoreWrite) table.store().newWrite("ss");
StreamTableCommit commit = table.newStreamWriteBuilder().newCommit();

// Seed partition 1 so the snapshot records total buckets = 2 for it.
write.write(partition(1), 1, GenericRow.of(1, 1, 0));
commit.commit(0, write.prepareCommit(false, 0));

// Rescale to bucket = 4 in write-only mode; commit must detect the bucket change.
FileStoreTable rescaledTable = table.copy(bucketOptions(4, false, true));
write = (BaseAppendFileStoreWrite) rescaledTable.store().newWrite("ss");
write.write(partition(1), 1, GenericRow.of(1, 1, 0));
List<CommitMessage> commitMessages = write.prepareCommit(false, 1);
Assertions.assertThat(commitMessages).isNotEmpty();
// Committing into an already-written partition with a different bucket num must fail.
Assertions.assertThatThrownBy(
() ->
rescaledTable
.newStreamWriteBuilder()
.newCommit()
.commit(1, commitMessages))
.hasMessageContaining("new bucket num 4")
.hasMessageContaining("previous bucket num is 2");

// A brand-new partition has no previous layout, so the same write succeeds there.
write = (BaseAppendFileStoreWrite) rescaledTable.store().newWrite("ss");
write.write(partition(2), 2, GenericRow.of(2, 2, 0));
rescaledTable.newStreamWriteBuilder().newCommit().commit(2, write.prepareCommit(false, 2));
}

/** Builds the dynamic table options for bucket count, append ordering and write-only mode. */
private Map<String, String> bucketOptions(
        int bucket, boolean bucketAppendOrdered, boolean writeOnly) {
    Map<String, String> dynamicOptions = new HashMap<>();
    dynamicOptions.put(BUCKET.key(), Integer.toString(bucket));
    dynamicOptions.put(BUCKET_APPEND_ORDERED.key(), Boolean.toString(bucketAppendOrdered));
    dynamicOptions.put(WRITE_ONLY.key(), Boolean.toString(writeOnly));
    return dynamicOptions;
}

private BinaryRow partition(int i) {
BinaryRow binaryRow = new BinaryRow(1);
BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,28 @@ public void testReadWrite() {
assertThat(rows).containsExactlyInAnyOrder(Row.of("AAA"), Row.of("BBB"));
}

@Test
public void testInsertIntoCheckSameBucketAndInsertOverwriteRescale() {
// Write initial data with the table's original bucket layout.
batchSql("INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')");
// Rescale to bucket = 2 in write-only mode with unordered appends.
batchSql(
"ALTER TABLE append_table SET ("
+ "'bucket' = '2', "
+ "'bucket-append-ordered' = 'false', "
+ "'write-only' = 'true')");

// A plain INSERT INTO must be rejected because existing data uses 1 bucket.
assertThatThrownBy(() -> batchSql("INSERT INTO append_table VALUES (3, 'CCC')"))
.rootCause()
.isInstanceOf(RuntimeException.class)
.hasMessage(
"Try to write table with a new bucket num 2, but the previous bucket num is 1. "
+ "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.");

// INSERT OVERWRITE rewrites the layout, so it is the allowed way to rescale.
batchSql("INSERT OVERWRITE append_table VALUES (3, 'CCC'), (4, 'DDD')");

// Only the overwritten rows remain after the rescale.
assertThat(batchSql("SELECT * FROM append_table"))
.containsExactlyInAnyOrder(Row.of(3, "CCC"), Row.of(4, "DDD"));
}

@Test
public void testReadWriteWithExternalPathRoundRobinStrategy1() {
String externalPaths =
Expand Down
Loading