Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// //////////////////////////////////////////////////////
// Graceful Shutdown & Recover //
// //////////////////////////////////////////////////////
def workerDecommissionShutdown: Boolean = get(WORKER_DECOMMISSION_SHUTDOWN_ENABLED)
Comment thread
chenghuichen marked this conversation as resolved.
def workerGracefulShutdown: Boolean = get(WORKER_GRACEFUL_SHUTDOWN_ENABLED)
def workerGracefulShutdownTimeoutMs: Long = get(WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownCheckSlotsFinishedInterval: Long =
Expand Down Expand Up @@ -4477,6 +4478,22 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("6h")

val WORKER_DECOMMISSION_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.decommission.shutdown.enabled")
Comment thread
chenghuichen marked this conversation as resolved.
.categories("worker")
.doc("When true, the worker will decommission on shutdown signal (e.g. SIGTERM), " +
"waiting for all shuffle data to be consumed or expired before exiting. " +
"This is suitable for permanent scale-down scenarios where the worker will not restart. " +
"When enabled, this overrides celeborn.worker.graceful.shutdown.enabled " +
"(recovery state will not be saved since the worker is not expected to come back). " +
"Operators should set the pod's terminationGracePeriodSeconds to at least " +
"celeborn.worker.decommission.forceExitTimeout + " +
"celeborn.worker.decommission.checkInterval to ensure the shutdown hook " +
"has enough time to complete.")
.version("0.7.0")
.booleanConf
.createWithDefault(false)

val WORKER_GRACEFUL_SHUTDOWN_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.graceful.shutdown.enabled")
.categories("worker")
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ license: |
| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | 9223372036854775807b | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | |
| celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | |
| celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | |
| celeborn.worker.decommission.shutdown.enabled | false | false | When true, the worker will decommission on shutdown signal (e.g. SIGTERM), waiting for all shuffle data to be consumed or expired before exiting. This is suitable for permanent scale-down scenarios where the worker will not restart. When enabled, this overrides celeborn.worker.graceful.shutdown.enabled (recovery state will not be saved since the worker is not expected to come back). Operators should set the pod's terminationGracePeriodSeconds to at least celeborn.worker.decommission.forceExitTimeout + celeborn.worker.decommission.checkInterval to ensure the shutdown hook has enough time to complete. | 0.7.0 | |
| celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | |
| celeborn.worker.directMemoryRatioForReadBuffer | 0.35 | false | Max ratio of direct memory for read buffer | 0.2.0 | |
| celeborn.worker.directMemoryRatioToMergeBuffer | 0.4 | false | If direct memory usage is above this limit, the worker will merge low utilization push data's body buffer | 0.6.2 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public PartitionFilesSorter(
long indexCacheMaxWeight = conf.workerPartitionSorterIndexCacheMaxWeight();
this.source = source;
this.cleaner = new PartitionFilesCleaner(this);
boolean gracefulShutdown = conf.workerGracefulShutdown();
boolean gracefulShutdown = conf.workerGracefulShutdown() && !conf.workerDecommissionShutdown();
// Assume a chunk won't be larger than 2GB
// ShuffleClient can fetch shuffle data from a restarted worker only
// when the worker's fetching port is stable and enables graceful shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ private[celeborn] class Worker(

private val WORKER_SHUTDOWN_PRIORITY = 100
val shutdown = new AtomicBoolean(false)
private val gracefulShutdown = conf.workerGracefulShutdown
private val gracefulShutdown = conf.workerGracefulShutdown && !conf.workerDecommissionShutdown
Comment thread
chenghuichen marked this conversation as resolved.
Outdated
if (gracefulShutdown) {
var checkPortMap = Map(
WORKER_RPC_PORT -> conf.workerRpcPort,
Expand Down Expand Up @@ -626,22 +626,25 @@ private[celeborn] class Worker(
jvmQuake.stop()
}
if (sendHeartbeatTask != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN ||
exitKind == CelebornExitKind.WORKER_DECOMMISSION) {
Comment thread
chenghuichen marked this conversation as resolved.
Outdated
sendHeartbeatTask.cancel(false)
} else {
sendHeartbeatTask.cancel(true)
}
sendHeartbeatTask = null
}
if (checkFastFailTask != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN ||
exitKind == CelebornExitKind.WORKER_DECOMMISSION) {
checkFastFailTask.cancel(false)
} else {
checkFastFailTask.cancel(true)
}
checkFastFailTask = null
}
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN ||
exitKind == CelebornExitKind.WORKER_DECOMMISSION) {
Comment thread
chenghuichen marked this conversation as resolved.
Outdated
forwardMessageScheduler.shutdown()
replicateThreadPool.shutdown()
commitThreadPool.shutdown()
Expand Down Expand Up @@ -1072,29 +1075,42 @@ private[celeborn] class Worker(
workerStatusManager.transitionState(State.Exit)
}

ShutdownHookManager.get().addShutdownHook(
ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
logInfo("Shutdown hook called.")
workerStatusManager.exitEventType match {
case WorkerEventType.Graceful =>
shutdownGracefully()
case WorkerEventType.Decommission =>
decommissionWorker()
case _ =>
exitImmediately()
}
private val shutdownHookThread = ThreadUtils.newThread(
new Runnable {
override def run(): Unit = {
logInfo("Shutdown hook called.")
workerStatusManager.exitEventType match {
Comment thread
chenghuichen marked this conversation as resolved.
case WorkerEventType.Graceful =>
shutdownGracefully()
case WorkerEventType.Decommission =>
decommissionWorker()
case _ =>
exitImmediately()
}

if (workerStatusManager.exitEventType == WorkerEventType.Graceful) {
workerStatusManager.exitEventType match {
case WorkerEventType.Graceful =>
stop(CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN)
} else {
case WorkerEventType.Decommission =>
stop(CelebornExitKind.WORKER_DECOMMISSION)
case _ =>
stop(CelebornExitKind.EXIT_IMMEDIATELY)
}
}
},
"worker-shutdown-hook-thread"),
WORKER_SHUTDOWN_PRIORITY)
}
},
"worker-shutdown-hook-thread")

if (conf.workerDecommissionShutdown) {
Comment thread
chenghuichen marked this conversation as resolved.
ShutdownHookManager.get().addShutdownHook(
shutdownHookThread,
WORKER_SHUTDOWN_PRIORITY,
conf.workerDecommissionForceExitTimeout + conf.workerDecommissionCheckInterval,
Comment thread
chenghuichen marked this conversation as resolved.
Outdated
Comment thread
chenghuichen marked this conversation as resolved.
Outdated
TimeUnit.MILLISECONDS)
} else {
ShutdownHookManager.get().addShutdownHook(
shutdownHookThread,
WORKER_SHUTDOWN_PRIORITY)
}

@VisibleForTesting
def getPushFetchServerPort: (Int, Int) = (pushPort, fetchPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ private[celeborn] class WorkerStatusManager(conf: CelebornConf) extends Logging
private var worker: Worker = _
private var shutdown: AtomicBoolean = _
private var storageManager: StorageManager = _
private val decommissionShutdown = conf.workerDecommissionShutdown
private val gracefulShutdown = conf.workerGracefulShutdown
if (gracefulShutdown) {
if (decommissionShutdown) {
exitEventType = WorkerEventType.Decommission

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exitEventType is read across threads without synchronization. It's written here and in exit() under this.synchronized, but the shutdown-hook thread reads workerStatusManager.exitEventType (Worker.scala:1087) without the lock, and the field (declared line 38) isn't @volatile. A runtime REST exit(...) that updates it shortly before an independent SIGTERM-triggered hook read has no happens-before guarantee → the hook could take a stale branch (e.g. exitImmediately instead of decommissionWorker). Mark exitEventType @volatile.

logInfo("Decommission shutdown enabled, worker will decommission on SIGTERM" +
" (overrides graceful shutdown)")
} else if (gracefulShutdown) {
exitEventType = WorkerEventType.Graceful
}
Comment thread
chenghuichen marked this conversation as resolved.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, DiskFileInfo]]()
// ShuffleClient can fetch data from a restarted worker only
// when the worker's fetching port is stable.
val workerGracefulShutdown = conf.workerGracefulShutdown
val workerGracefulShutdown = conf.workerGracefulShutdown && !conf.workerDecommissionShutdown
if (workerGracefulShutdown) {
try {
val dbBackend = DBBackend.byName(conf.workerGracefulShutdownRecoverDbBackend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,24 +57,52 @@ class WorkerStatusManagerSuite extends AnyFunSuite {
statusManager.init(worker)

statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle)
Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState())
Assert.assertEquals(
worker.workerInfo.getWorkerStatus().getStateValue,
PbWorkerStatus.State.InDecommissionThenIdle.getNumber)
PbWorkerStatus.State.InDecommissionThenIdle.getNumber,
worker.workerInfo.getWorkerStatus().getStateValue)

// Rerun state Transition
statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.InDecommissionThenIdle)
Assert.assertEquals(PbWorkerStatus.State.InDecommissionThenIdle, statusManager.getWorkerState())

// Reset shuffleKeys
shuffleKeys.clear()
statusManager.doTransition(WorkerEventType.DecommissionThenIdle)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Idle)
Assert.assertEquals(PbWorkerStatus.State.Idle, statusManager.getWorkerState())

statusManager.doTransition(WorkerEventType.Recommission)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal)
Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState())

statusManager.doTransition(WorkerEventType.Recommission)
Assert.assertEquals(statusManager.getWorkerState(), PbWorkerStatus.State.Normal)
Assert.assertEquals(PbWorkerStatus.State.Normal, statusManager.getWorkerState())
}

test("Test exitEventType initialization based on config") {
// Default: neither graceful nor decommission → Immediately
val conf1 = new CelebornConf()
Comment thread
chenghuichen marked this conversation as resolved.
val mgr1 = new WorkerStatusManager(conf1)
Assert.assertEquals(WorkerEventType.Immediately, mgr1.exitEventType)

// Graceful shutdown only → Graceful
val conf2 = new CelebornConf()
conf2.set("celeborn.worker.graceful.shutdown.enabled", "true")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-defensive test setup. conf1 above explicitly sets both keys with the comment "so the assertion does not depend on system properties leaked from other tests." But conf2 here sets only graceful.shutdown.enabled=true (not decommission.shutdown.enabled=false). Since new CelebornConf() loads system properties, a leaked celeborn.worker.decommission.shutdown.enabled=true would make exitEventType=Decommission and this assertEquals(Graceful, mgr2.exitEventType) fail. Apply conf1's both-keys defensiveness to conf2/conf3.

val mgr2 = new WorkerStatusManager(conf2)
Assert.assertEquals(WorkerEventType.Graceful, mgr2.exitEventType)

Comment thread
chenghuichen marked this conversation as resolved.
// Decommission shutdown only → Decommission
val conf3 = new CelebornConf()
conf3.set("celeborn.worker.decommission.shutdown.enabled", "true")
val mgr3 = new WorkerStatusManager(conf3)
Assert.assertEquals(WorkerEventType.Decommission, mgr3.exitEventType)

// Both enabled → Decommission overrides graceful
val conf4 = new CelebornConf()
conf4.set("celeborn.worker.graceful.shutdown.enabled", "true")
conf4.set("celeborn.worker.decommission.shutdown.enabled", "true")
val mgr4 = new WorkerStatusManager(conf4)
Assert.assertEquals(WorkerEventType.Decommission, mgr4.exitEventType)
Comment thread
chenghuichen marked this conversation as resolved.
Assert.assertTrue(conf4.workerGracefulShutdown)
Assert.assertTrue(conf4.workerDecommissionShutdown)
}
}
Loading