diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 668fbcf79e4..1248baa5627 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1359,7 +1359,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // ////////////////////////////////////////////////////// // Graceful Shutdown & Recover // // ////////////////////////////////////////////////////// + def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED) def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED) + // Decommission shutdown overrides graceful shutdown: a decommissioned worker will not + // restart, so recovery state (recovery DB, sorter state) should not be persisted. + def effectiveWorkerGracefulShutdown: Boolean = + workerGracefulShutdown && !workerDecommissionShutdown def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT) def workerGracefulShutdownCheckSlotsFinishedInterval: Long = get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL) @@ -4479,6 +4484,25 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("6h") + val WORKER_DECOMMISSION_SHUTDOWN_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.worker.decommission.shutdown.enabled") + .categories("worker") + .doc("When true, the worker will decommission on shutdown signal (e.g. SIGTERM), " + + "waiting for all shuffle data to be consumed or expired before exiting. " + + "This is suitable for permanent scale-down scenarios where the worker will not restart. " + + "When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " + + "(recovery state will not be saved since the worker is not expected to come back). " + + "Operators should set the pod's terminationGracePeriodSeconds to at least " + + "celeborn.worker.decommission.forceExitTimeout + " + + "celeborn.worker.decommission.checkInterval, plus additional headroom for final " + + "resource cleanup. The drain wait is bounded by forceExitTimeout, but the " + + "subsequent stop() runs within the same budget and deletes any still-unreleased " + + "shuffle, whose cost grows with the amount of undrained data; size the extra " + + "headroom for that worst case so the pod is not force-killed mid-cleanup.") + .version("0.7.0") + .booleanConf + .createWithDefault(false) + val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] = buildConf("celeborn.worker.graceful.shutdown.enabled") .categories("worker") diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index bb2cec89bc3..0042a6b1082 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -78,6 +78,7 @@ license: | | celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | | | celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | | | celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | | +| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to at least celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval, plus additional headroom for final resource cleanup. The drain wait is bounded by forceExitTimeout, but the subsequent stop() runs within the same budget and deletes any still-unreleased shuffle, whose cost grows with the amount of undrained data; size the extra headroom for that worst case so the pod is not force-killed mid-cleanup. | 0.7.0 | | | celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | | | celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of direct memory for read buffer | 0.2.0 | | | celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer | 0.6.2 | | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java index 5979a8434c2..2b3b9436bca 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java @@ -113,7 +113,7 @@ public PartitionFilesSorter( long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight(); this.source = source; this.cleaner = new PartitionFilesCleaner(this); - boolean gracefulShutdown = conf.workerGracefulShutdown(); + boolean gracefulShutdown = conf.effectiveWorkerGracefulShutdown(); // Assume a chunk won't be larger than 2GB // ShuffleClient can fetch shuffle data from a restarted worker only // when the worker's fetching port is stable and enables graceful shutdown. diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index 91e145ba659..ab5f2ee275e 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -156,7 +156,7 @@ private[celeborn] class Worker( private val WORKER_SHUTDOWN_PRIORITY = 100 val shutdown = new AtomicBoolean(false) - private val gracefulShutdown = conf.workerGracefulShutdown + private val gracefulShutdown = conf.effectiveWorkerGracefulShutdown if (gracefulShutdown) { var checkPortMap = Map( WORKER_RPC_PORT -> conf.workerRpcPort, @@ -619,6 +619,14 @@ private[celeborn] class Worker( if (!stopped) { logInfo("Stopping Worker.") + // Both graceful shutdown and decommission have drained data, so the periodic + // thread pools below are allowed to finish in-flight work instead of being + // force-cancelled. Note this does not extend to partitionsSorter.close(), which + // only awaits termination on graceful shutdown; on decommission an in-flight + // on-demand sort is still force-cancelled (see PartitionFilesSorter.close). + val drainBeforeExit = exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN || + exitKind == CelebornExitKind.WORKER_DECOMMISSION + if (jvmProfiler != null) { jvmProfiler.stop() } @@ -626,7 +634,7 @@ private[celeborn] class Worker( jvmQuake.stop() } if (sendHeartbeatTask != null) { - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { + if (drainBeforeExit) { sendHeartbeatTask.cancel(false) } else { sendHeartbeatTask.cancel(true) @@ -634,14 +642,14 @@ private[celeborn] class Worker( sendHeartbeatTask = null } if (checkFastFailTask != null) { - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { + if (drainBeforeExit) { checkFastFailTask.cancel(false) } else { checkFastFailTask.cancel(true) } checkFastFailTask = null } - if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) { + if (drainBeforeExit) { forwardMessageScheduler.shutdown() replicateThreadPool.shutdown() commitThreadPool.shutdown() @@ -949,8 +957,12 @@ private[celeborn] class Worker( override def exit(exitType: String): String = { exitType.toUpperCase(Locale.ROOT) match { case "DECOMMISSION" => + // A runtime REST decommission can target a worker not configured for + // decommission-on-shutdown, whose hook was registered with the default timeout. + // updateTimeout is the only API to extend an already-registered hook, so it is + // used here despite being process-wide. ShutdownHookManager.get().updateTimeout( - conf.workerDecommissionForceExitTimeout, + decommissionHookTimeoutMs, TimeUnit.MILLISECONDS) workerStatusManager.doTransition(WorkerEventType.Decommission) case "GRACEFUL" => @@ -1022,25 +1034,34 @@ private[celeborn] class Worker( def decommissionWorker(): Unit = { logInfo("Worker start to decommission") - workerStatusManager.transitionState(State.InDecommission) + // A runtime REST exit("DECOMMISSION") already moved the state to InDecommission before + // triggering the shutdown hook; skip the re-transition in that case to avoid a spurious + // "transition not allowed" warning. The master report still runs on both paths. + if (workerStatusManager.getWorkerState() != State.InDecommission) { + workerStatusManager.transitionState(State.InDecommission) + } sendWorkerDecommissionToMaster() shutdown.set(true) val interval = conf.workerDecommissionCheckInterval val timeout = conf.workerDecommissionForceExitTimeout - var waitTimes = 0 - - def waitTime: Long = waitTimes * interval - - while (!storageManager.shuffleKeySet().isEmpty && waitTime + interval < timeout) { - Thread.sleep(interval) - waitTimes += 1 + var waited = 0L + + // Bound the total wait strictly by the timeout so that the remaining shutdown hook + // budget is left for stop(WORKER_DECOMMISSION) to clean up resources. Clamp the sleep + // to the remaining budget so that a forceExitTimeout smaller than checkInterval still + // drains (bounded by timeout) instead of skipping the wait entirely and deleting shuffle + // data that consumers still need. + while (!storageManager.shuffleKeySet().isEmpty && waited < timeout) { + val sleepMs = Math.min(interval, timeout - waited) + Thread.sleep(sleepMs) + waited += sleepMs } val unreleasedShuffleKeys = storageManager.shuffleKeySet() if (unreleasedShuffleKeys.isEmpty) { - logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.") + logInfo(s"Waiting for all shuffle expired cost ${waited}ms.") } else { - logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " + + logWarning(s"Waiting for all shuffle expired cost ${waited}ms, " + s"unreleased shuffle: \n${unreleasedShuffleKeys.asScala.mkString("[", ", ", "]")}") } workerStatusManager.transitionState(State.Exit) @@ -1072,29 +1093,44 @@ private[celeborn] class Worker( workerStatusManager.transitionState(State.Exit) } - ShutdownHookManager.get().addShutdownHook( - ThreadUtils.newThread( - new Runnable { - override def run(): Unit = { - logInfo("Shutdown hook called.") - workerStatusManager.exitEventType match { - case WorkerEventType.Graceful => - shutdownGracefully() - case WorkerEventType.Decommission => - decommissionWorker() - case _ => - exitImmediately() - } + // Total shutdown-hook budget for the decommission path: the bounded drain wait in + // decommissionWorker() (forceExitTimeout) plus checkInterval of headroom for + // stop(WORKER_DECOMMISSION) to finish cleanup before the hook is cancelled. Shared by the + // SIGTERM registration below and the runtime REST decommission path so they cannot drift. + private def decommissionHookTimeoutMs: Long = + conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval - if (workerStatusManager.exitEventType == WorkerEventType.Graceful) { + private val workerShutdownHook = ThreadUtils.newThread( + new Runnable { + override def run(): Unit = { + logInfo("Shutdown hook called.") + workerStatusManager.exitEventType match { + case WorkerEventType.Graceful => + shutdownGracefully() stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) - } else { + case WorkerEventType.Decommission => + decommissionWorker() + stop(CelebornExitKind.WORKER_DECOMMISSION) + case _ => + exitImmediately() stop(CelebornExitKind.EXIT_IMMEDIATELY) - } } - }, - "worker-shutdown-hook-thread"), - WORKER_SHUTDOWN_PRIORITY) + } + }, + "worker-shutdown-hook-thread") + + if (conf.workerDecommissionShutdown) { + // Register the worker hook with an explicit extended timeout so that only this hook + // gets the longer budget, instead of process-wide updateTimeout which would also extend + // unrelated hooks. + ShutdownHookManager.get().addShutdownHook( + workerShutdownHook, + WORKER_SHUTDOWN_PRIORITY, + decommissionHookTimeoutMs, + TimeUnit.MILLISECONDS) + } else { + ShutdownHookManager.get().addShutdownHook(workerShutdownHook, WORKER_SHUTDOWN_PRIORITY) + } @VisibleForTesting def getPushFetchServerPort: (Int, Int) = (pushPort, fetchPort) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala index e31fcf576a2..803f75f85e0 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManager.scala @@ -35,12 +35,20 @@ import org.apache.celeborn.service.deploy.worker.storage.StorageManager private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging { var currentWorkerStatus = WorkerStatus.normalWorkerStatus() - var exitEventType = WorkerEventType.Immediately + // Written under this.synchronized but read unsynchronized from the shutdown-hook thread + // (Worker.scala), so it must be volatile to guarantee visibility of a runtime REST + // exit(...) update before an independent SIGTERM-triggered hook reads it. + @volatile var exitEventType = WorkerEventType.Immediately private var worker: Worker = _ private var shutdown: AtomicBoolean = _ private var storageManager: StorageManager = _ + private val decommissionShutdown = conf.workerDecommissionShutdown private val gracefulShutdown = conf.workerGracefulShutdown - if (gracefulShutdown) { + if (decommissionShutdown) { + exitEventType = WorkerEventType.Decommission + logInfo("Decommission shutdown enabled, worker will decommission on SIGTERM" + + " (overrides graceful shutdown)") + } else if (gracefulShutdown) { exitEventType = WorkerEventType.Graceful } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 9a2d4a8a740..0b4cff69443 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -295,7 +295,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, DiskFileInfo]]() // ShuffleClient can fetch data from a restarted worker only // when the worker's fetching port is stable. - val workerGracefulShutdown = conf.workerGracefulShutdown + val workerGracefulShutdown = conf.effectiveWorkerGracefulShutdown if (workerGracefulShutdown) { try { val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend) diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala index c787be6882d..fd77d0d144d 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/WorkerStatusManagerSuite.scala @@ -57,24 +57,59 @@ class WorkerStatusManagerSuite extends AnyFunSuite { statusManager.init(worker) statusManager.doTransition(WorkerEventType.DecommissionThenIdle) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle) + Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState()) Assert.assertEquals( - worker.workerInfo.getWorkerStatus().getStateValue, - PbWorkerStatus.State.InDecommissionThenIdle.getNumber) + PbWorkerStatus.State.InDecommissionThenIdle.getNumber, + worker.workerInfo.getWorkerStatus().getStateValue) // Rerun state Transition statusManager.doTransition(WorkerEventType.DecommissionThenIdle) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle) + Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState()) // Reset shuffleKeys shuffleKeys.clear() statusManager.doTransition(WorkerEventType.DecommissionThenIdle) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Idle) + Assert.assertEquals(PbWorkerStatus.State.Idle, statusManager.getWorkerState()) statusManager.doTransition(WorkerEventType.Recommission) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal) + Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState()) statusManager.doTransition(WorkerEventType.Recommission) - Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal) + Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState()) + } + + test("Test exitEventType initialization based on config") { + // Neither graceful nor decommission → Immediately. Set both keys explicitly so the + // assertion does not depend on system properties leaked from other tests. + val conf1 = new CelebornConf() + conf1.set("celeborn.worker.graceful.shutdown.enabled", "false") + conf1.set("celeborn.worker.decommission.shutdown.enabled", "false") + val mgr1 = new WorkerStatusManager(conf1) + Assert.assertEquals(WorkerEventType.Immediately, mgr1.exitEventType) + + // Graceful shutdown only → Graceful. Set both keys explicitly so a leaked + // decommission.shutdown.enabled system property cannot flip the result. + val conf2 = new CelebornConf() + conf2.set("celeborn.worker.graceful.shutdown.enabled", "true") + conf2.set("celeborn.worker.decommission.shutdown.enabled", "false") + val mgr2 = new WorkerStatusManager(conf2) + Assert.assertEquals(WorkerEventType.Graceful, mgr2.exitEventType) + + // Decommission shutdown only → Decommission. Set both keys explicitly so a leaked + // graceful.shutdown.enabled system property cannot affect the result. + val conf3 = new CelebornConf() + conf3.set("celeborn.worker.graceful.shutdown.enabled", "false") + conf3.set("celeborn.worker.decommission.shutdown.enabled", "true") + val mgr3 = new WorkerStatusManager(conf3) + Assert.assertEquals(WorkerEventType.Decommission, mgr3.exitEventType) + + // Both enabled → Decommission overrides graceful + val conf4 = new CelebornConf() + conf4.set("celeborn.worker.graceful.shutdown.enabled", "true") + conf4.set("celeborn.worker.decommission.shutdown.enabled", "true") + val mgr4 = new WorkerStatusManager(conf4) + Assert.assertEquals(WorkerEventType.Decommission, mgr4.exitEventType) + Assert.assertTrue(conf4.workerGracefulShutdown) + Assert.assertTrue(conf4.workerDecommissionShutdown) } }