[CELEBORN-2370] Scope reducer metadata by partition range#3745

Open
sunchao wants to merge 2 commits into apache:main from sunchao:CELEBORN-2370-scoped-reducer-metadata

sunchao (Member) commented Jun 25, 2026

JIRA: CELEBORN-2370

Supersedes #3687.

Why are the changes needed?

Before a Spark reducer can read shuffle data, Celeborn calls GetReducerFileGroup to obtain reducer file locations and related metadata.

Today that response is shuffle-wide. For a shuffle with N reducers, it contains metadata for all N reducers, even though a Spark task normally reads only [startPartition, endPartition).

For example:

  • a shuffle has 1,000,000 reducers;
  • a task reads only [42, 43); but
  • the executor still downloads and materializes metadata for all 1,000,000 reducers.

The response is cached independently on every executor, so metadata transfer and executor memory grow with the total reducer count multiplied by the number of executors, rather than with the reducer ranges those executors actually read. On very large shuffles this can cause:

  • large driver RPC responses;
  • repeated transfer of the same shuffle-wide metadata;
  • executor heap and GC pressure; and
  • task threads waiting behind the same shuffle-wide metadata load.
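
To make the scaling argument concrete, here is a rough back-of-the-envelope sketch in Python (used only for brevity; the actual change lives in Celeborn's Java/Scala code). The function names and the executor count of 1,000 are illustrative assumptions, not values from this PR.

```python
from typing import Iterable, List, Tuple

Range = Tuple[int, int]  # half-open [startPartition, endPartition)


def shuffle_wide_entries(total_reducers: int, executors: int) -> int:
    """Entries cached cluster-wide when every executor holds all reducers."""
    return total_reducers * executors


def scoped_entries(ranges_by_executor: Iterable[List[Range]]) -> int:
    """Entries cached cluster-wide when each executor holds only what it reads."""
    total = 0
    for ranges in ranges_by_executor:
        covered = set()
        for start, end in ranges:
            covered.update(range(start, end))  # overlapping ranges count once
        total += len(covered)
    return total


# Assumed scenario: 1,000,000 reducers, 1,000 executors (hypothetical count).
print(shuffle_wide_entries(1_000_000, 1_000))  # 1000000000
# One executor that reads only [42, 43) caches a single entry.
print(scoped_entries([[(42, 43)]]))  # 1
```

The first figure grows with the product of reducers and executors; the second grows only with the ranges actually read.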

The driver still needs to retain complete shuffle commit metadata. This PR reduces the metadata transferred to and cached by each executor.

What changes were proposed in this pull request?

Add optional partition-range fields to GetReducerFileGroup requests and responses.

When Spark reads a shuffle:

  1. The reader sends its actual [startPartition, endPartition) range.
  2. The driver returns only the reducer file groups, successful partition IDs, and failed-batch metadata that intersect that range.
  3. The executor records which ranges it has loaded and requests only missing ranges.
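
The three steps above can be sketched roughly as follows. This is a minimal Python illustration under assumed data shapes (a dict from partition ID to location strings, and a list of loaded half-open ranges); `filter_file_groups` and `missing_ranges` are hypothetical names, not functions from the patch.

```python
from typing import Dict, List, Tuple

Range = Tuple[int, int]  # half-open [start, end)


def filter_file_groups(
    file_groups: Dict[int, List[str]], start: int, end: int
) -> Dict[int, List[str]]:
    """Driver side (assumed shape): keep only partitions inside [start, end)."""
    return {pid: locs for pid, locs in file_groups.items() if start <= pid < end}


def missing_ranges(loaded: List[Range], start: int, end: int) -> List[Range]:
    """Executor side: sub-ranges of [start, end) not covered by loaded ranges."""
    gaps: List[Range] = []
    cursor = start
    for lo, hi in sorted(loaded):
        if hi <= cursor:
            continue  # entirely before the part still uncovered
        if lo >= end:
            break  # entirely after the requested range
        if lo > cursor:
            gaps.append((cursor, lo))  # hole before this loaded range
        cursor = max(cursor, hi)
        if cursor >= end:
            break
    if cursor < end:
        gaps.append((cursor, end))
    return gaps


# An executor that already holds [0, 10) and [20, 30) and now needs [5, 25)
# only has to request the gap between them.
print(missing_ranges([(0, 10), (20, 30)], 5, 25))  # [(10, 20)]
```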

The executor cache also:

  • shares one in-flight RPC between callers requesting the same cold range;
  • lets unrelated cached ranges proceed independently;
  • fetches mapper-attempt metadata on the first request and omits it from later range requests;
  • prevents an in-flight request from repopulating state after shuffle cleanup;
  • preserves interruption and failure propagation; and
  • treats an unscoped response from an older driver as complete shuffle metadata.
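
A few of the cache behaviors listed above (sharing one in-flight request, not repopulating after cleanup, propagating failures) can be illustrated with a small single-flight cache. This is a simplified Python sketch, not the PR's implementation: it keys on the exact requested range rather than tracking loaded sub-ranges, and `RangeMetadataCache`, `fetch`, and the epoch counter are hypothetical constructs introduced for illustration.

```python
import threading
from concurrent.futures import Future


class RangeMetadataCache:
    def __init__(self, fetch):
        self._fetch = fetch    # stand-in for the RPC: fetch(shuffle_id, start, end)
        self._lock = threading.Lock()
        self._loaded = {}      # (shuffle_id, start, end) -> metadata
        self._in_flight = {}   # (shuffle_id, start, end) -> Future
        self._epoch = {}       # shuffle_id -> number of cleanups seen

    def get(self, shuffle_id, start, end):
        key = (shuffle_id, start, end)
        with self._lock:
            if key in self._loaded:
                return self._loaded[key]
            epoch = self._epoch.get(shuffle_id, 0)
            future = self._in_flight.get(key)
            owner = future is None
            if owner:
                future = Future()
                self._in_flight[key] = future
        if not owner:
            return future.result()  # wait for the owner; re-raises its failure
        try:
            value = self._fetch(shuffle_id, start, end)  # runs outside the lock
        except BaseException as exc:
            with self._lock:
                if self._in_flight.get(key) is future:
                    del self._in_flight[key]
            future.set_exception(exc)  # waiters see the same failure
            raise
        with self._lock:
            if self._in_flight.get(key) is future:
                del self._in_flight[key]
            # Cache only if no cleanup happened while the request was in flight.
            if self._epoch.get(shuffle_id, 0) == epoch:
                self._loaded[key] = value
        future.set_result(value)
        return value

    def cleanup(self, shuffle_id):
        with self._lock:
            self._epoch[shuffle_id] = self._epoch.get(shuffle_id, 0) + 1
            for table in (self._loaded, self._in_flight):
                for key in [k for k in table if k[0] == shuffle_id]:
                    del table[key]
```

Because the fetch runs outside the lock, requests for unrelated ranges do not block each other; only callers asking for the same cold key wait on the shared future.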

Partition-scoped responses bypass the existing shuffle-wide RPC cache and Spark broadcast path. Legacy full-shuffle requests continue to use the existing cache and broadcast behavior.

This PR changes the Spark client and the Spark driver-side lifecycle/commit endpoint. It does not change the Celeborn Master or Worker.

Does this PR introduce any user-facing change?

No configuration or public API change is required.

When both the Spark client and driver contain this change, reducer metadata transfer and executor cache size become proportional to the partition ranges read by that executor instead of the total reducer count.

The wire protocol remains backward compatible:

  • New client with old driver: the old driver ignores the optional request fields and returns full metadata; the new client detects the unscoped response and caches it as complete.
  • Old client with new driver: unset range fields select the existing full-response cache and broadcast path.
  • Flink clients continue to use the legacy full-shuffle request.

Mixed-version deployments remain correct, but the optimization applies only when the driver supports scoped responses.
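
The compatibility cases above can be modeled with optional fields. The following Python sketch is an assumption-laden illustration: the `Request` and `Response` classes and their field names are hypothetical and do not reflect the actual protobuf definitions in this PR.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union


@dataclass
class Request:
    shuffle_id: int
    start_partition: Optional[int] = None  # unset for legacy clients
    end_partition: Optional[int] = None


@dataclass
class Response:
    file_groups: Dict[int, List[str]]
    start_partition: Optional[int] = None  # unset means shuffle-wide
    end_partition: Optional[int] = None


def old_driver_handle(groups: Dict[int, List[str]], req: Request) -> Response:
    """An old driver ignores the range fields and returns everything."""
    return Response(dict(groups))


def new_driver_handle(groups: Dict[int, List[str]], req: Request) -> Response:
    """A new driver scopes the response only when both range fields are set."""
    if req.start_partition is None or req.end_partition is None:
        return Response(dict(groups))  # legacy full-shuffle path
    lo, hi = req.start_partition, req.end_partition
    scoped = {pid: locs for pid, locs in groups.items() if lo <= pid < hi}
    return Response(scoped, lo, hi)


def loaded_scope(resp: Response) -> Union[str, Tuple[int, int]]:
    """New client: an unscoped response is treated as complete metadata."""
    if resp.start_partition is None or resp.end_partition is None:
        return "ALL"
    return (resp.start_partition, resp.end_partition)


groups = {0: ["w1"], 42: ["w2"], 99: ["w3"]}
req = Request(shuffle_id=1, start_partition=42, end_partition=43)
print(loaded_scope(new_driver_handle(groups, req)))  # (42, 43)
print(loaded_scope(old_driver_handle(groups, req)))  # ALL
```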

How was this patch tested?

  • ShuffleClientSuiteJ: 29 tests covering range caching, concurrent loads, cleanup races, interruption, and old-driver fallback
  • UtilsSuite: 28 tests, including V1 and V2 protocol round trips
  • ReducerFileGroupFilterSuite: 2 range-filtering tests
  • ConfigurationSuite: 8 tests
  • Spark 4.0 / Scala 2.13 focused integration run: CelebornHashSuite and CelebornSortSuite (4 tests)
  • Spark 4.0 / Scala 2.13 package compile
  • Flink 1.20 common client package compile
  • Spotless apply/check

sunchao marked this pull request as ready for review June 25, 2026 04:36
afterincomparableyum (Contributor) commented:

I'll take a look at this soon. It's an interesting PR.
