From 47528d2b2dfe197bb264df2e1da479639bd0b282 Mon Sep 17 00:00:00 2001 From: afterincomparableyum <224495379+afterincomparableyum@users.noreply.github.com> Date: Wed, 29 Apr 2026 22:30:16 -0700 Subject: [PATCH 1/3] [CELEBORN-2317] Validate applicationId to prevent worker path traversal The worker builds local shuffle paths by concatenating applicationId received over RPC: `///`. The int32 and fileName is built from int id/epoch/mode, but it was never validated against a charset. With auth disabled, any client on the network could supply `appId = "../foo"` and have the worker mkdir, create, or delete files outside its working dir. With auth enabled, the SASL clientId == applicationId equality check in RpcEndpoint did not constrain format, so a tenant whose registered id contained `..` could still escape. This change: - Adds Utils.validateAppId enforcing `^[A-Za-z0-9_-]+$`, which matches Spark (`application__`, `local-`), Flink, and MR formats. - Calls it at every worker RPC entry point that takes an applicationId or shuffleKey from the wire: Controller (ReserveSlots, CommitFiles, DestroyWorkerSlots), PushDataHandler.handleCore, and the two checkAuth sites in FetchHandler. - Adds a canonical path containment check in StorageManager.createDiskFile (local disk branch) as defense in depth, before mkdirs() runs. --- .../apache/celeborn/common/util/Utils.scala | 11 +++++++++ .../celeborn/common/util/UtilsSuite.scala | 24 +++++++++++++++++++ .../service/deploy/worker/Controller.scala | 6 ++++- .../service/deploy/worker/FetchHandler.scala | 8 +++++-- .../deploy/worker/PushDataHandler.scala | 4 +++- .../worker/storage/StorageManager.scala | 9 ++++++- 6 files changed, 57 insertions(+), 5 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index 5b2dd6a1097..d8e7f489327 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -700,6 +700,17 @@ object Utils extends Logging { (appId, shuffleId) } + private val appIdPattern = "^[A-Za-z0-9_-]+$".r + + def validateAppId(applicationId: String): Unit = { + if (applicationId == null || applicationId.isEmpty || + appIdPattern.findFirstIn(applicationId).isEmpty) { + throw new IllegalArgumentException( + s"Invalid application id: '$applicationId'. " + + "Application id must be non-empty and match [A-Za-z0-9_-]+.") + } + } + def splitPartitionLocationUniqueId(uniqueId: String): (Int, Int) = { val splits = uniqueId.split("-") val partitionId = splits.dropRight(1).mkString("-").toInt diff --git a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala index 8be472b6447..4c160810dde 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala @@ -289,4 +289,28 @@ class UtilsSuite extends CelebornFunSuite { celebornConf) assert(testInstance.isInstanceOf[DefaultIdentityProvider]) } + + test("validateAppId rejects path traversal and accepts valid ids") { + Seq( + "application_1234567890123_0001", + "local-1234567890123", + "app1", + "my-app-id", + "app_with_underscores").foreach { id => + Utils.validateAppId(id) + } + + Seq( + "../etc/passwd", + "app/../secret", + "app/id", + "app\\id", + "app id", + "", + null).foreach { id => + intercept[IllegalArgumentException] { + Utils.validateAppId(id) + } + } + } } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 565acb44182..0e3df661ba1 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -115,6 +115,7 @@ private[deploy] class Controller( pushDataTimeout, partitionSplitEnabled, isSegmentGranularityVisible) => + Utils.validateAppId(applicationId) checkAuth(context, applicationId) val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId) workerSource.sample(WorkerSource.RESERVE_SLOTS_TIME, shuffleKey) { @@ -146,6 +147,7 @@ private[deploy] class Controller( mapAttempts, epoch, mockFailure) => + Utils.validateAppId(applicationId) checkAuth(context, applicationId) val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId) logDebug(s"Received CommitFiles request, $shuffleKey, primary files" + @@ -164,7 +166,9 @@ private[deploy] class Controller( s"$commitFilesTimeMs ms.") case DestroyWorkerSlots(shuffleKey, primaryLocations, replicaLocations, mockFailure) => - checkAuth(context, Utils.splitShuffleKey(shuffleKey)._1) + val applicationId = Utils.splitShuffleKey(shuffleKey)._1 + Utils.validateAppId(applicationId) + checkAuth(context, applicationId) handleDestroy(context, shuffleKey, primaryLocations, replicaLocations, mockFailure) } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 7ad990e2bf4..7d552c92c2e 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -148,7 +148,9 @@ class FetchHandler( val endIndices = openStreamList.getEndIndexList val readLocalFlags = openStreamList.getReadLocalShuffleList val pbOpenStreamListResponse = PbOpenStreamListResponse.newBuilder() - checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1) + val applicationId = Utils.splitShuffleKey(shuffleKey)._1 + Utils.validateAppId(applicationId) + checkAuth(client, applicationId) val openStreamRequestId = Utils.makeOpenStreamRequestId( shuffleKey, client.getChannel.id().toString, @@ -364,7 +366,9 @@ class FetchHandler( isLegacy: Boolean, readLocalShuffle: Boolean = false, callback: RpcResponseCallback): Unit = { - checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1) + val applicationId = Utils.splitShuffleKey(shuffleKey)._1 + Utils.validateAppId(applicationId) + checkAuth(client, applicationId) workerSource.recordAppActiveConnection(client, shuffleKey) val requestId = Utils.makeOpenStreamRequestId( shuffleKey, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index 0accb33adbf..f89bda36e19 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -989,7 +989,9 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler shuffleKey: String, handler: () => Unit, callback: RpcResponseCallback): Unit = { - checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1) + val applicationId = Utils.splitShuffleKey(shuffleKey)._1 + Utils.validateAppId(applicationId) + checkAuth(client, applicationId) try { handler() } catch { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 9a2d4a8a740..a9296a2ec20 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -1216,8 +1216,15 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs val dir = dirs(getNextIndex % dirs.size) val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints) val shuffleDir = new File(dir, s"$appId/$shuffleId") - shuffleDir.mkdirs() val file = new File(shuffleDir, fileName) + // Defense in depth: ensure the resolved path stays under the working dir + // even if appId / shuffleId / fileName contained traversal characters. + val dirCanonical = dir.getCanonicalPath + File.separator + if (!file.getCanonicalPath.startsWith(dirCanonical)) { + throw new IOException( + s"Refusing to create shuffle file outside working dir: ${file.getCanonicalPath}") + } + shuffleDir.mkdirs() try { if (file.exists()) { throw new FileAlreadyExistsException( From 7ef4f3768fe59e6654d67706db8d41c1876ecdc9 Mon Sep 17 00:00:00 2001 From: afterincomparableyum <224495379+afterincomparableyum@users.noreply.github.com> Date: Sun, 17 May 2026 23:24:03 -0700 Subject: [PATCH 2/3] address comments of claude from RexXiong --- .../celeborn/service/deploy/worker/FetchHandler.scala | 8 ++------ .../celeborn/service/deploy/worker/PushDataHandler.scala | 4 +--- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 7d552c92c2e..7ad990e2bf4 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -148,9 +148,7 @@ class FetchHandler( val endIndices = openStreamList.getEndIndexList val readLocalFlags = openStreamList.getReadLocalShuffleList val pbOpenStreamListResponse = PbOpenStreamListResponse.newBuilder() - val applicationId = Utils.splitShuffleKey(shuffleKey)._1 - Utils.validateAppId(applicationId) - checkAuth(client, applicationId) + checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1) val openStreamRequestId = Utils.makeOpenStreamRequestId( shuffleKey, client.getChannel.id().toString, @@ -366,9 +364,7 @@ class FetchHandler( isLegacy: Boolean, readLocalShuffle: Boolean = false, callback: RpcResponseCallback): Unit = { - val applicationId = Utils.splitShuffleKey(shuffleKey)._1 - Utils.validateAppId(applicationId) - checkAuth(client, applicationId) + checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1) workerSource.recordAppActiveConnection(client, shuffleKey) val requestId = Utils.makeOpenStreamRequestId( shuffleKey, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala index f89bda36e19..0accb33adbf 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala @@ -989,9 +989,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler shuffleKey: String, handler: () => Unit, callback: RpcResponseCallback): Unit = { - val applicationId = Utils.splitShuffleKey(shuffleKey)._1 - Utils.validateAppId(applicationId) - checkAuth(client, applicationId) + checkAuth(client, Utils.splitShuffleKey(shuffleKey)._1) try { handler() } catch { From 1924c26c42ade2f435b9529a14df9976e15b8861 Mon Sep 17 00:00:00 2001 From: afterincomparableyum <224495379+afterincomparableyum@users.noreply.github.com> Date: Sat, 13 Jun 2026 12:13:03 -0500 Subject: [PATCH 3/3] Address comments Single auth checkpoint for every current and future RPC. Dropped the redundant calls in Controller. Add comment for remote storages. Harden validateAppId --- .../scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala | 6 ++++++ .../main/scala/org/apache/celeborn/common/util/Utils.scala | 7 ++++--- .../scala/org/apache/celeborn/common/util/UtilsSuite.scala | 2 ++ .../apache/celeborn/service/deploy/worker/Controller.scala | 6 +----- .../service/deploy/worker/storage/StorageManager.scala | 3 +++ 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala index 8d12c227af6..3b219372113 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEndpoint.scala @@ -20,6 +20,7 @@ package org.apache.celeborn.common.rpc import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.network.client.TransportClient import org.apache.celeborn.common.rpc.netty.RemoteNettyRpcCallContext +import org.apache.celeborn.common.util.Utils /** * A factory class to create the [[RpcEnv]]. It must have an empty constructor so that it can be @@ -138,6 +139,11 @@ trait RpcEndpoint { } def checkAuth(context: RpcCallContext, appId: String): Unit = { + // Validate the application id at the single auth chokepoint so every current + // and future RPC handler that calls checkAuth is covered, and so it runs even + // when auth is disabled (clientId == null). This guards the worker against + // path traversal via appId (e.g. "../foo") before any filesystem path is built. + Utils.validateAppId(appId) context match { case remoteContext: RemoteNettyRpcCallContext => checkAuth(remoteContext.transportClient, appId) diff --git a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala index d8e7f489327..2c4f3e310e4 100644 --- a/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala +++ b/common/src/main/scala/org/apache/celeborn/common/util/Utils.scala @@ -700,11 +700,12 @@ object Utils extends Logging { (appId, shuffleId) } - private val appIdPattern = "^[A-Za-z0-9_-]+$".r + private val appIdPattern = "[A-Za-z0-9_-]+".r.pattern def validateAppId(applicationId: String): Unit = { - if (applicationId == null || applicationId.isEmpty || - appIdPattern.findFirstIn(applicationId).isEmpty) { + // matches() anchors the whole input, so a trailing newline (which `$` would + // otherwise tolerate) is rejected along with any other traversal character. + if (applicationId == null || !appIdPattern.matcher(applicationId).matches()) { throw new IllegalArgumentException( s"Invalid application id: '$applicationId'. " + "Application id must be non-empty and match [A-Za-z0-9_-]+.") diff --git a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala index 4c160810dde..e59296fc6b9 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala @@ -306,6 +306,8 @@ class UtilsSuite extends CelebornFunSuite { "app/id", "app\\id", "app id", + "app\n", + "valid_app\n", "", null).foreach { id => intercept[IllegalArgumentException] { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala index 0e3df661ba1..565acb44182 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala @@ -115,7 +115,6 @@ private[deploy] class Controller( pushDataTimeout, partitionSplitEnabled, isSegmentGranularityVisible) => - Utils.validateAppId(applicationId) checkAuth(context, applicationId) val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId) workerSource.sample(WorkerSource.RESERVE_SLOTS_TIME, shuffleKey) { @@ -147,7 +146,6 @@ private[deploy] class Controller( mapAttempts, epoch, mockFailure) => - Utils.validateAppId(applicationId) checkAuth(context, applicationId) val shuffleKey = Utils.makeShuffleKey(applicationId, shuffleId) logDebug(s"Received CommitFiles request, $shuffleKey, primary files" + @@ -166,9 +164,7 @@ private[deploy] class Controller( s"$commitFilesTimeMs ms.") case DestroyWorkerSlots(shuffleKey, primaryLocations, replicaLocations, mockFailure) => - val applicationId = Utils.splitShuffleKey(shuffleKey)._1 - Utils.validateAppId(applicationId) - checkAuth(context, applicationId) + checkAuth(context, Utils.splitShuffleKey(shuffleKey)._1) handleDestroy(context, shuffleKey, primaryLocations, replicaLocations, mockFailure) } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index a9296a2ec20..a8341659510 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -1161,6 +1161,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint") } + // NOTE: the DFS branches below (HDFS/S3/OSS) also build "$appId/$shuffleId" + // paths but rely solely on the upstream Utils.validateAppId guard at the RPC + // entry points if (storageType == Type.HDFS && location.getStorageInfo.HDFSAvailable()) { val shuffleDir = new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")