Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ class CelebornShuffleReader[K, C](
true
}
try {
// startPartition is irrelevant
fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition)
fileGroups = shuffleClient.updateFileGroup(shuffleId, startPartition, endPartition)
} catch {
case ce: CelebornIOException
if ce.getCause != null && ce.getCause.isInstanceOf[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public ShuffleClientImpl.ReduceFileGroups updateFileGroup(int shuffleId, int par
return null;
}

@Override
public ShuffleClientImpl.ReduceFileGroups updateFileGroup(
int shuffleId, int startPartition, int endPartition) throws CelebornIOException {
return null;
}

@Override
public boolean isShuffleStageEnd(int shuffleId) throws Exception {
return true;
Expand Down
31 changes: 31 additions & 0 deletions client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,37 @@ public abstract void mapPartitionMapperEnd(
public abstract ShuffleClientImpl.ReduceFileGroups updateFileGroup(int shuffleId, int partitionId)
throws CelebornIOException;

public ShuffleClientImpl.ReduceFileGroups updateFileGroup(
int shuffleId, int startPartition, int endPartition) throws CelebornIOException {
if (startPartition < 0 || endPartition < startPartition) {
throw new IllegalArgumentException(
String.format("Invalid reducer file group range [%d, %d)", startPartition, endPartition));
}

ShuffleClientImpl.ReduceFileGroups merged =
new ShuffleClientImpl.ReduceFileGroups(
new ConcurrentHashMap<>(),
null,
ConcurrentHashMap.newKeySet(),
new ConcurrentHashMap<>());
for (int partitionId = startPartition; partitionId < endPartition; partitionId++) {
ShuffleClientImpl.ReduceFileGroups current = updateFileGroup(shuffleId, partitionId);
if (current.partitionGroups != null) {
merged.partitionGroups.putAll(current.partitionGroups);
}
if (current.partitionIds != null) {
merged.partitionIds.addAll(current.partitionIds);
}
if (current.pushFailedBatches != null) {
merged.pushFailedBatches.putAll(current.pushFailedBatches);
}
if (merged.mapAttempts == null) {
merged.mapAttempts = current.mapAttempts;
}
}
return merged;
}

public abstract boolean isShuffleStageEnd(int shuffleId) throws Exception;

// Reduce side read partition which is deduplicated by mapperId+mapperAttemptNum+batchId, batchId
Expand Down
Loading
Loading