Skip to content

[CELEBORN-2371] Bound Spark batch-open client creation retries and stop them on interruption#3746

Open
sunchao wants to merge 1 commit into
apache:mainfrom
sunchao:dev/chao/codex/bound-batch-open-client-retries-oss
Open

[CELEBORN-2371] Bound Spark batch-open client creation retries and stop them on interruption#3746
sunchao wants to merge 1 commit into
apache:mainfrom
sunchao:dev/chao/codex/bound-batch-open-client-retries-oss

Conversation

@sunchao

@sunchao sunchao commented Jun 25, 2026

Copy link
Copy Markdown
Member

Why are the changes needed?

CELEBORN-2371 follows up on the parallel Spark batch-open client creation added by #3692.

The reader now creates clients for different workers in parallel, but each worker task still walks every PartitionLocation until one createClient call succeeds. A createClient call already has TransportClientFactory's complete retry budget, so the outer location loop can multiply that budget.

For example, suppose one reducer has 100 partition locations on worker-a and that worker is unavailable. The task for worker-a can make 100 outer createClient calls, and every call can run the configured transport retries. Because the reader waits for every worker task, the unavailable worker can keep the whole batch-open setup pending long after healthy workers have finished.

Cancellation also needs to terminate client creation consistently. If cancellation interrupts a failure callback, that callback can restore the interrupt flag and return; without checking the flag, the worker task starts the next location attempt. At the transport layer, synchronous bootstrap can wrap an InterruptedException in another exception, so the retry loop currently treats it as retryable. An interrupt while waiting for TCP connect or TLS setup also leaves cleanup of the in-progress channel implicit.

What changes were proposed in this PR?

Bound the per-worker outer retry loop

Each worker task now tries at most two partition locations. The first attempt keeps the normal path, and the second preserves same-worker fallback. Each attempt still receives the existing internal TransportClientFactory retry budget; this change only prevents that complete budget from being repeated once per partition location.

Stop the Spark worker task after cancellation

Before starting another same-worker attempt, CelebornShuffleReader checks the thread's interrupt flag and exits with InterruptedException. This complements the existing future cancellation path and prevents a callback that observed cancellation from falling through to another client creation.

Preserve interruption through transport bootstrap

TransportClientFactory now searches an exception's cause chain for InterruptedException, restores the thread's interrupt flag, and stops retrying immediately. The TCP-connect and TLS-handshake waits also share an interrupt-aware helper that closes the in-progress channel before propagating the interruption.

The tests cover the two-attempt bound, cancellation during the failure callback, a wrapped InterruptedException, and channel cleanup during an interrupted wait.

How was this PR tested?

Formatting was applied with:

./build/mvn --no-transfer-progress -DskipTests spotless:apply

The focused transport tests passed (11 tests):

./build/mvn --no-transfer-progress -pl common -am \
  -Dtest=TransportClientFactorySuiteJ,TransportClientFactoryInterruptSuiteJ \
  -DwildcardSuites=none clean test

The Spark 3.5 reader suite passed (7 tests):

./build/mvn --no-transfer-progress -Pspark-3.5 -pl client-spark/spark-3 -am \
  -Dtest=none \
  -DwildcardSuites=org.apache.spark.shuffle.celeborn.CelebornShuffleReaderSuite \
  clean test

Spark 4.0 / Scala 2.13 production and test compilation also passed:

./build/mvn --no-transfer-progress -Pspark-4.0 -pl client-spark/spark-3 -am \
  -DskipTests clean test

@sunchao sunchao marked this pull request as ready for review June 25, 2026 15:51
@SteNicholas SteNicholas requested a review from Copilot June 28, 2026 06:59

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

var streamCreatorPool: ThreadPoolExecutor = null

// TransportClientFactory already retries each attempt; allow one extra pooled-client attempt.
private val MAX_CLIENT_CREATION_ATTEMPTS_PER_HOST = 2

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this max attempt be configurable?

@SteNicholas SteNicholas requested a review from Copilot June 28, 2026 07:01

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

@SteNicholas SteNicholas left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sunchao, thanks — the direction is right and the tests are focused. The bound on the outer loop, propagating interruption through bootstrap, and closing the in-progress channel on interrupt are all good improvements. A few points worth addressing before merge (none are hard blockers, but the first two are worth a response):

  1. The second per-host attempt is redundant. All locations grouped under a hostPort share the same host:fetchPort (grouping keys by hostAndFetchPort, and the createClient lambda uses only getHost/getFetchPort), so attempt #2 re-targets the same worker. See the inline note. MAX=1 would give the same result; if you keep 2, the "preserves same-worker fallback" comment is misleading.

  2. Interrupt handling is now inconsistent across createClient callers. findInterruptedException makes retryCreateClient set the interrupt flag and throw a bare InterruptedException on a wrapped interrupt, but only createClientsInParallel has matching case ex: InterruptedException handling. The other callers — CelebornInputStream.createReaderWithRetry (catches Exception, then excludeFailedFetchLocation + Uninterruptibles.sleepUninterruptibly which re-asserts the flag), the non-parallel makeOpenStreamList, and the push/replicate paths — swallow it generically, so a cancellation gets recorded as a fetch/push failure (polluting shared cross-task exclusion state) and, because Netty's await() throws immediately when the flag is preset, cascades into fast-failing the remaining retries. Much of this cascade is pre-existing for direct interrupts; this PR widens it to wrapped ones. Worth either handling InterruptedException consistently at these call sites, or noting the trade-off.

  3. Minor — worker-thread interrupt surfaces as ExecutionException. When a worker run() throws InterruptedException (the new line-620 check, or a wrapped interrupt on the worker thread rethrown at line 628), futures.foreach(_.get()) raises ExecutionException, which is not matched by the case ex: InterruptedException at ~642 — so it escapes without restoring the caller's interrupt status. Low impact, but the new code makes "InterruptedException out of run()" more reachable.

  4. Nit — reuse. findInterruptedException duplicates the cause-chain walk in MasterClient.findMasterNotLeaderException; Guava Throwables.getCausalChain() (already imported here) or commons-lang3 ExceptionUtils.throwableOfType / a shared ExceptionUtils helper would avoid a third hand-rolled copy (and could carry a cycle guard).

var streamCreatorPool: ThreadPoolExecutor = null

// TransportClientFactory already retries each attempt; allow one extra pooled-client attempt.
private val MAX_CLIENT_CREATION_ATTEMPTS_PER_HOST = 2

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All locations grouped under one hostPort share the same host:fetchPort (groupOpenStreamLocations keys by hostAndFetchPort, and the createClient lambda at ~304 uses only getHost/getFetchPort). So the "second" attempt re-targets the same worker and re-runs TransportClientFactory's full maxIORetries connect budget against an already-dead host:port — there is no real same-worker fallback within a group (the replica lives on a different host = a separate group/task). MAX_CLIENT_CREATION_ATTEMPTS_PER_HOST = 1 would yield the same set of clients with half the connect-retry latency per dead worker; if you intentionally keep 2, the comment about "same-worker fallback" is misleading.

return createClient(remoteHost, remotePort, partitionId, supplier.get());
} catch (Exception e) {
if (e instanceof InterruptedException) {
InterruptedException interruptedException = findInterruptedException(e);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unwrapping a wrapped InterruptedException to stop retries is reasonable, but note this also calls Thread.currentThread().interrupt() and throws a bare InterruptedException from shared infra used by ~6 createClient callers, while only the parallel reader (createClientsInParallel) added a matching case ex: InterruptedException. The others — CelebornInputStream.createReaderWithRetry (catch (Exception)excludeFailedFetchLocation + Uninterruptibles.sleepUninterruptibly, which re-asserts the flag), the non-parallel makeOpenStreamList, and the ShuffleClientImpl/PushDataHandler push paths — swallow it generically. Because Netty await() throws immediately when the interrupt flag is preset, the leftover flag makes each subsequent createClient fast-fail in a cascade, and the cancellation is recorded as a worker failure (shared, cross-task exclusion state on replicated clusters). Consider handling InterruptedException consistently across these call sites, or documenting the trade-off.

},
cf);
assert cf.isDone();
if (cf.isCancelled()) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor / pre-existing but now inconsistent with the new cleanup paths: in this connectTimeoutMs <= 0 branch, the isCancelled() (339-341) and !isSuccess() (340-342) failures throw without closeChannel(cf), leaking the half-open channel — whereas the connectTimeoutMs > 0 branch below closes on both timeout and cause != null, and the new interrupt path closes too. Worth closing the channel here as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants