From 9bc9eb1a313286f48b5d8d8ff7bb2247a23046a3 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Fri, 22 May 2026 11:23:07 +0800 Subject: [PATCH 01/10] Support decommission shutdown for worker scale-down scenarios --- .../apache/celeborn/common/CelebornConf.scala | 18 +++++++++++++++++- .../service/deploy/worker/Worker.scala | 3 +++ .../deploy/worker/WorkerStatusManager.scala | 7 ++++++- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 40d06617fea..c66f85a7c83 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1357,7 +1357,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // ////////////////////////////////////////////////////// // Graceful Shutdown & Recover // // ////////////////////////////////////////////////////// - def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) + def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) + def workerGracefulShutdown: Boolean = + get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) && !workerDecommissionShutdown def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT) def workerGracefulShutdownCheckSlotsFinishedInterval: Long = get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL) @@ -4477,6 +4479,20 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("6h") + val WORKER_DECOMMISSION_SHUTDOWN_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.decommission.shutdown.enabled") + .categories("worker") + .doc("When true, the worker will decommission on shutdown signal (e.g. SIGTERM), " + + "waiting for all shuffle data to be consumed or expired before exiting. " + + "This is suitable for permanent scale-down scenarios where the worker will not restart. " + + "When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " + + "(recovery state will not be saved since the worker is not expected to come back). " + + "Operators should align celeborn.worker.decommission.forceExitTimeout with the pod's " + + "terminationGracePeriodSeconds.") + .version("0.6.0") + .booleanConf + .createWithDefault(false) + val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] = buildConf("celeborn.worker.graceful.shutdown.enabled") .categories("worker") diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 1c5bc201803..beb90c50f93 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -1081,6 +1081,9 @@ private[celeborn] class Worker( case WorkerEventType.Graceful => shutdownGracefully() case WorkerEventType.Decommission => + ShutdownHookManager.get().updateTimeout( + conf.workerDecommissionForceExitTimeout, + TimeUnit.MILLISECONDS) decommissionWorker() case _ => exitImmediately() diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala index e31fcf576a2..994d5047e6c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala @@ -39,8 +39,13 @@ private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging private var worker: Worker = _ private var shutdown: AtomicBoolean = _ private var storageManager: StorageManager = _ + private val decommissionShutdown = conf.workerDecommissionShutdown private val gracefulShutdown = conf.workerGracefulShutdown - if (gracefulShutdown) { + if (decommissionShutdown) { + exitEventType = WorkerEventType.Decommission + logInfo("Decommission shutdown enabled, worker will decommission on SIGTERM" + + " (overrides graceful shutdown)") + } else if (gracefulShutdown) { exitEventType = WorkerEventType.Graceful } From ad76e886dd5014ae16579bfe88eb2cc3b2513d03 Mon Sep 17 00:00:00 2001 From: yunkai Date: Mon, 25 May 2026 17:41:59 +0800 Subject: [PATCH 02/10] Support decommission shutdown for worker scale-down scenarios --- docs/configuration/worker.md | 1 + .../service/deploy/worker/Worker.scala | 13 +++++++--- .../worker/WorkerStatusManagerSuite.scala | 26 +++++++++++++++++++ 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index bb2cec89bc3..dc42b0670c7 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -78,6 +78,7 @@ license: | | celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | | | celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | | | celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | | +| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should align celeborn.worker.decommission.forceExitTimeout with the pod's terminationGracePeriodSeconds. | 0.6.0 | | | celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | | | celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of direct memory for read buffer | 0.2.0 | | | celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer | 0.6.2 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index beb90c50f93..bba05b9e79f 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -1072,6 +1072,12 @@ private[celeborn] class Worker( workerStatusManager.transitionState(State.Exit) } + private val shutdownHookTimeout = if (conf.workerDecommissionShutdown) { + conf.workerDecommissionForceExitTimeout + } else { + conf.workerGracefulShutdownTimeoutMs + } + ShutdownHookManager.get().addShutdownHook( ThreadUtils.newThread( new Runnable { @@ -1081,9 +1087,6 @@ private[celeborn] class Worker( case WorkerEventType.Graceful => shutdownGracefully() case WorkerEventType.Decommission => - ShutdownHookManager.get().updateTimeout( - conf.workerDecommissionForceExitTimeout, - TimeUnit.MILLISECONDS) decommissionWorker() case _ => exitImmediately() @@ -1097,7 +1100,9 @@ private[celeborn] class Worker( } }, "worker-shutdown-hook-thread"), - WORKER_SHUTDOWN_PRIORITY) + WORKER_SHUTDOWN_PRIORITY, + shutdownHookTimeout, + TimeUnit.MILLISECONDS) @VisibleForTesting def getPushFetchServerPort: (Int, Int) = (pushPort, fetchPort) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala index c787be6882d..adc526b1c15 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala @@ -77,4 +77,30 @@ class WorkerStatusManagerSuite extends AnyFunSuite { statusManager.doTransition(WorkerEventType.Recommission) Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal) } + + test("Test exitEventType initialization based on config") { + // Default: neither graceful nor decommission → Immediately + val conf1 = new CelebornConf() + val mgr1 = new WorkerStatusManager(conf1) + Assert.assertEquals(WorkerEventType.Immediately, mgr1.exitEventType) + + // Graceful shutdown only → Graceful + val conf2 = new CelebornConf() + conf2.set("celeborn.worker.graceful.shutdown.enabled", "true") + val mgr2 = new WorkerStatusManager(conf2) + Assert.assertEquals(WorkerEventType.Graceful, mgr2.exitEventType) + + // Decommission shutdown only → Decommission + val conf3 = new CelebornConf() + conf3.set("celeborn.worker.decommission.shutdown.enabled", "true") + val mgr3 = new WorkerStatusManager(conf3) + Assert.assertEquals(WorkerEventType.Decommission, mgr3.exitEventType) + + // Both enabled → Decommission overrides graceful + val conf4 = new CelebornConf() + conf4.set("celeborn.worker.graceful.shutdown.enabled", "true") + conf4.set("celeborn.worker.decommission.shutdown.enabled", "true") + val mgr4 = new WorkerStatusManager(conf4) + Assert.assertEquals(WorkerEventType.Decommission, mgr4.exitEventType) + } } From c8227df821d038adc1d18ae2d0fead21eff6af81 Mon Sep 17 00:00:00 2001 From: yunkai Date: Mon, 25 May 2026 19:09:37 +0800 Subject: [PATCH 03/10] Support decommission shutdown for worker scale-down scenarios --- .../celeborn/service/deploy/worker/Worker.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index bba05b9e79f..e48e14628a1 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -1072,11 +1072,12 @@ private[celeborn] class Worker( workerStatusManager.transitionState(State.Exit) } - private val shutdownHookTimeout = if (conf.workerDecommissionShutdown) { - conf.workerDecommissionForceExitTimeout - } else { - conf.workerGracefulShutdownTimeoutMs - } + private val shutdownHookTimeout = + if (conf.workerDecommissionShutdown) { + conf.workerDecommissionForceExitTimeout + } else { + conf.workerGracefulShutdownTimeoutMs + } ShutdownHookManager.get().addShutdownHook( ThreadUtils.newThread( From 81d825b04bce7cf8df74c585fc6301ca16ca957c Mon Sep 17 00:00:00 2001 From: yunkai Date: Mon, 25 May 2026 20:06:36 +0800 Subject: [PATCH 04/10] Support decommission shutdown for worker scale-down scenarios --- .../celeborn/service/deploy/worker/Worker.scala | 13 ++++++++----- .../deploy/worker/WorkerStatusManagerSuite.scala | 14 +++++++------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index e48e14628a1..c23b97a1d94 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -1074,7 +1074,7 @@ private[celeborn] class Worker( private val shutdownHookTimeout = if (conf.workerDecommissionShutdown) { - conf.workerDecommissionForceExitTimeout + conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval } else { conf.workerGracefulShutdownTimeoutMs } @@ -1093,10 +1093,13 @@ private[celeborn] class Worker( exitImmediately() } - if (workerStatusManager.exitEventType == WorkerEventType.Graceful) { - stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) - } else { - stop(CelebornExitKind.EXIT_IMMEDIATELY) + workerStatusManager.exitEventType match { + case WorkerEventType.Graceful => + stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) + case WorkerEventType.Decommission => + stop(CelebornExitKind.WORKER_DECOMMISSION) + case _ => + stop(CelebornExitKind.EXIT_IMMEDIATELY) } } }, diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala index adc526b1c15..1953ce1990b 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala @@ -57,25 +57,25 @@ class WorkerStatusManagerSuite extends AnyFunSuite { statusManager.init(worker) statusManager.doTransition(WorkerEventType.DecommissionThenIdle) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle) + Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState()) Assert.assertEquals( - worker.workerInfo.getWorkerStatus().getStateValue, - PbWorkerStatus.State.InDecommissionThenIdle.getNumber) + PbWorkerStatus.State.InDecommissionThenIdle.getNumber, + worker.workerInfo.getWorkerStatus().getStateValue) // Rerun state Transition statusManager.doTransition(WorkerEventType.DecommissionThenIdle) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle) + Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState()) // Reset shuffleKeys shuffleKeys.clear() statusManager.doTransition(WorkerEventType.DecommissionThenIdle) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Idle) + Assert.assertEquals(PbWorkerStatus.State.Idle, statusManager.getWorkerState()) statusManager.doTransition(WorkerEventType.Recommission) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal) + Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState()) statusManager.doTransition(WorkerEventType.Recommission) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal) + Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState()) } test("Test exitEventType initialization based on config") { From df73277f82ae290f7ce3c629b469d157efac4f64 Mon Sep 17 00:00:00 2001 From: yunkai Date: Mon, 25 May 2026 20:16:57 +0800 Subject: [PATCH 05/10] Support decommission shutdown for worker scale-down scenarios --- .../apache/celeborn/service/deploy/worker/Worker.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index c23b97a1d94..02486b00fec 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -626,7 +626,8 @@ private[celeborn] class Worker( jvmQuake.stop() } if (sendHeartbeatTask != null) { - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { + if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || + exitKind == CelebornExitKind.WORKER_DECOMMISSION) { sendHeartbeatTask.cancel(false) } else { sendHeartbeatTask.cancel(true) @@ -634,14 +635,16 @@ private[celeborn] class Worker( sendHeartbeatTask = null } if (checkFastFailTask != null) { - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { + if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || + exitKind == CelebornExitKind.WORKER_DECOMMISSION) { checkFastFailTask.cancel(false) } else { checkFastFailTask.cancel(true) } checkFastFailTask = null } - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { + if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || + exitKind == CelebornExitKind.WORKER_DECOMMISSION) { forwardMessageScheduler.shutdown() replicateThreadPool.shutdown() commitThreadPool.shutdown() From 4cd9d6b4726c8c98bbe05ad948a8603a30ae0961 Mon Sep 17 00:00:00 2001 From: yunkai Date: Mon, 25 May 2026 20:25:49 +0800 Subject: [PATCH 06/10] Support decommission shutdown for worker scale-down scenarios --- .../apache/celeborn/common/CelebornConf.scala | 6 +- .../worker/storage/PartitionFilesSorter.java | 2 +- .../service/deploy/worker/Worker.scala | 69 ++++++++++--------- .../deploy/worker/WorkerStatusManager.scala | 4 +- .../worker/storage/StorageManager.scala | 2 +- 5 files changed, 42 insertions(+), 41 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index c66f85a7c83..fe811f78e00 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1357,9 +1357,9 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // ////////////////////////////////////////////////////// // Graceful Shutdown & Recover // // ////////////////////////////////////////////////////// - def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) - def workerGracefulShutdown: Boolean = - get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) && !workerDecommissionShutdown + def workerDecommissionShutdownEnabled: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) + def workerGracefulShutdownEnabled: Boolean = + get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) && !workerDecommissionShutdownEnabled def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT) def workerGracefulShutdownCheckSlotsFinishedInterval: Long = get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 5979a8434c2..3034f4507e9 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -113,7 +113,7 @@ public PartitionFilesSorter( long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight(); this.source = source; this.cleaner = new PartitionFilesCleaner(this); - boolean gracefulShutdown = conf.workerGracefulShutdown(); + boolean gracefulShutdown = conf.workerGracefulShutdownEnabled(); // Assume a chunk won't be larger than 2GB // ShuffleClient can fetch shuffle data from a restarted worker only // when the worker's fetching port is stable and enables graceful shutdown. diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 02486b00fec..4a07b9527dd 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -156,7 +156,7 @@ private[celeborn] class Worker( private val WORKER_SHUTDOWN_PRIORITY = 100 val shutdown = new AtomicBoolean(false) - private val gracefulShutdown = conf.workerGracefulShutdown + private val gracefulShutdown = conf.workerGracefulShutdownEnabled if (gracefulShutdown) { var checkPortMap = Map( WORKER_RPC_PORT -> conf.workerRpcPort, @@ -1075,41 +1075,42 @@ private[celeborn] class Worker( workerStatusManager.transitionState(State.Exit) } - private val shutdownHookTimeout = - if (conf.workerDecommissionShutdown) { - conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval - } else { - conf.workerGracefulShutdownTimeoutMs - } - - ShutdownHookManager.get().addShutdownHook( - ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - logInfo("Shutdown hook called.") - workerStatusManager.exitEventType match { - case WorkerEventType.Graceful => - shutdownGracefully() - case WorkerEventType.Decommission => - decommissionWorker() - case _ => - exitImmediately() - } + private val shutdownHookThread = ThreadUtils.newThread( + new Runnable { + override def run(): Unit = { + logInfo("Shutdown hook called.") + workerStatusManager.exitEventType match { + case WorkerEventType.Graceful => + shutdownGracefully() + case WorkerEventType.Decommission => + decommissionWorker() + case _ => + exitImmediately() + } - workerStatusManager.exitEventType match { - case WorkerEventType.Graceful => - stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) - case WorkerEventType.Decommission => - stop(CelebornExitKind.WORKER_DECOMMISSION) - case _ => - stop(CelebornExitKind.EXIT_IMMEDIATELY) - } + workerStatusManager.exitEventType match { + case WorkerEventType.Graceful => + stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) + case WorkerEventType.Decommission => + stop(CelebornExitKind.WORKER_DECOMMISSION) + case _ => + stop(CelebornExitKind.EXIT_IMMEDIATELY) } - }, - "worker-shutdown-hook-thread"), - WORKER_SHUTDOWN_PRIORITY, - shutdownHookTimeout, - TimeUnit.MILLISECONDS) + } + }, + "worker-shutdown-hook-thread") + + if (conf.workerDecommissionShutdownEnabled) { + ShutdownHookManager.get().addShutdownHook( + shutdownHookThread, + WORKER_SHUTDOWN_PRIORITY, + conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval, + TimeUnit.MILLISECONDS) + } else { + ShutdownHookManager.get().addShutdownHook( + shutdownHookThread, + WORKER_SHUTDOWN_PRIORITY) + } @VisibleForTesting def getPushFetchServerPort: (Int, Int) = (pushPort, fetchPort) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala index 994d5047e6c..8a46acd012a 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala @@ -39,8 +39,8 @@ private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging private var worker: Worker = _ private var shutdown: AtomicBoolean = _ private var storageManager: StorageManager = _ - private val decommissionShutdown = conf.workerDecommissionShutdown - private val gracefulShutdown = conf.workerGracefulShutdown + private val decommissionShutdown = conf.workerDecommissionShutdownEnabled + private val gracefulShutdown = conf.workerGracefulShutdownEnabled if (decommissionShutdown) { exitEventType = WorkerEventType.Decommission logInfo("Decommission shutdown enabled, worker will decommission on SIGTERM" + diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 9a2d4a8a740..4a679c0d8c6 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -295,7 +295,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, DiskFileInfo]]() // ShuffleClient can fetch data from a restarted worker only // when the worker's fetching port is stable. - val workerGracefulShutdown = conf.workerGracefulShutdown + val workerGracefulShutdown = conf.workerGracefulShutdownEnabled if (workerGracefulShutdown) { try { val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend) From d5ff48caebdb54db1cb2c7a44b4ae65c9099a78c Mon Sep 17 00:00:00 2001 From: yunkai Date: Tue, 26 May 2026 15:11:04 +0800 Subject: [PATCH 07/10] Support decommission shutdown for worker scale-down scenarios --- .../scala/org/apache/celeborn/common/CelebornConf.scala | 8 +++++--- docs/configuration/worker.md | 2 +- .../service/deploy/worker/WorkerStatusManagerSuite.scala | 2 ++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index fe811f78e00..d5507366231 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -4487,9 +4487,11 @@ object CelebornConf extends Logging { "This is suitable for permanent scale-down scenarios where the worker will not restart. " + "When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " + "(recovery state will not be saved since the worker is not expected to come back). " + - "Operators should align celeborn.worker.decommission.forceExitTimeout with the pod's " + - "terminationGracePeriodSeconds.") - .version("0.6.0") + "Operators should set the pod's terminationGracePeriodSeconds to at least " + + "celeborn.worker.decommission.forceExitTimeout + " + + "celeborn.worker.decommission.checkInterval to ensure the shutdown hook " + + "has enough time to complete.") + .version("0.7.0") .booleanConf .createWithDefault(false) diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index dc42b0670c7..833c92a4b2f 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -78,7 +78,7 @@ license: | | celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | | | celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | | | celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | | -| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should align celeborn.worker.decommission.forceExitTimeout with the pod's terminationGracePeriodSeconds. | 0.6.0 | | +| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to at least celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval to ensure the shutdown hook has enough time to complete. | 0.7.0 | | | celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | | | celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of direct memory for read buffer | 0.2.0 | | | celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer | 0.6.2 | | diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala index 1953ce1990b..6f467ff3df3 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala @@ -102,5 +102,7 @@ class WorkerStatusManagerSuite extends AnyFunSuite { conf4.set("celeborn.worker.decommission.shutdown.enabled", "true") val mgr4 = new WorkerStatusManager(conf4) Assert.assertEquals(WorkerEventType.Decommission, mgr4.exitEventType) + Assert.assertFalse(conf4.workerGracefulShutdownEnabled) + Assert.assertTrue(conf4.workerDecommissionShutdownEnabled) } } From abc0013d9d577290d6fb500e4ed8fb039926dbe4 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Mon, 8 Jun 2026 16:24:35 +0800 Subject: [PATCH 08/10] Support decommission shutdown for worker scale-down scenarios --- .../main/scala/org/apache/celeborn/common/CelebornConf.scala | 5 ++--- .../service/deploy/worker/storage/PartitionFilesSorter.java | 2 +- .../org/apache/celeborn/service/deploy/worker/Worker.scala | 4 ++-- .../celeborn/service/deploy/worker/WorkerStatusManager.scala | 4 ++-- .../service/deploy/worker/storage/StorageManager.scala | 2 +- .../service/deploy/worker/WorkerStatusManagerSuite.scala | 4 ++-- 6 files changed, 10 insertions(+), 11 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index d5507366231..0abde4efcf6 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1357,9 +1357,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // ////////////////////////////////////////////////////// // Graceful Shutdown & Recover // // ////////////////////////////////////////////////////// - def workerDecommissionShutdownEnabled: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) - def workerGracefulShutdownEnabled: Boolean = - get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) && !workerDecommissionShutdownEnabled + def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) + def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT) def workerGracefulShutdownCheckSlotsFinishedInterval: Long = get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 3034f4507e9..5acfd292c48 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -113,7 +113,7 @@ public PartitionFilesSorter( long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight(); this.source = source; this.cleaner = new PartitionFilesCleaner(this); - boolean gracefulShutdown = conf.workerGracefulShutdownEnabled(); + boolean gracefulShutdown = conf.workerGracefulShutdown() && !conf.workerDecommissionShutdown(); // Assume a chunk won't be larger than 2GB // ShuffleClient can fetch shuffle data from a restarted worker only // when the worker's fetching port is stable and enables graceful shutdown. diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 4a07b9527dd..e46f9801c47 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -156,7 +156,7 @@ private[celeborn] class Worker( private val WORKER_SHUTDOWN_PRIORITY = 100 val shutdown = new AtomicBoolean(false) - private val gracefulShutdown = conf.workerGracefulShutdownEnabled + private val gracefulShutdown = conf.workerGracefulShutdown && !conf.workerDecommissionShutdown if (gracefulShutdown) { var checkPortMap = Map( WORKER_RPC_PORT -> conf.workerRpcPort, @@ -1100,7 +1100,7 @@ private[celeborn] class Worker( }, "worker-shutdown-hook-thread") - if (conf.workerDecommissionShutdownEnabled) { + if (conf.workerDecommissionShutdown) { ShutdownHookManager.get().addShutdownHook( shutdownHookThread, WORKER_SHUTDOWN_PRIORITY, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala index 8a46acd012a..994d5047e6c 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala @@ -39,8 +39,8 @@ private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging private var worker: Worker = _ private var shutdown: AtomicBoolean = _ private var storageManager: StorageManager = _ - private val decommissionShutdown = conf.workerDecommissionShutdownEnabled - private val gracefulShutdown = conf.workerGracefulShutdownEnabled + private val decommissionShutdown = conf.workerDecommissionShutdown + private val gracefulShutdown = conf.workerGracefulShutdown if (decommissionShutdown) { exitEventType = WorkerEventType.Decommission logInfo("Decommission shutdown enabled, worker will decommission on SIGTERM" + diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 4a679c0d8c6..1ea66157b7f 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -295,7 +295,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, DiskFileInfo]]() // ShuffleClient can fetch data from a restarted worker only // when the worker's fetching port is stable. - val workerGracefulShutdown = conf.workerGracefulShutdownEnabled + val workerGracefulShutdown = conf.workerGracefulShutdown && !conf.workerDecommissionShutdown if (workerGracefulShutdown) { try { val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala index 6f467ff3df3..55c35ec1b06 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala @@ -102,7 +102,7 @@ class WorkerStatusManagerSuite extends AnyFunSuite { conf4.set("celeborn.worker.decommission.shutdown.enabled", "true") val mgr4 = new WorkerStatusManager(conf4) Assert.assertEquals(WorkerEventType.Decommission, mgr4.exitEventType) - Assert.assertFalse(conf4.workerGracefulShutdownEnabled) - Assert.assertTrue(conf4.workerDecommissionShutdownEnabled) + Assert.assertTrue(conf4.workerGracefulShutdown) + Assert.assertTrue(conf4.workerDecommissionShutdown) } } From a5ac0671d6bed7292fb5fb1e7dbeec2165e7a68d Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Fri, 12 Jun 2026 14:15:56 +0800 Subject: [PATCH 09/10] Support decommission shutdown for worker scale-down scenarios --- .../apache/celeborn/common/CelebornConf.scala | 11 ++- docs/configuration/worker.md | 2 +- .../worker/storage/PartitionFilesSorter.java | 2 +- .../service/deploy/worker/Worker.scala | 75 +++++++++---------- .../worker/storage/StorageManager.scala | 2 +- .../worker/WorkerStatusManagerSuite.scala | 5 +- 6 files changed, 51 insertions(+), 46 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 0abde4efcf6..dfa5dc7882e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1359,6 +1359,10 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // ////////////////////////////////////////////////////// def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) + // Decommission shutdown overrides graceful shutdown: a decommissioned worker will not + // restart, so recovery state (recovery DB, sorter state) should not be persisted. + def effectiveWorkerGracefulShutdown: Boolean = + workerGracefulShutdown && !workerDecommissionShutdown def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT) def workerGracefulShutdownCheckSlotsFinishedInterval: Long = get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL) @@ -4486,10 +4490,11 @@ object CelebornConf extends Logging { "This is suitable for permanent scale-down scenarios where the worker will not restart. " + "When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " + "(recovery state will not be saved since the worker is not expected to come back). " + - "Operators should set the pod's terminationGracePeriodSeconds to at least " + + "Operators should set the pod's terminationGracePeriodSeconds to " + "celeborn.worker.decommission.forceExitTimeout + " + - "celeborn.worker.decommission.checkInterval to ensure the shutdown hook " + - "has enough time to complete.") + "celeborn.worker.decommission.checkInterval plus a small buffer, to ensure " + + "the shutdown hook has enough time to complete resource cleanup before " + + "being killed.") .version("0.7.0") .booleanConf .createWithDefault(false) diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 833c92a4b2f..22db19d6e79 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -78,7 +78,7 @@ license: | | celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | | | celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | | | celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | | -| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to at least celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval to ensure the shutdown hook has enough time to complete. | 0.7.0 | | +| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval plus a small buffer, to ensure the shutdown hook has enough time to complete resource cleanup before being killed. | 0.7.0 | | | celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | | | celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of direct memory for read buffer | 0.2.0 | | | celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer | 0.6.2 | | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 5acfd292c48..2b3b9436bca 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -113,7 +113,7 @@ public PartitionFilesSorter( long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight(); this.source = source; this.cleaner = new PartitionFilesCleaner(this); - boolean gracefulShutdown = conf.workerGracefulShutdown() && !conf.workerDecommissionShutdown(); + boolean gracefulShutdown = conf.effectiveWorkerGracefulShutdown(); // Assume a chunk won't be larger than 2GB // ShuffleClient can fetch shuffle data from a restarted worker only // when the worker's fetching port is stable and enables graceful shutdown. diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index e46f9801c47..1c87e4100a0 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -156,7 +156,7 @@ private[celeborn] class Worker( private val WORKER_SHUTDOWN_PRIORITY = 100 val shutdown = new AtomicBoolean(false) - private val gracefulShutdown = conf.workerGracefulShutdown && !conf.workerDecommissionShutdown + private val gracefulShutdown = conf.effectiveWorkerGracefulShutdown if (gracefulShutdown) { var checkPortMap = Map( WORKER_RPC_PORT -> conf.workerRpcPort, @@ -619,6 +619,11 @@ private[celeborn] class Worker( if (!stopped) { logInfo("Stopping Worker.") + // Both graceful shutdown and decommission have drained data, so in-flight + // tasks are allowed to finish instead of being force-cancelled. + val drainBeforeExit = exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || + exitKind == CelebornExitKind.WORKER_DECOMMISSION + if (jvmProfiler != null) { jvmProfiler.stop() } @@ -626,8 +631,7 @@ private[celeborn] class Worker( jvmQuake.stop() } if (sendHeartbeatTask != null) { - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || - exitKind == CelebornExitKind.WORKER_DECOMMISSION) { + if (drainBeforeExit) { sendHeartbeatTask.cancel(false) } else { sendHeartbeatTask.cancel(true) @@ -635,16 +639,14 @@ private[celeborn] class Worker( sendHeartbeatTask = null } if (checkFastFailTask != null) { - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || - exitKind == CelebornExitKind.WORKER_DECOMMISSION) { + if (drainBeforeExit) { checkFastFailTask.cancel(false) } else { checkFastFailTask.cancel(true) } checkFastFailTask = null } - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || - exitKind == CelebornExitKind.WORKER_DECOMMISSION) { + if (drainBeforeExit) { forwardMessageScheduler.shutdown() replicateThreadPool.shutdown() commitThreadPool.shutdown() @@ -953,7 +955,7 @@ private[celeborn] class Worker( exitType.toUpperCase(Locale.ROOT) match { case "DECOMMISSION" => ShutdownHookManager.get().updateTimeout( - conf.workerDecommissionForceExitTimeout, + conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval, TimeUnit.MILLISECONDS) workerStatusManager.doTransition(WorkerEventType.Decommission) case "GRACEFUL" => @@ -1034,7 +1036,9 @@ private[celeborn] class Worker( def waitTime: Long = waitTimes * interval - while (!storageManager.shuffleKeySet().isEmpty && waitTime < timeout) { + // Bound the total wait strictly by the timeout so that the remaining shutdown hook + // budget is left for stop(WORKER_DECOMMISSION) to clean up resources. + while (!storageManager.shuffleKeySet().isEmpty && waitTime + interval <= timeout) { Thread.sleep(interval) waitTimes += 1 } @@ -1075,41 +1079,34 @@ private[celeborn] class Worker( workerStatusManager.transitionState(State.Exit) } - private val shutdownHookThread = ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - logInfo("Shutdown hook called.") - workerStatusManager.exitEventType match { - case WorkerEventType.Graceful => - shutdownGracefully() - case WorkerEventType.Decommission => - decommissionWorker() - case _ => - exitImmediately() - } - - workerStatusManager.exitEventType match { - case WorkerEventType.Graceful => - stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) - case WorkerEventType.Decommission => - stop(CelebornExitKind.WORKER_DECOMMISSION) - case _ => - stop(CelebornExitKind.EXIT_IMMEDIATELY) + ShutdownHookManager.get().addShutdownHook( + ThreadUtils.newThread( + new Runnable { + override def run(): Unit = { + logInfo("Shutdown hook called.") + workerStatusManager.exitEventType match { + case WorkerEventType.Graceful => + shutdownGracefully() + stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) + case WorkerEventType.Decommission => + decommissionWorker() + stop(CelebornExitKind.WORKER_DECOMMISSION) + case _ => + exitImmediately() + stop(CelebornExitKind.EXIT_IMMEDIATELY) + } } - } - }, - "worker-shutdown-hook-thread") + }, + "worker-shutdown-hook-thread"), + WORKER_SHUTDOWN_PRIORITY) if (conf.workerDecommissionShutdown) { - ShutdownHookManager.get().addShutdownHook( - shutdownHookThread, - WORKER_SHUTDOWN_PRIORITY, + // The wait loop in decommissionWorker() is bounded by forceExitTimeout, so the extra + // checkInterval reserves headroom for stop(WORKER_DECOMMISSION) to finish cleanup + // before the hook is cancelled. + ShutdownHookManager.get().updateTimeout( conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval, TimeUnit.MILLISECONDS) - } else { - ShutdownHookManager.get().addShutdownHook( - shutdownHookThread, - WORKER_SHUTDOWN_PRIORITY) } @VisibleForTesting diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 1ea66157b7f..0b4cff69443 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -295,7 +295,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, DiskFileInfo]]() // ShuffleClient can fetch data from a restarted worker only // when the worker's fetching port is stable. - val workerGracefulShutdown = conf.workerGracefulShutdown && !conf.workerDecommissionShutdown + val workerGracefulShutdown = conf.effectiveWorkerGracefulShutdown if (workerGracefulShutdown) { try { val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala index 55c35ec1b06..97c56da7a00 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala @@ -79,8 +79,11 @@ class WorkerStatusManagerSuite extends AnyFunSuite { } test("Test exitEventType initialization based on config") { - // Default: neither graceful nor decommission → Immediately + // Neither graceful nor decommission → Immediately. Set both keys explicitly so the + // assertion does not depend on system properties leaked from other tests. val conf1 = new CelebornConf() + conf1.set("celeborn.worker.graceful.shutdown.enabled", "false") + conf1.set("celeborn.worker.decommission.shutdown.enabled", "false") val mgr1 = new WorkerStatusManager(conf1) Assert.assertEquals(WorkerEventType.Immediately, mgr1.exitEventType) From c604d253f28783a3344a1083eba88d4e721a89da Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Tue, 16 Jun 2026 10:37:44 +0800 Subject: [PATCH 10/10] [CELEBORN-2336] Address review: fix wait-loop bound and exitEventType visibility - Bound the decommission drain by clamping each sleep to the remaining timeout, so a forceExitTimeout smaller than checkInterval still drains instead of skipping the wait and dropping unconsumed shuffle - Mark exitEventType @volatile for cross-thread visibility from the hook - Register the worker hook with explicit 4-arg addShutdownHook timeout (no process-wide updateTimeout); extract decommissionHookTimeoutMs - Skip the redundant InDecommission re-transition on the REST path - Harden exitEventType test setup; clarify budget docs --- .../apache/celeborn/common/CelebornConf.scala | 10 +- docs/configuration/worker.md | 2 +- .../service/deploy/worker/Worker.scala | 97 ++++++++++++------- .../deploy/worker/WorkerStatusManager.scala | 5 +- .../worker/WorkerStatusManagerSuite.scala | 8 +- 5 files changed, 77 insertions(+), 45 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index dfa5dc7882e..90f46cfcb2d 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -4490,11 +4490,13 @@ object CelebornConf extends Logging { "This is suitable for permanent scale-down scenarios where the worker will not restart. " + "When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " + "(recovery state will not be saved since the worker is not expected to come back). " + - "Operators should set the pod's terminationGracePeriodSeconds to " + + "Operators should set the pod's terminationGracePeriodSeconds to at least " + "celeborn.worker.decommission.forceExitTimeout + " + - "celeborn.worker.decommission.checkInterval plus a small buffer, to ensure " + - "the shutdown hook has enough time to complete resource cleanup before " + - "being killed.") + "celeborn.worker.decommission.checkInterval, plus additional headroom for final " + + "resource cleanup. The drain wait is bounded by forceExitTimeout, but the " + + "subsequent stop() runs within the same budget and deletes any still-unreleased " + + "shuffle, whose cost grows with the amount of undrained data; size the extra " + + "headroom for that worst case so the pod is not force-killed mid-cleanup.") .version("0.7.0") .booleanConf .createWithDefault(false) diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 22db19d6e79..0042a6b1082 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -78,7 +78,7 @@ license: | | celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | | | celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | | | celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | | -| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval plus a small buffer, to ensure the shutdown hook has enough time to complete resource cleanup before being killed. | 0.7.0 | | +| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to at least celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval, plus additional headroom for final resource cleanup. The drain wait is bounded by forceExitTimeout, but the subsequent stop() runs within the same budget and deletes any still-unreleased shuffle, whose cost grows with the amount of undrained data; size the extra headroom for that worst case so the pod is not force-killed mid-cleanup. | 0.7.0 | | | celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | | | celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of direct memory for read buffer | 0.2.0 | | | celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer | 0.6.2 | | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 1c87e4100a0..8226e0bd888 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -619,8 +619,11 @@ private[celeborn] class Worker( if (!stopped) { logInfo("Stopping Worker.") - // Both graceful shutdown and decommission have drained data, so in-flight - // tasks are allowed to finish instead of being force-cancelled. + // Both graceful shutdown and decommission have drained data, so the periodic + // thread pools below are allowed to finish in-flight work instead of being + // force-cancelled. Note this does not extend to partitionsSorter.close(), which + // only awaits termination on graceful shutdown; on decommission an in-flight + // on-demand sort is still force-cancelled (see PartitionFilesSorter.close). val drainBeforeExit = exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || exitKind == CelebornExitKind.WORKER_DECOMMISSION @@ -954,8 +957,12 @@ private[celeborn] class Worker( override def exit(exitType: String): String = { exitType.toUpperCase(Locale.ROOT) match { case "DECOMMISSION" => + // A runtime REST decommission can target a worker not configured for + // decommission-on-shutdown, whose hook was registered with the default timeout. + // updateTimeout is the only API to extend an already-registered hook, so it is + // used here despite being process-wide. ShutdownHookManager.get().updateTimeout( - conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval, + decommissionHookTimeoutMs, TimeUnit.MILLISECONDS) workerStatusManager.doTransition(WorkerEventType.Decommission) case "GRACEFUL" => @@ -1027,27 +1034,34 @@ private[celeborn] class Worker( def decommissionWorker(): Unit = { logInfo("Worker start to decommission") - workerStatusManager.transitionState(State.InDecommission) + // A runtime REST exit("DECOMMISSION") already moved the state to InDecommission before + // triggering the shutdown hook; skip the re-transition in that case to avoid a spurious + // "transition not allowed" warning. The master report still runs on both paths. + if (workerStatusManager.getWorkerState() != State.InDecommission) { + workerStatusManager.transitionState(State.InDecommission) + } sendWorkerDecommissionToMaster() shutdown.set(true) val interval = conf.workerDecommissionCheckInterval val timeout = conf.workerDecommissionForceExitTimeout - var waitTimes = 0 - - def waitTime: Long = waitTimes * interval + var waited = 0L // Bound the total wait strictly by the timeout so that the remaining shutdown hook - // budget is left for stop(WORKER_DECOMMISSION) to clean up resources. - while (!storageManager.shuffleKeySet().isEmpty && waitTime + interval <= timeout) { - Thread.sleep(interval) - waitTimes += 1 + // budget is left for stop(WORKER_DECOMMISSION) to clean up resources. Clamp the sleep + // to the remaining budget so that a forceExitTimeout smaller than checkInterval still + // drains (bounded by timeout) instead of skipping the wait entirely and deleting shuffle + // data that consumers still need. + while (!storageManager.shuffleKeySet().isEmpty && waited < timeout) { + val sleepMs = Math.min(interval, timeout - waited) + Thread.sleep(sleepMs) + waited += sleepMs } val unreleasedShuffleKeys = storageManager.shuffleKeySet() if (unreleasedShuffleKeys.isEmpty) { - logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.") + logInfo(s"Waiting for all shuffle expired cost ${waited}ms.") } else { - logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " + + logWarning(s"Waiting for all shuffle expired cost ${waited}ms, " + s"unreleased shuffle: \n${unreleasedShuffleKeys.asScala.mkString("[", ", ", "]")}") } workerStatusManager.transitionState(State.Exit) @@ -1079,34 +1093,43 @@ private[celeborn] class Worker( workerStatusManager.transitionState(State.Exit) } - ShutdownHookManager.get().addShutdownHook( - ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - logInfo("Shutdown hook called.") - workerStatusManager.exitEventType match { - case WorkerEventType.Graceful => - shutdownGracefully() - stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) - case WorkerEventType.Decommission => - decommissionWorker() - stop(CelebornExitKind.WORKER_DECOMMISSION) - case _ => - exitImmediately() - stop(CelebornExitKind.EXIT_IMMEDIATELY) - } + // Total shutdown-hook budget for the decommission path: the bounded drain wait in + // decommissionWorker() (forceExitTimeout) plus checkInterval of headroom for + // stop(WORKER_DECOMMISSION) to finish cleanup before the hook is cancelled. Shared by the + // SIGTERM registration below and the runtime REST decommission path so they cannot drift. + private def decommissionHookTimeoutMs: Long = + conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval + + private val workerShutdownHook = ThreadUtils.newThread( + new Runnable { + override def run(): Unit = { + logInfo("Shutdown hook called.") + workerStatusManager.exitEventType match { + case WorkerEventType.Graceful => + shutdownGracefully() + stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) + case WorkerEventType.Decommission => + decommissionWorker() + stop(CelebornExitKind.WORKER_DECOMMISSION) + case _ => + exitImmediately() + stop(CelebornExitKind.EXIT_IMMEDIATELY) } - }, - "worker-shutdown-hook-thread"), - WORKER_SHUTDOWN_PRIORITY) + } + }, + "worker-shutdown-hook-thread") if (conf.workerDecommissionShutdown) { - // The wait loop in decommissionWorker() is bounded by forceExitTimeout, so the extra - // checkInterval reserves headroom for stop(WORKER_DECOMMISSION) to finish cleanup - // before the hook is cancelled. - ShutdownHookManager.get().updateTimeout( - conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval, + // Register the worker hook with an explicit extended timeout so that only this hook + // gets the longer budget, instead of process-wide updateTimeout which would also extend + // unrelated hooks. + ShutdownHookManager.get().addShutdownHook( + workerShutdownHook, + WORKER_SHUTDOWN_PRIORITY, + decommissionHookTimeoutMs, TimeUnit.MILLISECONDS) + } else { + ShutdownHookManager.get().addShutdownHook(workerShutdownHook, WORKER_SHUTDOWN_PRIORITY) } @VisibleForTesting diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala index 994d5047e6c..803f75f85e0 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala @@ -35,7 +35,10 @@ import org.apache.celeborn.service.deploy.worker.storage.StorageManager private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging { var currentWorkerStatus = WorkerStatus.normalWorkerStatus() - var exitEventType = WorkerEventType.Immediately + // Written under this.synchronized but read unsynchronized from the shutdown-hook thread + // (Worker.scala), so it must be volatile to guarantee visibility of a runtime REST + // exit(...) update before an independent SIGTERM-triggered hook reads it. + @volatile var exitEventType = WorkerEventType.Immediately private var worker: Worker = _ private var shutdown: AtomicBoolean = _ private var storageManager: StorageManager = _ diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala index 97c56da7a00..fd77d0d144d 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala @@ -87,14 +87,18 @@ class WorkerStatusManagerSuite extends AnyFunSuite { val mgr1 = new WorkerStatusManager(conf1) Assert.assertEquals(WorkerEventType.Immediately, mgr1.exitEventType) - // Graceful shutdown only → Graceful + // Graceful shutdown only → Graceful. Set both keys explicitly so a leaked + // decommission.shutdown.enabled system property cannot flip the result. val conf2 = new CelebornConf() conf2.set("celeborn.worker.graceful.shutdown.enabled", "true") + conf2.set("celeborn.worker.decommission.shutdown.enabled", "false") val mgr2 = new WorkerStatusManager(conf2) Assert.assertEquals(WorkerEventType.Graceful, mgr2.exitEventType) - // Decommission shutdown only → Decommission + // Decommission shutdown only → Decommission. Set both keys explicitly so a leaked + // graceful.shutdown.enabled system property cannot affect the result. val conf3 = new CelebornConf() + conf3.set("celeborn.worker.graceful.shutdown.enabled", "false") conf3.set("celeborn.worker.decommission.shutdown.enabled", "true") val mgr3 = new WorkerStatusManager(conf3) Assert.assertEquals(WorkerEventType.Decommission, mgr3.exitEventType)