@@ -1357,7 +1357,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED)
def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
// Decommission shutdown overrides graceful shutdown: a decommissioned worker will not
// restart, so recovery state (recovery DB, sorter state) should not be persisted.
def effectiveWorkerGracefulShutdown: Boolean =
workerGracefulShutdown && !workerDecommissionShutdown
def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownCheckSlotsFinishedInterval: Long =
get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL)
@@ -4477,6 +4482,23 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("6h")

val WORKER_DECOMMISSION_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.decommission.shutdown.enabled")
.categories("worker")
.doc("When true, the worker will decommission on shutdown signal (e.g. SIGTERM), " +
"waiting for all shuffle data to be consumed or expired before exiting. " +
"This is suitable for permanent scale-down scenarios where the worker will not restart. " +
"When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " +
"(recovery state will not be saved since the worker is not expected to come back). " +
"Operators should set the pod's terminationGracePeriodSeconds to " +

terminationGracePeriodSeconds guidance understates the budget. This tells operators to set it to forceExitTimeout + checkInterval + small buffer, but stop() teardown runs inside that same forceExitTimeout + checkInterval hook timeout, and its cost is unbounded (DFS deletes of all unreleased shuffle). When the wait loop runs close to forceExitTimeout, only ~checkInterval is left for stop(), so a "small buffer" can't cover it and the pod is SIGKILLed mid-cleanup. The in-code comments acknowledge stop() needs headroom; the operator-facing doc should say so too (and the buffer should scale with the worst-case teardown, not be "small").
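
One way to read this suggestion is that the pod's grace period should be derived from the two timeouts plus an explicit teardown estimate, rather than from an unspecified small buffer. The following is a minimal sketch of that arithmetic. `terminationGracePeriodSeconds` here is a hypothetical helper, not part of Celeborn, and the 10-minute teardown figure in the usage note is an assumed value that an operator would have to measure. It also assumes the shutdown hook timeout is extended by the same headroom, otherwise the hook is cancelled before the pod's grace period ends.

```scala
import scala.concurrent.duration._

object GracePeriodSketch {
  // Hypothetical helper for illustration; Celeborn does not ship this function.
  // forceExitTimeout and checkInterval mirror the two decommission settings;
  // worstCaseTeardown is an operator-supplied estimate of how long stop() may take.
  def terminationGracePeriodSeconds(
      forceExitTimeout: FiniteDuration,
      checkInterval: FiniteDuration,
      worstCaseTeardown: FiniteDuration): Long =
    (forceExitTimeout + checkInterval + worstCaseTeardown).toSeconds
}
```

With the defaults quoted in this PR (6h and 30s) and an assumed 10-minute teardown, the helper yields 22230 seconds.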

"celeborn.worker.decommission.forceExitTimeout + " +
"celeborn.worker.decommission.checkInterval plus a small buffer, to ensure " +
"the shutdown hook has enough time to complete resource cleanup before " +
"being killed.")
.version("0.7.0")
.booleanConf
.createWithDefault(false)

val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.graceful.shutdown.enabled")
.categories("worker")
1 change: 1 addition & 0 deletions docs/configuration/worker.md
@@ -78,6 +78,7 @@ license: |
| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | |
| celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | |
| celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | |
| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval plus a small buffer, to ensure the shutdown hook has enough time to complete resource cleanup before being killed. | 0.7.0 | |
| celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | |
| celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of direct memory for read buffer | 0.2.0 | |
| celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer | 0.6.2 | |
@@ -113,7 +113,7 @@ public PartitionFilesSorter(
long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight();
this.source = source;
this.cleaner = new PartitionFilesCleaner(this);
- boolean gracefulShutdown = conf.workerGracefulShutdown();
+ boolean gracefulShutdown = conf.effectiveWorkerGracefulShutdown();
// Assume a chunk won't be larger than 2GB
// ShuffleClient can fetch shuffle data from a restarted worker only
// when the worker's fetching port is stable and enables graceful shutdown.
@@ -156,7 +156,7 @@ private[celeborn] class Worker(

private val WORKER_SHUTDOWN_PRIORITY = 100
val shutdown = new AtomicBoolean(false)
- private val gracefulShutdown = conf.workerGracefulShutdown
+ private val gracefulShutdown = conf.effectiveWorkerGracefulShutdown
if (gracefulShutdown) {
var checkPortMap = Map(
WORKER_RPC_PORT -> conf.workerRpcPort,
@@ -619,29 +619,34 @@ private[celeborn] class Worker(
if (!stopped) {
logInfo("Stopping Worker.")

// Both graceful shutdown and decommission have drained data, so in-flight
// tasks are allowed to finish instead of being force-cancelled.
val drainBeforeExit = exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN ||

Drain intent isn't carried through to the partition sorter. This comment/drainBeforeExit declare that decommission "has drained data, so in-flight tasks are allowed to finish" — and that holds for the thread pools here. But stop() later calls partitionsSorter.close(WORKER_DECOMMISSION), and PartitionFilesSorter.close only takes the await-termination branch for exitKind == WORKER_GRACEFUL_SHUTDOWN; WORKER_DECOMMISSION falls into the else and calls fileSorterExecutors.shutdownNow() (PartitionFilesSorter.java:415-417), force-interrupting an in-flight on-demand sort → truncated sorted file/index → fetch failure for a reader mid-sort. Not a regression vs the old immediate path, but inconsistent with the new drain semantics — either group WORKER_DECOMMISSION with graceful in the sorter, or soften this comment.
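
The first option, grouping decommission with graceful shutdown when closing the sorter's executor, could look roughly like the sketch below. It is not the `PartitionFilesSorter.close` implementation. `ExitKind` and its cases are stand-ins for the `CelebornExitKind` constants, and `drains` and `close` are hypothetical names.

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}

object SorterCloseSketch {
  // Stand-ins for the CelebornExitKind constants; names are illustrative only.
  sealed trait ExitKind
  case object GracefulShutdown extends ExitKind
  case object Decommission extends ExitKind
  case object ExitImmediately extends ExitKind

  // Exit kinds that should let in-flight sort tasks finish.
  def drains(kind: ExitKind): Boolean = kind match {
    case GracefulShutdown | Decommission => true
    case ExitImmediately => false
  }

  /** Closes the pool; returns true if it had terminated by the time this returns. */
  def close(pool: ExecutorService, kind: ExitKind, awaitMs: Long): Boolean =
    if (drains(kind)) {
      pool.shutdown() // stop accepting work, let running and queued tasks finish
      pool.awaitTermination(awaitMs, TimeUnit.MILLISECONDS)
    } else {
      pool.shutdownNow() // interrupt running tasks and drop queued ones
      pool.isTerminated
    }
}
```

Keeping the decision in one `drains` predicate means the worker's thread pools and the sorter would classify an exit kind the same way.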

exitKind == CelebornExitKind.WORKER_DECOMMISSION

if (jvmProfiler != null) {
jvmProfiler.stop()
}
if (jvmQuake != null) {
jvmQuake.stop()
}
if (sendHeartbeatTask != null) {
- if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+ if (drainBeforeExit) {
sendHeartbeatTask.cancel(false)
} else {
sendHeartbeatTask.cancel(true)
}
sendHeartbeatTask = null
}
if (checkFastFailTask != null) {
- if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+ if (drainBeforeExit) {
checkFastFailTask.cancel(false)
} else {
checkFastFailTask.cancel(true)
}
checkFastFailTask = null
}
- if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
+ if (drainBeforeExit) {
forwardMessageScheduler.shutdown()
replicateThreadPool.shutdown()
commitThreadPool.shutdown()
@@ -950,7 +955,7 @@ private[celeborn] class Worker(
exitType.toUpperCase(Locale.ROOT) match {
case "DECOMMISSION" =>
ShutdownHookManager.get().updateTimeout(

Duplicated hook-timeout expression. conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval appears here (REST exit handler) and again in the construction block (~line 1107). Extract a single decommissionHookTimeoutMs helper so the SIGTERM and REST decommission paths can't drift to different timeouts.
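
A minimal sketch of such a helper follows. `DecommissionConf` is a stand-in case class, not `CelebornConf`, and its two fields only mirror the `workerDecommissionForceExitTimeout` and `workerDecommissionCheckInterval` accessors in milliseconds.

```scala
object HookTimeoutSketch {
  // Stand-in for the two CelebornConf accessors; not the real class.
  final case class DecommissionConf(forceExitTimeoutMs: Long, checkIntervalMs: Long) {
    // Single definition of the hook budget, so the SIGTERM path and the REST
    // exit handler would both read the same value.
    def decommissionHookTimeoutMs: Long = forceExitTimeoutMs + checkIntervalMs
  }
}
```

Both call sites would then pass `conf.decommissionHookTimeoutMs` to `updateTimeout` instead of repeating the sum.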

- conf.workerDecommissionForceExitTimeout,
+ conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval,
TimeUnit.MILLISECONDS)
workerStatusManager.doTransition(WorkerEventType.Decommission)
case "GRACEFUL" =>
@@ -1031,7 +1036,9 @@ private[celeborn] class Worker(

def waitTime: Long = waitTimes * interval

- while (!storageManager.shuffleKeySet().isEmpty && waitTime < timeout) {
+ // Bound the total wait strictly by the timeout so that the remaining shutdown hook
+ // budget is left for stop(WORKER_DECOMMISSION) to clean up resources.

Worst case can truncate stop() mid-cleanup. This comment claims the wait loop leaves budget for stop(), but the loop below can consume the entire forceExitTimeout (floor(6h/30s)·30s = 6h with defaults), leaving only ~checkInterval (30s) of the forceExitTimeout + checkInterval hook budget for stop(WORKER_DECOMMISSION). That path runs StorageManager.cleanupExpiredShuffleKey(shuffleKeySet(), false) — unbounded synchronous DFS delete() of all unreleased shuffle (exactly the slow case, since shuffle didn't drain) — plus flusher + 3 netty server shutdowns. If it exceeds 30s, ShutdownHookManager.executeShutdown does future.cancel(true), interrupting teardown → orphaned DFS objects and transitionState(Exit) skipped. The sendWorkerDecommissionToMaster() askSync before the loop is also unbudgeted (retries on a slow master). Consider bounding the wait to leave a teardown headroom proportional to the real stop() cost, not a fixed checkInterval.
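
To make the budgeting concrete, here is a small sketch that splits the hook timeout into a wait budget and a teardown budget. `Budget` and `split` are hypothetical names, and `teardownHeadroomMs` is an assumed input that would have to come from a new setting or a measured estimate.

```scala
object TeardownBudgetSketch {
  // waitMs bounds the drain loop; teardownMs is what remains for stop().
  final case class Budget(waitMs: Long, teardownMs: Long)

  // Reserve the requested headroom first, clamped to [0, hookTimeoutMs],
  // and give the drain loop whatever is left.
  def split(hookTimeoutMs: Long, teardownHeadroomMs: Long): Budget = {
    val teardown = math.min(math.max(teardownHeadroomMs, 0L), hookTimeoutMs)
    Budget(hookTimeoutMs - teardown, teardown)
  }
}
```

With the default hook timeout of 6h + 30s (21630000 ms) and an assumed 10-minute headroom, the wait loop would get 21030000 ms and `stop()` would keep 600000 ms.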

+ while (!storageManager.shuffleKeySet().isEmpty && waitTime + interval <= timeout) {

forceExitTimeout < checkInterval → no wait at all → data loss. With waitTimes=0 the guard is 0 + interval <= timeout, i.e. checkInterval <= forceExitTimeout. If an operator sets forceExitTimeout below checkInterval, the loop body never executes — decommissionWorker() returns immediately and stop() deletes shuffle data consumers still need → FetchFailure / SHUFFLE_DATA_LOST. The previous waitTime < timeout always waited at least once. There's no checkValue guarding forceExitTimeout >= checkInterval; consider adding one, or special-casing interval > timeout to still wait (bounded by timeout).
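
The difference between the two guards can be checked with a small model of the loop, assuming the shuffle key set never drains. The function names are hypothetical. `clampedSleeps` sketches the special-casing option, where the final sleep is shortened to the remaining budget instead of being skipped.

```scala
import scala.collection.mutable.ListBuffer

object WaitLoopSketch {
  // Number of sleeps under the pre-PR guard `waitTime < timeout`.
  def sleepsOldGuard(intervalMs: Long, timeoutMs: Long): Long = {
    require(intervalMs > 0, "interval must be positive")
    var n = 0L
    while (n * intervalMs < timeoutMs) n += 1
    n
  }

  // Number of sleeps under the new guard `waitTime + interval <= timeout`.
  def sleepsNewGuard(intervalMs: Long, timeoutMs: Long): Long = {
    require(intervalMs > 0, "interval must be positive")
    var n = 0L
    while (n * intervalMs + intervalMs <= timeoutMs) n += 1
    n
  }

  // Variant that never exceeds the timeout but still waits when
  // interval > timeout, by clamping each sleep to the remaining budget.
  def clampedSleeps(intervalMs: Long, timeoutMs: Long): List[Long] = {
    require(intervalMs > 0, "interval must be positive")
    val sleeps = ListBuffer.empty[Long]
    var waited = 0L
    while (waited < timeoutMs) {
      val next = math.min(intervalMs, timeoutMs - waited)
      sleeps += next
      waited += next
    }
    sleeps.toList
  }
}
```

For a 30s interval and a 10s timeout, the old guard sleeps once (overshooting to 30s), the new guard does not sleep at all, and the clamped variant sleeps once for 10s.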

Thread.sleep(interval)
waitTimes += 1
}
@@ -1080,22 +1087,28 @@ private[celeborn] class Worker(
workerStatusManager.exitEventType match {
case WorkerEventType.Graceful =>
shutdownGracefully()
stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
case WorkerEventType.Decommission =>
decommissionWorker()

Decommission report failure has no retry/fallback on the SIGTERM path. decommissionWorker() calls sendWorkerDecommissionToMaster(), which swallows any exception and proceeds. If the master is briefly unreachable when SIGTERM fires, no decommission record reaches it and it must wait for heartbeat-timeout eviction. The other two exit paths are more robust — exitImmediately() uses WorkerLost and shutdownGracefully() uses ReportWorkerUnavailable. Consider a single retry or a ReportWorkerUnavailable fallback to match them.
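
A rough sketch of the retry-then-fallback shape is shown below. The two function parameters stand in for `sendWorkerDecommissionToMaster()` and a `ReportWorkerUnavailable` call. `report`, `Outcome` and the attempt count are hypothetical, and the real RPC signatures are not modelled here.

```scala
import scala.annotation.tailrec
import scala.util.control.NonFatal

object DecommissionReportSketch {
  sealed trait Outcome
  case object Reported extends Outcome   // decommission report reached the master
  case object FellBack extends Outcome   // only the fallback report succeeded
  case object Unreported extends Outcome // master must rely on heartbeat timeout

  // Runs the call and reports whether it completed without a non-fatal exception.
  private def succeeds(call: () => Unit): Boolean =
    try { call(); true } catch { case NonFatal(_) => false }

  def report(
      sendDecommission: () => Unit,
      sendUnavailable: () => Unit,
      attempts: Int): Outcome = {
    @tailrec
    def tryDecommission(left: Int): Boolean =
      if (left <= 0) false
      else if (succeeds(sendDecommission)) true
      else tryDecommission(left - 1)

    if (tryDecommission(attempts)) Reported
    else if (succeeds(sendUnavailable)) FellBack
    else Unreported
  }
}
```

Any retry added this way would also consume shutdown hook budget, so the attempt count would need to stay small.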

stop(CelebornExitKind.WORKER_DECOMMISSION)

Behavior change for runtime REST decommission. This branch is reached not only by the new SIGTERM feature but also by the existing exit("DECOMMISSION") REST API (via the exit-thread → System.exit → hook). Pre-PR that path ran stop(EXIT_IMMEDIATELY); now it runs stop(WORKER_DECOMMISSION), which via drainBeforeExit makes the heartbeat/fast-fail tasks cancel(false) and the thread pools shutdown() (drain) instead of cancel(true)/shutdownNow(). So every cluster using the REST decommission API gets changed teardown semantics even without enabling celeborn.worker.decommission.shutdown.enabled. Worth calling out explicitly as intended.

Author

Not a regression — the existing REST exit("DECOMMISSION") path already swallows this, and the master falls back to heartbeat-timeout eviction.

case _ =>
exitImmediately()
}

if (workerStatusManager.exitEventType == WorkerEventType.Graceful) {
stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
} else {
stop(CelebornExitKind.EXIT_IMMEDIATELY)
stop(CelebornExitKind.EXIT_IMMEDIATELY)
}
}
},
"worker-shutdown-hook-thread"),
WORKER_SHUTDOWN_PRIORITY)

if (conf.workerDecommissionShutdown) {
// The wait loop in decommissionWorker() is bounded by forceExitTimeout, so the extra
// checkInterval reserves headroom for stop(WORKER_DECOMMISSION) to finish cleanup
// before the hook is cancelled.
ShutdownHookManager.get().updateTimeout(

Altitude: updateTimeout is process-wide. ShutdownHookManager.updateTimeout does hooks.forEach(setTimeout), so this raises the timeout of every registered hook (not just the worker hook) to forceExitTimeout + checkInterval (6h+ by default). An unrelated stuck hook could then hang JVM exit for hours where the default would force-terminate it. Prefer registering the worker hook with the 4-arg addShutdownHook(hook, priority, timeout, unit) so only that hook gets the extended budget — which also removes the register-then-update sequencing.
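
The scoping difference can be illustrated with a toy registry. This is a model, not Celeborn's `ShutdownHookManager`. `Registry`, `Hook` and the 30s default are assumptions chosen only to show how a global update differs from a per-hook timeout.

```scala
import scala.collection.mutable.ArrayBuffer

object HookTimeoutScopeSketch {
  final class Hook(val name: String, var timeoutMs: Long)

  // Toy registry: each hook carries its own timeout.
  final class Registry(defaultTimeoutMs: Long) {
    private val hooks = ArrayBuffer.empty[Hook]

    // Registers a hook with the registry-wide default timeout.
    def add(name: String): Hook = add(name, defaultTimeoutMs)

    // Registers a hook with its own timeout, leaving other hooks untouched.
    def add(name: String, timeoutMs: Long): Hook = {
      val hook = new Hook(name, timeoutMs)
      hooks += hook
      hook
    }

    // Models the process-wide behaviour described above: every hook is changed.
    def updateTimeout(timeoutMs: Long): Unit =
      hooks.foreach(h => h.timeoutMs = timeoutMs)
  }
}
```

Registering the worker hook with its own timeout leaves an unrelated hook at the default, while a later `updateTimeout` call raises both.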

conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval,
TimeUnit.MILLISECONDS)
}

@VisibleForTesting
def getPushFetchServerPort: (Int, Int) = (pushPort, fetchPort)

@@ -39,8 +39,13 @@ private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging
private var worker: Worker = _
private var shutdown: AtomicBoolean = _
private var storageManager: StorageManager = _
private val decommissionShutdown = conf.workerDecommissionShutdown
private val gracefulShutdown = conf.workerGracefulShutdown
- if (gracefulShutdown) {
+ if (decommissionShutdown) {
exitEventType = WorkerEventType.Decommission

exitEventType is read across threads without synchronization. It's written here and in exit() under this.synchronized, but the shutdown-hook thread reads workerStatusManager.exitEventType (Worker.scala:1087) without the lock, and the field (declared line 38) isn't @volatile. A runtime REST exit(...) that updates it shortly before an independent SIGTERM-triggered hook read has no happens-before guarantee → the hook could take a stale branch (e.g. exitImmediately instead of decommissionWorker). Mark exitEventType @volatile.
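
A minimal sketch of the suggested change, using a stand-in class rather than `WorkerStatusManager`, might look like this. `StatusHolder` and the `ExitEvent` cases are illustrative names.

```scala
object ExitEventVisibilitySketch {
  // Stand-ins for the WorkerEventType values used in this PR.
  sealed trait ExitEvent
  case object Immediately extends ExitEvent
  case object Graceful extends ExitEvent
  case object Decommission extends ExitEvent

  final class StatusHolder {
    // @volatile makes a completed write visible to a later lock-free read
    // on another thread, such as the shutdown hook thread.
    @volatile private var current: ExitEvent = Immediately

    // Writers still serialize on the lock, as exit() does in the PR.
    def exit(next: ExitEvent): Unit = this.synchronized { current = next }

    // Readers do not take the lock.
    def exitEventType: ExitEvent = current
  }
}
```

The annotation only addresses visibility of the single field. It does not make a read-then-act sequence in the hook atomic with respect to a concurrent `exit(...)`.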

logInfo("Decommission shutdown enabled, worker will decommission on SIGTERM" +
" (overrides graceful shutdown)")
} else if (gracefulShutdown) {
exitEventType = WorkerEventType.Graceful
}

@@ -295,7 +295,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, DiskFileInfo]]()
// ShuffleClient can fetch data from a restarted worker only
// when the worker's fetching port is stable.
- val workerGracefulShutdown = conf.workerGracefulShutdown
+ val workerGracefulShutdown = conf.effectiveWorkerGracefulShutdown
if (workerGracefulShutdown) {
try {
val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend)
@@ -57,24 +57,55 @@ class WorkerStatusManagerSuite extends AnyFunSuite {
statusManager.init(worker)

statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
- Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle)
+ Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState())
Assert.assertEquals(
- worker.workerInfo.getWorkerStatus().getStateValue,
- PbWorkerStatus.State.InDecommissionThenIdle.getNumber)
+ PbWorkerStatus.State.InDecommissionThenIdle.getNumber,
+ worker.workerInfo.getWorkerStatus().getStateValue)

// Rerun state Transition
statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
- Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle)
+ Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState())

// Reset shuffleKeys
shuffleKeys.clear()
statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
- Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Idle)
+ Assert.assertEquals(PbWorkerStatus.State.Idle, statusManager.getWorkerState())

statusManager.doTransition(WorkerEventType.Recommission)
- Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal)
+ Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState())

statusManager.doTransition(WorkerEventType.Recommission)
- Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal)
+ Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState())
}

test("Test exitEventType initialization based on config") {
// Neither graceful nor decommission → Immediately. Set both keys explicitly so the
// assertion does not depend on system properties leaked from other tests.
val conf1 = new CelebornConf()
conf1.set("celeborn.worker.graceful.shutdown.enabled", "false")
conf1.set("celeborn.worker.decommission.shutdown.enabled", "false")
val mgr1 = new WorkerStatusManager(conf1)
Assert.assertEquals(WorkerEventType.Immediately, mgr1.exitEventType)

// Graceful shutdown only → Graceful
val conf2 = new CelebornConf()
conf2.set("celeborn.worker.graceful.shutdown.enabled", "true")

Non-defensive test setup. conf1 above explicitly sets both keys with the comment "so the assertion does not depend on system properties leaked from other tests." But conf2 here sets only graceful.shutdown.enabled=true (not decommission.shutdown.enabled=false). Since new CelebornConf() loads system properties, a leaked celeborn.worker.decommission.shutdown.enabled=true would make exitEventType=Decommission and this assertEquals(Graceful, mgr2.exitEventType) fail. Apply conf1's both-keys defensiveness to conf2/conf3.
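
The fragility can be reproduced without the real classes. In the sketch below a plain `Map` stands in for `CelebornConf`, `leaked` models system properties left behind by another test, and `settings` and `exitEventType` are hypothetical helpers that mirror the selection logic added in `WorkerStatusManager`.

```scala
object DefensiveConfSketch {
  val GracefulKey = "celeborn.worker.graceful.shutdown.enabled"
  val DecommissionKey = "celeborn.worker.decommission.shutdown.enabled"

  // Sets both keys explicitly on top of whatever leaked in, so the result
  // no longer depends on the leaked values.
  def settings(
      leaked: Map[String, String],
      graceful: Boolean,
      decommission: Boolean): Map[String, String] =
    leaked ++ Map(GracefulKey -> graceful.toString, DecommissionKey -> decommission.toString)

  // Mirrors the precedence in this PR: decommission overrides graceful.
  def exitEventType(conf: Map[String, String]): String =
    if (conf.get(DecommissionKey).contains("true")) "Decommission"
    else if (conf.get(GracefulKey).contains("true")) "Graceful"
    else "Immediately"
}
```

With a leaked `decommission.shutdown.enabled=true`, setting only the graceful key yields `Decommission`, while setting both keys yields the expected `Graceful`.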

val mgr2 = new WorkerStatusManager(conf2)
Assert.assertEquals(WorkerEventType.Graceful, mgr2.exitEventType)

// Decommission shutdown only → Decommission
val conf3 = new CelebornConf()
conf3.set("celeborn.worker.decommission.shutdown.enabled", "true")
val mgr3 = new WorkerStatusManager(conf3)
Assert.assertEquals(WorkerEventType.Decommission, mgr3.exitEventType)

// Both enabled → Decommission overrides graceful
val conf4 = new CelebornConf()
conf4.set("celeborn.worker.graceful.shutdown.enabled", "true")
conf4.set("celeborn.worker.decommission.shutdown.enabled", "true")
val mgr4 = new WorkerStatusManager(conf4)
Assert.assertEquals(WorkerEventType.Decommission, mgr4.exitEventType)
Assert.assertTrue(conf4.workerGracefulShutdown)
Assert.assertTrue(conf4.workerDecommissionShutdown)
}
}