[CELEBORN-2362] Fix Flaky CI/CD #3737
Codecov Report
Coverage diff, main vs. #3737:
| Metric | main | #3737 | +/- |
|---|---|---|---|
| Coverage | 66.91% | 57.38% | -9.53% |
| Complexity | 0 | 214 | +214 |
| Files | 358 | 395 | +37 |
| Lines | 21986 | 27822 | +5836 |
| Branches | 1946 | 2712 | +766 |
| Hits | 14710 | 15962 | +1252 |
| Misses | 6262 | 10706 | +4444 |
| Partials | 1014 | 1154 | +140 |
|
|
@afterincomparableyum, is it ready? This fix would help CI runs.
|
@SteNicholas it is ready now. This should resolve a lot of the CI/CD flakes. One job may still fail every now and then, but this is an improvement over before.
Pull request overview
This PR targets CI/CD flakiness across the build toolchain, coverage instrumentation, and integration test stability (Spark/Flink) by reducing resource contention, eliminating known race conditions, and making startup/cleanup more deterministic.
Changes:
- Harden build tooling and coverage: make Maven download deterministic + validated/retried; exclude `io/netty/**` from JaCoCo instrumentation to avoid Netty JFR class instrumentation failures.
- Stabilize mini-cluster + integration tests: avoid ephemeral-port collisions, improve worker startup retry behavior, and enforce Spark/ShuffleClient cleanup between suites.
- Improve client/runtime correctness: adjust heartbeat scheduling, guard revive logic, and make the process-wide ShuffleClient singleton app-aware.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala | Bounds random port selection below ephemeral range; improves worker startup retry/cleanup behavior. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/SparkTestBase.scala | Reduces worker count; adds SparkSession + ShuffleClient cleanup between suites. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/ShuffleFallbackSuite.scala | Reduces worker count used by the suite to lower contention. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/memory/MemorySparkTestBase.scala | Reduces worker count for memory-storage suites to reduce CPU/thread pressure. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala | Adds per-test SparkSession + ShuffleClient reset to prevent cross-test interference. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/client/LifecycleManagerUnregisterShuffleSuite.scala | Shortens shuffle expired-check interval to fit within eventually() window reliably. |
| tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala | Waits for async local flushing to complete before asserting on-disk file sizes. |
| pom.xml | Excludes Netty classes from JaCoCo agent instrumentation to prevent runtime channel failures. |
| master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java | Reconfigures Ratis storage dirs on retries to avoid async lock-release flakiness. |
| client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala | Delays first heartbeat by one interval to avoid firing before full registration. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java | Guards revive path against null old locations to avoid NPEs. |
| client/src/main/java/org/apache/celeborn/client/ShuffleClient.java | Makes static ShuffleClient singleton app-aware (rebuild on app id change, clear on reset). |
| build/mvn | Makes Maven download deterministic (archive), validates tarball bodies, and retries downloads. |
|
I addressed the Copilot comments, @SteNicholas.
|
I’ll revert the Copilot suggestions that broke CI in a few hours and ping you when it is ready again.
|
Ping @SteNicholas, the PR is ready for review again.
SteNicholas left a comment
Code review — 6 findings (2 bugs, 2 cleanup, 2 minor).
Top issues:
- Resource leak: the new `appUniqueId`-mismatch branch in `ShuffleClient.get()` replaces `_instance` without calling `shutdown()` on the old one, leaking transport connections, thread pools, and RPC references.
- NPE risk: the outer guard calls `appUniqueId.equals(…)`, which will NPE if the parameter is null.
Also noted (not in diff, no inline comment):
- `MasterClusterFeature.selectRandomPort()` (line 53) and `RatisMasterStatusSystemSuiteJ` (line 134) still use `selectRandomInt(1024, 65535)`; the ephemeral-port collision fix applied to `MiniClusterFeature` was not propagated to these files.
See inline comments for details.
|
@afterincomparableyum, could you please take a look at the above comments?
|
Ping @SteNicholas, I've addressed the comments.
@afterincomparableyum, thanks for the updates. I left some comments requesting further changes.
|
@afterincomparableyum, could you please take a look at the above comments?
Address several independent root causes of CI/CD flakiness:
- build/mvn: default to archive.apache.org instead of the closer.lua
mirror redirector, validate each download is a real tarball (closer.lua
intermittently returns an HTML mirror page with HTTP 200), and retry up
to 3 times.
- pom.xml: exclude io/netty/** from JaCoCo instrumentation. JaCoCo's
agent rejects Netty 4.2's already-enhanced JFR event classes, tearing
down channels mid-write and flaking the Flink integration tests.
- MiniClusterFeature: cap randomly selected ports below the ephemeral
floor (32768) to avoid a TOCTOU race with OS-assigned/TIME_WAIT ports
that look free at selection time but fail to bind.
- RatisMasterStatusSystemSuiteJ: re-point each server to a fresh storage
directory on every start attempt, since Ratis releases the directory
lock asynchronously and a retry would otherwise hit "directory is
already locked".
- spark-it suites: reduce worker count from 5 to 3 (the MiniCluster
default) to cut the CPU/thread footprint of the serial single-JVM
suites that otherwise starves RPC/fetch handlers past the 240s timeout.
- LifecycleManagerUnregisterShuffleSuite: shorten the expired-check
interval to 5s so the shuffle unregister runs within the eventually()
window with retry margin.
…fleclient, adjust mvn build script, and also remove duplicate code in disk suite.
…nistically via an extracted JVMQuake.checkAndDump instead of inducing real GC pressure
|
@SteNicholas, ready for review again.
…igger fetch failure in SparkUtilsSuite
@afterincomparableyum, thanks for updates. Solid direction and most of these are genuine flakiness fixes. A few things to look at before merge — the two ShuffleClient items and the MiniClusterFeature worker leak are the ones that can re-introduce the very flakiness this PR removes. Details inline.
Two non-line notes:
- The PR description mentions an `ApplicationHeartbeater` change ("start the heartbeat timer after one interval instead of at delay 0"), but the current diff does not touch `ApplicationHeartbeater.scala`; the initial delay is still `0`. Looks like it was dropped in a later revision; please update the description so it matches the code.
- Cleanup nits (non-blocking): `selectRandomPort`/`portBounded` is now duplicated between `MasterClusterFeature` and `MiniClusterFeature`, and the ephemeral-floor value is spelled three ways (`MAX_SELECTABLE_PORT`, a bare `32768` in `MasterClusterFeature`, and `... - 2` = `32766` in the Ratis suite); worth a single shared `Utils.selectRandomPort()` + constant. The three near-identical construct blocks in `get()` could also collapse into one helper.
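To make the shared-helper nit concrete, here is a rough sketch of what a single port selector plus constant could look like. The class name `TestPorts`, the constant `MIN_SELECTABLE_PORT`, and the exclusive upper bound are assumptions for illustration only; just the 1024 lower bound and the 32768 ephemeral floor come from the discussion, and the real helper would presumably live next to the existing `Utils` code.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical shared test helper; names are illustrative, not existing Celeborn API. */
final class TestPorts {
  /** Lowest unprivileged port (same lower bound as selectRandomInt(1024, ...)). */
  static final int MIN_SELECTABLE_PORT = 1024;
  /** Ephemeral floor mentioned in the PR; selected ports stay strictly below it. */
  static final int MAX_SELECTABLE_PORT = 32768;

  private TestPorts() {}

  /** Returns a random port in [MIN_SELECTABLE_PORT, MAX_SELECTABLE_PORT). */
  static int selectRandomPort() {
    return ThreadLocalRandom.current().nextInt(MIN_SELECTABLE_PORT, MAX_SELECTABLE_PORT);
  }
}
```

With one constant, the three call sites cannot drift apart on what "below the ephemeral floor" means. Picking a port below the floor still does not guarantee the bind succeeds, so the retry logic stays necessary.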
| UserIdentifier userIdentifier, | ||
| byte[] extension) { | ||
| if (null == _instance || !initialized) { | ||
| if (null == _instance || !initialized || !Objects.equals(appUniqueId, _appUniqueId)) { |
get() is now app-aware, but the lock-free fast path still has a TOCTOU that can hand back the wrong app's client.
The in-lock return _instance (134) is safe, but the fast-path return _instance (137) reads the static volatile without the lock. The publish-order comment (127-129) only closes the new-id + stale-instance direction; writing _instance (130) before _appUniqueId (131) opens the reverse window:
- Thread A is in branch-3 for app A: it has run `_instance = A` (130) but not yet `_appUniqueId = A` (131), so `_appUniqueId` is still `B`.
- Concurrent thread B calls `get(B)`: the guard here sees `_appUniqueId == B` → `Objects.equals(B,B)` true → guard false → B skips the lock and `return _instance` (137) returns A's client.
B (the live app) then drives its shuffle against A's LifecycleManager → ArrayIndexOutOfBounds / CommitMetadata CRC mismatch, i.e. the cross-app corruption this PR is fixing. Only reachable in the multi-app spark-it JVM (prod uses one appUniqueId per JVM, so branch-3 never fires there) — but that is exactly the scenario branch-3 was added for.
Fix: return the instance captured under the lock (each branch assigns a local result, then return result;), or hold (appId, client) in a single AtomicReference so id and instance can't be read torn.
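A minimal sketch of the single-reference variant, under stated assumptions: `AppAwareHolder`, `Entry`, and the `Function`-based factory are hypothetical stand-ins (the real code would construct a `ShuffleClientImpl`), and the sketch deliberately leaves out shutdown of the replaced client.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Sketch: publish (appId, client) as one immutable pair so a reader never sees them torn. */
final class AppAwareHolder<C> {
  /** Immutable pair; both fields become visible together via the AtomicReference. */
  private static final class Entry<C> {
    final String appId;
    final C client;

    Entry(String appId, C client) {
      this.appId = appId;
      this.client = client;
    }
  }

  private final AtomicReference<Entry<C>> current = new AtomicReference<>();

  /** Returns the client for appId, rebuilding it when the stored id differs. */
  C get(String appId, Function<String, C> factory) {
    Entry<C> e = current.get(); // one volatile read covers both id and client
    if (e != null && Objects.equals(e.appId, appId)) {
      return e.client; // lock-free fast path, id and client are from the same pair
    }
    synchronized (this) {
      e = current.get(); // re-check under the lock
      if (e == null || !Objects.equals(e.appId, appId)) {
        e = new Entry<>(appId, factory.apply(appId));
        current.set(e);
      }
      return e.client; // return what was captured under the lock
    }
  }

  /** Clears the stored pair; the next get() rebuilds. */
  void reset() {
    current.set(null);
  }
}
```

Because the fast path compares and returns fields of the same `Entry`, there is no window in which the id check passes against one app while the returned client belongs to another.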
| // orphan is bounded (one per appUniqueId) and unreachable in normal single-app JVMs. | ||
| // The spark-it suite runs multiple apps in one reused JVM with overlapping lifecycles, so | ||
| // a shutdown() here tears down an instance still in use by the previous app and fails. | ||
| ShuffleClientImpl newInstance = new ShuffleClientImpl(appUniqueId, conf, userIdentifier); |
branch-3 replaces _instance without shutdown()-ing the old one, so its RpcEnv (bound port + dispatcher threads), Netty dataClientFactory, pushDataRetryPool and reviveManager leak until JVM exit.
The comment calls the orphan "bounded (one per appUniqueId)", but under two apps alternately calling get(), every call observes a mismatch and rebuilds — so it's bounded by the number of A↔B switches, not by app count. reset() only nulls the reference (no shutdown()), so these are never reclaimed there either. In the long-lived serial spark-it JVM this accumulates threads/FDs/ports, feeding the same contention this PR lowers worker counts to fight. If teardown of an in-use instance is the concern, that's a signal the static singleton shouldn't be shared across apps at all (per-app map / injected client).
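For reference, the per-app map alternative could be sketched roughly as below. Everything here is an assumption for illustration: `PerAppClients`, `release`, and the `AutoCloseable` bound are hypothetical, and the real client would be shut down via its own `shutdown()` rather than `close()`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Sketch of a per-app registry; C stands in for the real client type. */
final class PerAppClients<C extends AutoCloseable> {
  private final ConcurrentHashMap<String, C> clients = new ConcurrentHashMap<>();

  /** Returns the client for appId (must be non-null), creating it at most once per id. */
  C get(String appId, Function<String, C> factory) {
    return clients.computeIfAbsent(appId, factory);
  }

  /** Removes and closes the client for appId; returns true if one was registered. */
  boolean release(String appId) {
    C client = clients.remove(appId);
    if (client == null) {
      return false;
    }
    try {
      client.close();
    } catch (Exception e) {
      throw new IllegalStateException("close failed for app " + appId, e);
    }
    return true;
  }

  /** Number of live clients; bounded by the number of distinct app ids. */
  int size() {
    return clients.size();
  }
}
```

With this shape, two apps alternately calling `get()` reuse their own entries instead of rebuilding on every switch, and each app's teardown closes only its own client.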
| Thread.currentThread().interrupt() | ||
| throw ie | ||
| } | ||
| worker = createWorker(workerConf) |
Recreating the worker on retry leaks the failed worker into the returned cluster set.
workers(i-1) = worker (224) is written before initialize(), and the main poll loop does workerInfos.put(workers(i), …) (270) before the registered check (272), with no removal anywhere. On a failed first attempt the main thread can observe the failed worker (during its initialize() / backoff window) and put it into workerInfos; this line then creates a new object, which is also put on a later poll. Worker overrides neither equals nor hashCode (identity equality), so the dead and live workers are distinct keys and both survive in workerInfos.keySet, which setUpWorkers returns. A suite iterating the set (e.g. CelebornHashCheckDiskSuite workers.foreach { _.storageManager.updateDiskInfos() }) then operates on a stopped worker → NPE/RejectedExecutionException, and shutdownMiniCluster double-stops it. The old val worker (single identity, idempotent put) didn't have this.
Related: workers is written under flagUpdateLock (223-225) but read without it at 267 — a visibility race the new var reassignment makes easier to hit. And the InterruptedException branch (229) tears the worker down but does not recreate it, so an interrupt during initialize()/backoff leaves a stopped worker pinned in workers(i-1) that the poll loop then inserts and waits on until timeout. Suggest only inserting into workerInfos once the worker is registered (move the put after the check), and/or overwriting the prior failed entry for slot i.
| throw ex | ||
| } | ||
| try { | ||
| TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong) |
Math.pow(2, workerStartRetry) seconds is OK today (maxRetries = 4 → 2+4+8 = 14s before the final throw), but 14s of blind backoff plus the per-attempt stop/shutdown already eats a large slice of the 60s workersWaitingTimeoutMs window — so a worker that retries a couple of times can itself trip the startup timeout this PR is trying to harden. It's also brittle: raising maxRetries makes this grow as 2^(retry-1) s (e.g. maxRetries = 10 → a 512s sleep). Consider clamping it, e.g. min(Math.pow(2, workerStartRetry).toLong, <few seconds>).
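A small sketch of the clamp and the resulting totals, written in Java for illustration (the actual code is Scala). `Backoff`, `delaySeconds`, and `totalSeconds` are hypothetical names, and the assumption that sleeps happen after retries 1 through `maxRetries - 1` is inferred from the 2+4+8 arithmetic above.

```java
/** Sketch of a clamped exponential backoff; the cap value is the caller's choice. */
final class Backoff {
  private Backoff() {}

  /** Delay in seconds before retry number retry (1-based): min(2^retry, capSeconds). */
  static long delaySeconds(int retry, long capSeconds) {
    if (retry < 1 || capSeconds < 1) {
      throw new IllegalArgumentException("retry and capSeconds must be >= 1");
    }
    // Use a shift instead of Math.pow, and guard it so large retry counts cannot overflow.
    long exponential = retry >= 62 ? Long.MAX_VALUE : 1L << retry;
    return Math.min(exponential, capSeconds);
  }

  /** Total sleep before the final attempt, assuming a sleep after retries 1..maxRetries-1. */
  static long totalSeconds(int maxRetries, long capSeconds) {
    long total = 0;
    for (int retry = 1; retry < maxRetries; retry++) {
      total += delaySeconds(retry, capSeconds);
    }
    return total;
  }
}
```

With no effective cap, `totalSeconds(4, Long.MAX_VALUE)` reproduces the 14s figure; with a 4s cap the same four attempts sleep 2+4+4 = 10s, and a hypothetical `maxRetries = 10` would sleep at most 4s per step instead of 512s on the last one.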
| if command -v curl >/dev/null 2>&1; then | ||
| echo "exec: curl ${curl_opts} ${remote_tarball}" 1>&2 | ||
| curl ${curl_opts} "${remote_tarball}" > "${local_tarball}" | ||
| elif command -v wget >/dev/null 2>&1; then |
Three behavior changes in this rewrite worth a second look:
- `rm -f "${local_tarball}"` at the top of every attempt drops the old "reuse an already-downloaded tarball" path. For an offline/air-gapped build that pre-stages the tarball (but not the extracted dir), this turns a working build into a hard failure. (The extracted-binary short-circuit at the top of `install_app` still works.)
- curl `elif` wget means when curl is present, wget is never tried across all 3 attempts. The old script fell through to wget when curl produced no file; a curl-specific failure (proxy/TLS/CA) that wget would survive now fails outright.
- In `install_mvn`, since `APACHE_MIRROR` now defaults to `archive.apache.org`, the "fall back to archive" block (124) is dead code for the default config (the `%/` compare is equal → straight to `exit 2`). Minor: `sleep 3` (82) also runs after the final failed attempt before giving up.
| if (workers(i - 1) != null) { | ||
| workers(i - 1).shutdownGracefully() | ||
| } | ||
| Utils.tryLogNonFatalError(worker.exitImmediately()) |
The retry cleanup now calls worker.exitImmediately(), which is heavier than the old shutdownGracefully(): it issues a blocking masterClient.askSync(WorkerLost(...)) (Worker.scala:1056) and adds the worker's host:ports to the master's excluded list.
Two effects on the start-retry path:
- The synchronous `WorkerLost` RPC sits in the hot retry loop. If the master is slow/contended at startup (the exact scenario this PR targets), it blocks up to the ask timeout per failed attempt, compounding the `2^retry` backoff against the 60s `workersWaitingTimeoutMs`.
- If a worker briefly registered before failing, `WorkerLost` excludes it on the master; the recreated worker re-registers under new random ports, so stale host:port exclusions accumulate on the master across retries.
A failed initialize() usually means the worker never came up, so exitImmediately() (which assumes a registered worker reporting itself lost) seems like the wrong teardown here — the InterruptedException branch above just does stop() + rpcEnv.shutdown(), which is probably what this branch wants too.
What changes were proposed in this pull request?
Address several independent root causes of CI/CD flakiness, spanning the
build tooling, JaCoCo instrumentation, the test mini-cluster, and the
process-wide shuffle client.
Build & coverage:
- build/mvn: default to archive.apache.org instead of the closer.lua mirror redirector, validate each download is a real tarball (closer.lua intermittently returns an HTML mirror page with HTTP 200), and retry up to 3 times.
- pom.xml: exclude io/netty/** from JaCoCo instrumentation. JaCoCo's agent rejects Netty 4.2's already-enhanced JFR event classes, tearing down channels mid-write and flaking the Flink integration tests.

Client correctness:
- ShuffleClient: make the process-wide singleton app-aware. Track the appUniqueId of the live instance and rebuild it when a later app/suite requests a different id (and clear it in reset()), so a straggler from a previous app can no longer bind to the wrong LifecycleManager through the shared static client.
- ShuffleClientImpl: guard the revive path against a null oldLoc before removing it from pushExcludedWorkers, avoiding an NPE when no previous location is mapped.
- ApplicationHeartbeater: start the heartbeat timer after one interval instead of at delay 0, so the first heartbeat does not fire before the application is fully registered.

Mini-cluster & test harness:
- MiniClusterFeature: cap randomly selected ports below the ephemeral floor (32768) to avoid a TOCTOU race with OS-assigned/TIME_WAIT ports that look free at selection time but fail to bind. Also rework worker startup: recreate the worker on each retry, handle InterruptedException explicitly (stop + interrupt + rethrow), and back off exponentially between attempts so a transient port collision retries cleanly.
- RatisMasterStatusSystemSuiteJ: re-point each server to a fresh storage directory on every start attempt, since Ratis releases the directory lock asynchronously and a retry would otherwise hit "directory is already locked".

Spark/Flink integration suites:
- SparkTestBase / CelebornHashCheckDiskSuite: stop any active SparkSession and reset the static ShuffleClient between suites/tests, so a leaked SparkContext can no longer keep an old LifecycleManager alive and corrupt a later suite's shuffle 0.
- spark-it suites: reduce worker count from 5 to 3 (the MiniCluster default) to cut the CPU/thread footprint of the serial single-JVM suites that otherwise starves RPC/fetch handlers past the 240s timeout.
- LifecycleManagerUnregisterShuffleSuite: shorten the expired-check interval to 5s so the shuffle unregister runs within the eventually() window with retry margin.
- HybridShuffleWordCountTest: wait (eventually) for the async LocalFlusher to drain before asserting on-disk file length equals the logical length, instead of reading the file mid-flush.
Also fix the flaky JVMQuakeSuite hang by driving the GC deficit bucket deterministically via an extracted JVMQuake.checkAndDump instead of inducing real GC pressure.
Why are the changes needed?
CI/CD currently fails almost constantly because of the flaky tests and tooling described above. With these changes, CI/CD rarely fails.
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?
CI/CD