Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,12 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED)
def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
// Decommission shutdown overrides graceful shutdown: a decommissioned worker will not
// restart, so recovery state (recovery DB, sorter state) should not be persisted.
def effectiveWorkerGracefulShutdown: Boolean =
workerGracefulShutdown && !workerDecommissionShutdown
def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownCheckSlotsFinishedInterval: Long =
get(WORKER_CHECK_SLOTS_FINISHED_INTERVAL)
Expand Down Expand Up @@ -4479,6 +4484,25 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("6h")

val WORKER_DECOMMISSION_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.decommission.shutdown.enabled")
.categories("worker")
.doc("When true, the worker will decommission on shutdown signal (e.g. SIGTERM), " +
"waiting for all shuffle data to be consumed or expired before exiting. " +
"This is suitable for permanent scale-down scenarios where the worker will not restart. " +
"When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " +
"(recovery state will not be saved since the worker is not expected to come back). " +
"Operators should set the pod's terminationGracePeriodSeconds to at least " +
"celeborn.worker.decommission.forceExitTimeout + " +
"celeborn.worker.decommission.checkInterval, plus additional headroom for final " +
"resource cleanup. The drain wait is bounded by forceExitTimeout, but the " +
"subsequent stop() runs within the same budget and deletes any still-unreleased " +
"shuffle, whose cost grows with the amount of undrained data; size the extra " +
"headroom for that worst case so the pod is not force-killed mid-cleanup.")
.version("0.7.0")
.booleanConf
.createWithDefault(false)

val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.graceful.shutdown.enabled")
.categories("worker")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ license: |
| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | |
| celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | |
| celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | |
| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to at least celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval, plus additional headroom for final resource cleanup. The drain wait is bounded by forceExitTimeout, but the subsequent stop() runs within the same budget and deletes any still-unreleased shuffle, whose cost grows with the amount of undrained data; size the extra headroom for that worst case so the pod is not force-killed mid-cleanup. | 0.7.0 | |
| celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | |
| celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of direct memory for read buffer | 0.2.0 | |
| celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer | 0.6.2 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public PartitionFilesSorter(
long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight();
this.source = source;
this.cleaner = new PartitionFilesCleaner(this);
boolean gracefulShutdown = conf.workerGracefulShutdown();
boolean gracefulShutdown = conf.effectiveWorkerGracefulShutdown();
// Assume a chunk won't be larger than 2GB
// ShuffleClient can fetch shuffle data from a restarted worker only
// when the worker's fetching port is stable and enables graceful shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private[celeborn] class Worker(

private val WORKER_SHUTDOWN_PRIORITY = 100
val shutdown = new AtomicBoolean(false)
private val gracefulShutdown = conf.workerGracefulShutdown
private val gracefulShutdown = conf.effectiveWorkerGracefulShutdown
if (gracefulShutdown) {
var checkPortMap = Map(
WORKER_RPC_PORT -> conf.workerRpcPort,
Expand Down Expand Up @@ -619,29 +619,37 @@ private[celeborn] class Worker(
if (!stopped) {
logInfo("Stopping Worker.")

// Both graceful shutdown and decommission have drained data, so the periodic
// thread pools below are allowed to finish in-flight work instead of being
// force-cancelled. Note this does not extend to partitionsSorter.close(), which
// only awaits termination on graceful shutdown; on decommission an in-flight
// on-demand sort is still force-cancelled (see PartitionFilesSorter.close).
val drainBeforeExit = exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN ||
exitKind == CelebornExitKind.WORKER_DECOMMISSION

if (jvmProfiler != null) {
jvmProfiler.stop()
}
if (jvmQuake != null) {
jvmQuake.stop()
}
if (sendHeartbeatTask != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
if (drainBeforeExit) {
sendHeartbeatTask.cancel(false)
} else {
sendHeartbeatTask.cancel(true)
}
sendHeartbeatTask = null
}
if (checkFastFailTask != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
if (drainBeforeExit) {
checkFastFailTask.cancel(false)
} else {
checkFastFailTask.cancel(true)
}
checkFastFailTask = null
}
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
if (drainBeforeExit) {
forwardMessageScheduler.shutdown()
replicateThreadPool.shutdown()
commitThreadPool.shutdown()
Expand Down Expand Up @@ -949,8 +957,12 @@ private[celeborn] class Worker(
override def exit(exitType: String): String = {
exitType.toUpperCase(Locale.ROOT) match {
case "DECOMMISSION" =>
// A runtime REST decommission can target a worker not configured for
// decommission-on-shutdown, whose hook was registered with the default timeout.
// updateTimeout is the only API to extend an already-registered hook, so it is
// used here despite being process-wide.
ShutdownHookManager.get().updateTimeout(
conf.workerDecommissionForceExitTimeout,
decommissionHookTimeoutMs,
TimeUnit.MILLISECONDS)
workerStatusManager.doTransition(WorkerEventType.Decommission)
case "GRACEFUL" =>
Expand Down Expand Up @@ -1022,25 +1034,34 @@ private[celeborn] class Worker(

def decommissionWorker(): Unit = {
logInfo("Worker start to decommission")
workerStatusManager.transitionState(State.InDecommission)
// A runtime REST exit("DECOMMISSION") already moved the state to InDecommission before
// triggering the shutdown hook; skip the re-transition in that case to avoid a spurious
// "transition not allowed" warning. The master report still runs on both paths.
if (workerStatusManager.getWorkerState() != State.InDecommission) {
workerStatusManager.transitionState(State.InDecommission)
}
sendWorkerDecommissionToMaster()
shutdown.set(true)
val interval = conf.workerDecommissionCheckInterval
val timeout = conf.workerDecommissionForceExitTimeout
var waitTimes = 0

def waitTime: Long = waitTimes * interval

while (!storageManager.shuffleKeySet().isEmpty && waitTime + interval < timeout) {
Thread.sleep(interval)
waitTimes += 1
var waited = 0L

// Bound the total wait strictly by the timeout so that the remaining shutdown hook
// budget is left for stop(WORKER_DECOMMISSION) to clean up resources. Clamp the sleep
// to the remaining budget so that a forceExitTimeout smaller than checkInterval still
// drains (bounded by timeout) instead of skipping the wait entirely and deleting shuffle
// data that consumers still need.
while (!storageManager.shuffleKeySet().isEmpty && waited < timeout) {
val sleepMs = Math.min(interval, timeout - waited)
Thread.sleep(sleepMs)
waited += sleepMs
}

val unreleasedShuffleKeys = storageManager.shuffleKeySet()
if (unreleasedShuffleKeys.isEmpty) {
logInfo(s"Waiting for all shuffle expired cost ${waitTime}ms.")
logInfo(s"Waiting for all shuffle expired cost ${waited}ms.")
} else {
logWarning(s"Waiting for all shuffle expired cost ${waitTime}ms, " +
logWarning(s"Waiting for all shuffle expired cost ${waited}ms, " +
s"unreleased shuffle: \n${unreleasedShuffleKeys.asScala.mkString("[", ", ", "]")}")
}
workerStatusManager.transitionState(State.Exit)
Expand Down Expand Up @@ -1072,29 +1093,44 @@ private[celeborn] class Worker(
workerStatusManager.transitionState(State.Exit)
}

ShutdownHookManager.get().addShutdownHook(
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
logInfo("Shutdown hook called.")
workerStatusManager.exitEventType match {
case WorkerEventType.Graceful =>
shutdownGracefully()
case WorkerEventType.Decommission =>
decommissionWorker()
case _ =>
exitImmediately()
}
// Total shutdown-hook budget for the decommission path: the bounded drain wait in
// decommissionWorker() (forceExitTimeout) plus checkInterval of headroom for
// stop(WORKER_DECOMMISSION) to finish cleanup before the hook is cancelled. Shared by the
// SIGTERM registration below and the runtime REST decommission path so they cannot drift.
private def decommissionHookTimeoutMs: Long =
conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval

if (workerStatusManager.exitEventType == WorkerEventType.Graceful) {
private val workerShutdownHook = ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
logInfo("Shutdown hook called.")
workerStatusManager.exitEventType match {
case WorkerEventType.Graceful =>
shutdownGracefully()
stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
} else {
case WorkerEventType.Decommission =>
decommissionWorker()
stop(CelebornExitKind.WORKER_DECOMMISSION)
case _ =>
exitImmediately()
stop(CelebornExitKind.EXIT_IMMEDIATELY)
}
}
},
"worker-shutdown-hook-thread"),
WORKER_SHUTDOWN_PRIORITY)
}
},
"worker-shutdown-hook-thread")

if (conf.workerDecommissionShutdown) {
// Register the worker hook with an explicit extended timeout so that only this hook
// gets the longer budget, instead of process-wide updateTimeout which would also extend
// unrelated hooks.
ShutdownHookManager.get().addShutdownHook(
workerShutdownHook,
WORKER_SHUTDOWN_PRIORITY,
decommissionHookTimeoutMs,
TimeUnit.MILLISECONDS)
} else {
ShutdownHookManager.get().addShutdownHook(workerShutdownHook, WORKER_SHUTDOWN_PRIORITY)
}

@VisibleForTesting
def getPushFetchServerPort: (Int, Int) = (pushPort, fetchPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,20 @@ import org.apache.celeborn.service.deploy.worker.storage.StorageManager
private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging {

var currentWorkerStatus = WorkerStatus.normalWorkerStatus()
var exitEventType = WorkerEventType.Immediately
// Written under this.synchronized but read unsynchronized from the shutdown-hook thread
// (Worker.scala), so it must be volatile to guarantee visibility of a runtime REST
// exit(...) update before an independent SIGTERM-triggered hook reads it.
@volatile var exitEventType = WorkerEventType.Immediately
private var worker: Worker = _
private var shutdown: AtomicBoolean = _
private var storageManager: StorageManager = _
private val decommissionShutdown = conf.workerDecommissionShutdown
private val gracefulShutdown = conf.workerGracefulShutdown
if (gracefulShutdown) {
if (decommissionShutdown) {
exitEventType = WorkerEventType.Decommission
logInfo("Decommission shutdown enabled, worker will decommission on SIGTERM" +
" (overrides graceful shutdown)")
} else if (gracefulShutdown) {
exitEventType = WorkerEventType.Graceful
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, DiskFileInfo]]()
// ShuffleClient can fetch data from a restarted worker only
// when the worker's fetching port is stable.
val workerGracefulShutdown = conf.workerGracefulShutdown
val workerGracefulShutdown = conf.effectiveWorkerGracefulShutdown
if (workerGracefulShutdown) {
try {
val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,59 @@ class WorkerStatusManagerSuite extends AnyFunSuite {
statusManager.init(worker)

statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle)
Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState())
Assert.assertEquals(
worker.workerInfo.getWorkerStatus().getStateValue,
PbWorkerStatus.State.InDecommissionThenIdle.getNumber)
PbWorkerStatus.State.InDecommissionThenIdle.getNumber,
worker.workerInfo.getWorkerStatus().getStateValue)

// Rerun state Transition
statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle)
Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState())

// Reset shuffleKeys
shuffleKeys.clear()
statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Idle)
Assert.assertEquals(PbWorkerStatus.State.Idle, statusManager.getWorkerState())

statusManager.doTransition(WorkerEventType.Recommission)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal)
Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState())

statusManager.doTransition(WorkerEventType.Recommission)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal)
Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState())
}

test("Test exitEventType initialization based on config") {
// Neither graceful nor decommission → Immediately. Set both keys explicitly so the
// assertion does not depend on system properties leaked from other tests.
val conf1 = new CelebornConf()
conf1.set("celeborn.worker.graceful.shutdown.enabled", "false")
conf1.set("celeborn.worker.decommission.shutdown.enabled", "false")
val mgr1 = new WorkerStatusManager(conf1)
Assert.assertEquals(WorkerEventType.Immediately, mgr1.exitEventType)

// Graceful shutdown only → Graceful. Set both keys explicitly so a leaked
// decommission.shutdown.enabled system property cannot flip the result.
val conf2 = new CelebornConf()
conf2.set("celeborn.worker.graceful.shutdown.enabled", "true")
conf2.set("celeborn.worker.decommission.shutdown.enabled", "false")
val mgr2 = new WorkerStatusManager(conf2)
Assert.assertEquals(WorkerEventType.Graceful, mgr2.exitEventType)

// Decommission shutdown only → Decommission. Set both keys explicitly so a leaked
// graceful.shutdown.enabled system property cannot affect the result.
val conf3 = new CelebornConf()
conf3.set("celeborn.worker.graceful.shutdown.enabled", "false")
conf3.set("celeborn.worker.decommission.shutdown.enabled", "true")
val mgr3 = new WorkerStatusManager(conf3)
Assert.assertEquals(WorkerEventType.Decommission, mgr3.exitEventType)

// Both enabled → Decommission overrides graceful
val conf4 = new CelebornConf()
conf4.set("celeborn.worker.graceful.shutdown.enabled", "true")
conf4.set("celeborn.worker.decommission.shutdown.enabled", "true")
val mgr4 = new WorkerStatusManager(conf4)
Assert.assertEquals(WorkerEventType.Decommission, mgr4.exitEventType)
Assert.assertTrue(conf4.workerGracefulShutdown)
Assert.assertTrue(conf4.workerDecommissionShutdown)
}
}
Loading