[CELEBORN-194] Introduce client side metrics for celeborn#3740
[CELEBORN-194] Introduce client side metrics for celeborn#3740AmandeepSingh285 wants to merge 6 commits into
Conversation
|
Hi @SteNicholas , @RexXiong could you please help with a high level review on the implementation design for change adding client side metrics. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3740 +/- ##
============================================
+ Coverage 57.73% 58.77% +1.04%
- Complexity 214 319 +105
============================================
Files 397 399 +2
Lines 27880 28056 +176
Branches 2714 2729 +15
============================================
+ Hits 16095 16488 +393
+ Misses 10635 10384 -251
- Partials 1150 1184 +34 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
Gentle ping @SteNicholas , @RexXiong could you please help with a high level review of the approach. Thanks! |
|
Gentle follow-up ping @SteNicholas @RexXiong . Would appreciate a high-level review of the proposed approach whenever you have some time. Thanks! |
There was a problem hiding this comment.
Pull request overview
Note
Copilot couldn't run its full agentic review because no GitHub Actions runner was available. Make sure your repository has a runner available to run Copilot's review, or add a copilot-setup-steps.yml file specifying one with the runs-on attribute. See the docs for more details.
Adds client-side metrics collection in Celeborn clients and ships those metrics to the master via application heartbeats, where they are re-exposed on the master Prometheus endpoint labeled by applicationId.
Changes:
- Extend
HeartbeatFromApplication(and protobuf serde) to carry aclientMetricsmap of{name -> (value, type)}. - Add client and master metric sources (
CelebornClientSource,ApplicationMetricsSource) plus wiring inLifecycleManager/Master. - Introduce
celeborn.client.metrics.enabledconfig and add/unit-test coverage for serde + source behavior.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala | Adds unit tests for master-side application metrics source behavior. |
| master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala | Registers the new application metrics source and plumbs heartbeat clientMetrics through. |
| master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala | Implements master-side cache + Prometheus re-export of client metrics by applicationId. |
| docs/configuration/metrics.md | Documents new celeborn.client.metrics.enabled config. |
| common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala | Adds serde round-trip test for clientMetrics in heartbeats. |
| common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala | Extends heartbeat message, protobuf encoding/decoding for client metrics. |
| common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala | Adds Role.CLIENT label behavior and counterExists helper. |
| common/src/main/scala/org/apache/celeborn/common/metrics/ClientMetric.scala | Introduces ClientMetric + MetricType shared representation. |
| common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala | Adds celeborn.client.metrics.enabled config entry and accessor. |
| common/src/main/proto/TransportMessages.proto | Adds clientMetrics field + metric type/message definitions to heartbeat protobuf. |
| client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala | Adds test coverage for excluded-worker metrics behavior. |
| client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala | Adds unit tests for client metric source counters/gauges + snapshot types. |
| client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala | Increments client “shuffle data lost” metric on lost-file conditions when enabled. |
| client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala | Creates client metrics source, registers gauges, increments counters, and supplies snapshots to heartbeats. |
| client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala | Increments revive-failure metrics when change partition assignment fails. |
| client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala | Implements client-side metrics source + snapshot export for heartbeat payload. |
| client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala | Adds callback to attach client metrics to each HeartbeatFromApplication. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
There was a problem hiding this comment.
@AmandeepSingh285, thanks for working on client-side metrics — the overall shape (client AbstractSource snapshot → heartbeat → master re-expose) is reasonable and the serde/config/docs are wired up. Since it's marked [WIP], I'm leaving review comments rather than approving; there are a few correctness/lifecycle issues worth resolving first (inline). Summary, most-impactful first:
- Client metrics source + cleaner thread leak (per Spark driver).
clientSourceis created unconditionally and never destroyed — see inline onLifecycleManager.scala:226. - Master re-registers dead apps' metrics → permanent leak.
Masteris a plainRpcEndpoint(notThreadSafeRpcEndpoint), so its Inbox runs withenableConcurrent = trueand heartbeats are processed concurrently. A heartbeat racing/afterhandleAppLostresurrects the app's per-app gauges/counters, which are then never cleaned. See inline onApplicationMetricsSource.updateApplicationMetrics. - Non-atomic counter delta + fragile absolute→delta conversion. Concurrent heartbeats for one app double-count; app-restart-with-same-id or heartbeat reordering corrupt the delta. See inline on
updateCounter. - Unbounded master cardinality + silent truncation. Per-
applicationIdlabeling has no top-N cap and no master-side enable flag;AbstractSource.getMetricstruncates atmetricsCapacity(4096) and emits counters last, so app counters drop first. The codebase already solved this for worker per-app metrics viaceleborn.metrics.worker.app.topResourceConsumption.count(default 0/off). See inline onMaster.scala. - Gauge flaps to 0 on removal (cache cleared before the gauge is unregistered) — inline on
removeApplicationMetrics. - Metric semantics:
ClientReviveFailCountis incremented bychangePartitions.sizein one place but by +1 inhandleRevive, andClientShuffleDataLostCountis bumped in bothhandleMapPartitionEndandReducePartitionCommitHandler.stageEnd— mixed units / possible double-count. Inline onChangePartitionManager. fromPbsilently maps unknownPbMetricTypeto Gauge (forward-compat trap) — inline onControlMessages.scala.
Minor / cleanup (no inline needed): the if (clientMetricsEnabled) clientSource.incCounter(...) guard is copy-pasted ~12×, and ReducePartitionCommitHandler recomputes the gate inline instead of reusing the cached clientMetricsEnabled field — a single incClientMetric(name, n) helper would centralize the gate and avoid the semantic drift in (6). ClientMetric/MetricType also duplicate proto PbClientMetric/PbMetricType (4 spots to keep in lockstep). Test gap: the counter-delta path in ApplicationMetricsSource is untested (ApplicationMetricsSourceSuite only sends MetricType.Gauge).
|
Thanks @SteNicholas for the review. Still working on improving this PR. Will take into account all the updated you mentioned. Thanks! |
|
@AmandeepSingh285, please firstly resolve conflicts. |
| clientSource.foreach { source => | ||
| source.addGauge(CelebornClientSource.ACTIVE_SHUFFLE_COUNT) { () => | ||
| registeredShuffle.size | ||
| } | ||
| source.addGauge(CelebornClientSource.EXCLUDED_WORKER_COUNT) { () => | ||
| workerStatusTracker.excludedWorkers.size | ||
| } | ||
| source.addGauge(CelebornClientSource.SHUTTING_WORKER_COUNT) { () => | ||
| workerStatusTracker.shuttingWorkers.size | ||
| } | ||
| source.start() | ||
| } |
| def start(): Unit = startCleaner() | ||
|
|
||
| def stop(): Unit = metricsCleaner.shutdown() |
| errors.get()) | ||
| } | ||
|
|
||
| test("recordWorkerFailure increments client worker-excluded counter and gauge") { |
36a9603 to
438f541
Compare
What changes were proposed in this pull request?
Adding client side metrics for Celeborn via heartbeat to master.
Why are the changes needed?
These changes help increase observability for Celeborn clients.
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?