Skip to content

[CELEBORN-2166] Mark shuffle data lost and fast fail if allocated worker is lost#3726

Open
s0nskar wants to merge 3 commits into
apache:mainfrom
s0nskar:fix_fetch_failures
Open

[CELEBORN-2166] Mark shuffle data lost and fast fail if allocated worker is lost#3726
s0nskar wants to merge 3 commits into
apache:mainfrom
s0nskar:fix_fetch_failures

Conversation

@s0nskar

@s0nskar s0nskar commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Enhancing the logic of #3496

Why are the changes needed?

#3496 only handles the reduce side flow i.e GetReducerFileGroup request will fail if the shuffle data is mark lost.

In this PR, we are making use of the WorkerStatusListener to immediately detect the lost workers, mark the data lost of the stage and immediately issue stage end for that stage. This will also allow write stage to fast-fail during revives and commit request, otherwise write stage will run as usual and then reduce will fail at startup. This will lead to lot of resource and time wastage.

Does this PR resolve a correctness bug?

  • Yes

Does this PR introduce any user-facing change?

  • Yes

How was this patch tested?

  • Added UTs
  • Working on staging testing.

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s0nskar, the mechanism and the new tests are sound, and the setStageEnd/handleGetReducerFileGroup lock handshake stays correct with the new heartbeat-thread caller (no missed/double reply). The main concern is the combination of (a) flipping the default to true and (b) the marking being irreversible while WORKER_UNKNOWN can be transient. Details inline.

}
override def markShuffleDataLost(shuffleId: Int): Unit = {
logWarning(s"Marking shuffle $shuffleId data as lost due to unknown/crashed worker.")
dataLostShuffleSet.add(shuffleId)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Irreversible marking vs. transient WORKER_UNKNOWN. markShuffleDataLost does dataLostShuffleSet.add + setStageEnd, and neither is undone for the life of the shuffle (only removeExpiredShuffle clears it). But WORKER_UNKNOWN is transient: the master computes unknownWorkers = needCheckedWorkerList.filterNot(workersMap.containsKey) (Master.scala:1241), so a still-alive worker that briefly leaves workersMap — master failover/restart rebuilding state, a heartbeat-timeout eviction, or a long GC pause — is reported unknown for a heartbeat and then recovers on re-registration.

The removed isStageDataLostInUnknownWorker was evaluated live on every isStageDataLost call and reverted once the worker left excludedWorkers. With this change a transient blip permanently marks every affected shuffle SHUFFLE_DATA_LOST and force-recomputes the stage, even though the committed data is intact. Consider re-validating against current worker status before failing, or keeping the mark reversible while the worker is only unknown (not confirmed lost).

Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
override def notifyChangedWorkersStatus(workersStatus: WorkersStatus): Unit = {
if (shuffleDataLostOnUnknownWorkerEnabled && !pushReplicateEnabled) {
if (workersStatus.unknownWorkers != null && !workersStatus.unknownWorkers.isEmpty) {
lifecycleManager.shuffleAllocatedWorkers.asScala.foreach {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This foreach has no per-shuffle guard. markShuffleDataLostgetCommitHandler(shuffleId)lifecycleManager.getPartitionType(shuffleId); if that is transiently null for a shuffle being torn down (or returns an unexpected type → the case _ => throw UnsupportedOperationException in getCommitHandler), the exception propagates out of notifyChangedWorkersStatus and is only caught at WorkerStatusTracker.scala:207, abandoning the rest of the loop so other affected shuffles aren't marked this heartbeat. Since marking is per-shuffle, wrap each iteration in its own try/catch (log and continue).

Comment thread common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Comment thread docs/migration.md
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants