Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED)
Comment thread
chenghuichen marked this conversation as resolved.
def workerGracefulShutdown: Boolean =
Comment thread
chenghuichen marked this conversation as resolved.
Outdated
get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) && !workerDecommissionShutdown
def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownCheckSlotsFinishedInterval: Long =
get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL)
Expand Down Expand Up @@ -4477,6 +4479,20 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("6h")

val WORKER_DECOMMISSION_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.decommission.shutdown.enabled")
Comment thread
chenghuichen marked this conversation as resolved.
.categories("worker")
.doc("When true, the worker will decommission on shutdown signal (e.g. SIGTERM), " +
"waiting for all shuffle data to be consumed or expired before exiting. " +
"This is suitable for permanent scale-down scenarios where the worker will not restart. " +
"When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " +
"(recovery state will not be saved since the worker is not expected to come back). " +
"Operators should align celeborn.worker.decommission.forceExitTimeout with the pod's " +
"terminationGracePeriodSeconds.")
.version("0.6.0")
.booleanConf
.createWithDefault(false)

val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.graceful.shutdown.enabled")
.categories("worker")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,9 @@ private[celeborn] class Worker(
case WorkerEventType.Graceful =>
shutdownGracefully()
case WorkerEventType.Decommission =>
ShutdownHookManager.get().updateTimeout(
conf.workerDecommissionForceExitTimeout,
TimeUnit.MILLISECONDS)
decommissionWorker()
Comment thread
chenghuichen marked this conversation as resolved.
Outdated

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decommission report failure has no retry/fallback on the SIGTERM path. decommissionWorker() calls sendWorkerDecommissionToMaster(), which swallows any exception and proceeds. If the master is briefly unreachable when SIGTERM fires, no decommission record reaches it and it must wait for heartbeat-timeout eviction. The other two exit paths are more robust — exitImmediately() uses WorkerLost and shutdownGracefully() uses ReportWorkerUnavailable. Consider a single retry or a ReportWorkerUnavailable fallback to match them.

case _ =>
exitImmediately()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging
private var worker: Worker = _
private var shutdown: AtomicBoolean = _
private var storageManager: StorageManager = _
private val decommissionShutdown = conf.workerDecommissionShutdown
private val gracefulShutdown = conf.workerGracefulShutdown
if (gracefulShutdown) {
if (decommissionShutdown) {
exitEventType = WorkerEventType.Decommission

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exitEventType is read across threads without synchronization. It's written here and in exit() under this.synchronized, but the shutdown-hook thread reads workerStatusManager.exitEventType (Worker.scala:1087) without the lock, and the field (declared line 38) isn't @volatile. A runtime REST exit(...) that updates it shortly before an independent SIGTERM-triggered hook read has no happens-before guarantee → the hook could take a stale branch (e.g. exitImmediately instead of decommissionWorker). Mark exitEventType @volatile.

logInfo("Decommission shutdown enabled, worker will decommission on SIGTERM" +
" (overrides graceful shutdown)")
} else if (gracefulShutdown) {
exitEventType = WorkerEventType.Graceful
}
Comment thread
chenghuichen marked this conversation as resolved.

Expand Down
Loading