[CELEBORN-194][WIP] Adding client side metrics for celeborn#3740
[CELEBORN-194][WIP] Adding client side metrics for celeborn#3740AmandeepSingh285 wants to merge 5 commits into
Conversation
|
Hi @SteNicholas , @RexXiong could you please help with a high level review on the implementation design for change adding client side metrics. |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #3740 +/- ##
============================================
- Coverage 66.91% 58.55% -8.35%
- Complexity 0 311 +311
============================================
Files 358 397 +39
Lines 21986 27910 +5924
Branches 1946 2728 +782
============================================
+ Hits 14710 16341 +1631
- Misses 6262 10382 +4120
- Partials 1014 1187 +173 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
Gentle ping @SteNicholas , @RexXiong could you please help with a high level review of the approach. Thanks! |
|
Gentle follow-up ping @SteNicholas @RexXiong . Would appreciate a high-level review of the proposed approach whenever you have some time. Thanks! |
There was a problem hiding this comment.
Pull request overview
Note
Copilot couldn't run its full agentic review because no GitHub Actions runner was available. Make sure your repository has a runner available to run Copilot's review, or add a copilot-setup-steps.yml file specifying one with the runs-on attribute. See the docs for more details.
Adds client-side metrics collection in Celeborn clients and ships those metrics to the master via application heartbeats, where they are re-exposed on the master Prometheus endpoint labeled by applicationId.
Changes:
- Extend
HeartbeatFromApplication(and protobuf serde) to carry aclientMetricsmap of{name -> (value, type)}. - Add client and master metric sources (
CelebornClientSource,ApplicationMetricsSource) plus wiring inLifecycleManager/Master. - Introduce
celeborn.client.metrics.enabledconfig and add/unit-test coverage for serde + source behavior.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala | Adds unit tests for master-side application metrics source behavior. |
| master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala | Registers the new application metrics source and plumbs heartbeat clientMetrics through. |
| master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala | Implements master-side cache + Prometheus re-export of client metrics by applicationId. |
| docs/configuration/metrics.md | Documents new celeborn.client.metrics.enabled config. |
| common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala | Adds serde round-trip test for clientMetrics in heartbeats. |
| common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala | Extends heartbeat message, protobuf encoding/decoding for client metrics. |
| common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala | Adds Role.CLIENT label behavior and counterExists helper. |
| common/src/main/scala/org/apache/celeborn/common/metrics/ClientMetric.scala | Introduces ClientMetric + MetricType shared representation. |
| common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala | Adds celeborn.client.metrics.enabled config entry and accessor. |
| common/src/main/proto/TransportMessages.proto | Adds clientMetrics field + metric type/message definitions to heartbeat protobuf. |
| client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala | Adds test coverage for excluded-worker metrics behavior. |
| client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala | Adds unit tests for client metric source counters/gauges + snapshot types. |
| client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala | Increments client “shuffle data lost” metric on lost-file conditions when enabled. |
| client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala | Creates client metrics source, registers gauges, increments counters, and supplies snapshots to heartbeats. |
| client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala | Increments revive-failure metrics when change partition assignment fails. |
| client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala | Implements client-side metrics source + snapshot export for heartbeat payload. |
| client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala | Adds callback to attach client metrics to each HeartbeatFromApplication. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| val clientSource = new CelebornClientSource(conf) | ||
| private[client] val clientMetricsEnabled = conf.metricsSystemEnable && conf.clientMetricsEnabled | ||
| val commitManager = new CommitManager(appUniqueId, conf, this) | ||
| val workerStatusTracker = new WorkerStatusTracker(conf, this) | ||
| if (clientMetricsEnabled) { | ||
| clientSource.addGauge(CelebornClientSource.ACTIVE_SHUFFLE_COUNT) { () => | ||
| registeredShuffle.size | ||
| } | ||
| clientSource.addGauge(CelebornClientSource.EXCLUDED_WORKER_COUNT) { () => | ||
| workerStatusTracker.excludedWorkers.size | ||
| } | ||
| clientSource.addGauge(CelebornClientSource.SHUTTING_WORKER_COUNT) { () => | ||
| workerStatusTracker.shuttingWorkers.size | ||
| } | ||
| } |
| // start cleaner thread | ||
| startCleaner() |
| def updateApplicationMetrics(appId: String, metrics: JMap[String, ClientMetric]): Unit = { | ||
| if (metrics.isEmpty) return | ||
| metrics.asScala.foreach { case (name, metric) => | ||
| val labels = Map(applicationLabel -> appId) |
| private def updateCounter( | ||
| appId: String, | ||
| name: String, | ||
| labels: Map[String, String], | ||
| newValue: Long): Unit = { | ||
| val prev = appCounterPrev.computeIfAbsent(appId, _ => JavaUtils.newConcurrentHashMap()) | ||
| if (!counterExists(name, labels)) { | ||
| addCounter(name, labels) | ||
| } | ||
| val prevValue = prev.getOrDefault(name, 0L) | ||
| val delta = newValue - prevValue | ||
| if (delta > 0) { | ||
| incCounter(name, delta, labels) | ||
| } | ||
| prev.put(name, newValue) | ||
| } |
| errors.get()) | ||
| } | ||
|
|
||
| test("recordWorkerFailure increments client worker-excluded counter and gauge") { |
|
|
||
| class ApplicationMetricsSourceSuite extends CelebornFunSuite { | ||
|
|
||
| private def metricsOf(app: String, value: Long): JHashMap[String, ClientMetric] = { |
There was a problem hiding this comment.
@AmandeepSingh285, thanks for working on client-side metrics — the overall shape (client AbstractSource snapshot → heartbeat → master re-expose) is reasonable and the serde/config/docs are wired up. Since it's marked [WIP], I'm leaving review comments rather than approving; there are a few correctness/lifecycle issues worth resolving first (inline). Summary, most-impactful first:
- Client metrics source + cleaner thread leak (per Spark driver).
clientSourceis created unconditionally and never destroyed — see inline onLifecycleManager.scala:226. - Master re-registers dead apps' metrics → permanent leak.
Masteris a plainRpcEndpoint(notThreadSafeRpcEndpoint), so its Inbox runs withenableConcurrent = trueand heartbeats are processed concurrently. A heartbeat racing/afterhandleAppLostresurrects the app's per-app gauges/counters, which are then never cleaned. See inline onApplicationMetricsSource.updateApplicationMetrics. - Non-atomic counter delta + fragile absolute→delta conversion. Concurrent heartbeats for one app double-count; app-restart-with-same-id or heartbeat reordering corrupt the delta. See inline on
updateCounter. - Unbounded master cardinality + silent truncation. Per-
applicationIdlabeling has no top-N cap and no master-side enable flag;AbstractSource.getMetricstruncates atmetricsCapacity(4096) and emits counters last, so app counters drop first. The codebase already solved this for worker per-app metrics viaceleborn.metrics.worker.app.topResourceConsumption.count(default 0/off). See inline onMaster.scala. - Gauge flaps to 0 on removal (cache cleared before the gauge is unregistered) — inline on
removeApplicationMetrics. - Metric semantics:
ClientReviveFailCountis incremented bychangePartitions.sizein one place but by +1 inhandleRevive, andClientShuffleDataLostCountis bumped in bothhandleMapPartitionEndandReducePartitionCommitHandler.stageEnd— mixed units / possible double-count. Inline onChangePartitionManager. fromPbsilently maps unknownPbMetricTypeto Gauge (forward-compat trap) — inline onControlMessages.scala.
Minor / cleanup (no inline needed): the if (clientMetricsEnabled) clientSource.incCounter(...) guard is copy-pasted ~12×, and ReducePartitionCommitHandler recomputes the gate inline instead of reusing the cached clientMetricsEnabled field — a single incClientMetric(name, n) helper would centralize the gate and avoid the semantic drift in (6). ClientMetric/MetricType also duplicate proto PbClientMetric/PbMetricType (4 spots to keep in lockstep). Test gap: the counter-delta path in ApplicationMetricsSource is untested (ApplicationMetricsSourceSuite only sends MetricType.Gauge).
| } | ||
|
|
||
| private val masterClient = new MasterClient(masterRpcEnvInUse, conf, false) | ||
| val clientSource = new CelebornClientSource(conf) |
There was a problem hiding this comment.
clientSource is created unconditionally (before the clientMetricsEnabled guard), and AbstractSource's constructor spawns a daemon worker-metrics-cleaner scheduled executor; CelebornClientSource also calls startCleaner() and registers 8 counters in its ctor. But LifecycleManager.stop() only calls heartbeater.stop() + super.stop() — it never calls clientSource.destroy(). So every LifecycleManager (one per Spark app driver) leaks a daemon thread + a MetricRegistry, even when celeborn.client.metrics.enabled=false (the default). On multi-tenant/long-lived drivers (Spark Connect, Kyuubi, notebooks) and in WorkerStatusTrackerSuite's new test (which only calls stop()), these accumulate. Suggest gating creation on clientMetricsEnabled and calling clientSource.destroy() in stop(). (The cleaner is also a no-op here — clearOldValues only scans namedTimers, and this source has none.)
|
|
||
| startCleaner() | ||
|
|
||
| def updateApplicationMetrics(appId: String, metrics: JMap[String, ClientMetric]): Unit = { |
There was a problem hiding this comment.
Master extends plain RpcEndpoint (not ThreadSafeRpcEndpoint), so its Inbox sets enableConcurrent = true and app heartbeats are dispatched concurrently. If a heartbeat for appId is in-flight/queued when handleAppLost → removeApplicationMetrics(appId) runs (e.g. a false timeout from a GC pause, or the last heartbeat racing ApplicationLost), this method then re-computeIfAbsents the caches and re-addGauge/addCounters the app's metrics. Since removeApplicationMetrics only ever fires once per app, those applicationId=<dead app> series leak permanently and the counter re-emits its full cumulative value (prev reset to 0 → delta = full value). Needs a liveness check against the live-app set, or removal coordinated so a later heartbeat can't resurrect.
| addCounter(name, labels) | ||
| } | ||
| val prevValue = prev.getOrDefault(name, 0L) | ||
| val delta = newValue - prevValue |
There was a problem hiding this comment.
Two issues here. (a) Non-atomic read-modify-write: getOrDefault → delta → incCounter → prev.put is not atomic, and with enableConcurrent=true two heartbeats for the same app over-count (both read the same prev) or lose updates. Use ConcurrentHashMap.merge/compute to make delta+store atomic. (b) Fragile absolute→delta: the client reports an absolute cumulative value; if (delta > 0) then prev.put(name, newValue) (unconditional, line 92) means a same-appId restart (counter resets) or a reordered/retried heartbeat rewinds prev and either stalls the counter or over-counts on the next tick. Consider exposing the client's absolute value directly as a gauge (Prometheus rate()/increase() already tolerate counter resets) — that removes appCounterPrev and this whole bug class.
|
|
||
| def removeApplicationMetrics(appId: String): Unit = { | ||
| val labels = Map(applicationLabel -> appId) | ||
| val gaugeCache = appGaugeCache.remove(appId) |
There was a problem hiding this comment.
removeApplicationMetrics drops the appGaugeCache entry (line 97) before unregistering the gauges (line 99). The registered gauge closure reads Option(appGaugeCache.get(appId))...getOrElse(0L), so a concurrent Prometheus scrape landing in that window reports 0 for the about-to-be-removed gauge — a transient flap-to-0. Unregister the gauges first, then drop the cache (or guard the closure).
|
|
||
| metricsSystem.registerSource(resourceConsumptionSource) | ||
| metricsSystem.registerSource(masterSource) | ||
| metricsSystem.registerSource(applicationMetricsSource) |
There was a problem hiding this comment.
ApplicationMetricsSource re-exposes ~11 metrics per heartbeating app labeled by applicationId, with no top-N cap and gated only by the client-side flag (no master-side enable). All share one metricsCapacity (default 4096); AbstractSource.getMetrics silently truncates beyond that (logWarning only) and emits counters last, so app counters (e.g. ClientShuffleDataLostCount) are dropped first at ~370 concurrent apps. The codebase already handles this exact cardinality problem for worker per-app metrics via celeborn.metrics.worker.app.topResourceConsumption.count (default 0 = off, top-N capped, documented as high-cardinality). Worth following that precedent (master-side enable + top-N), and note the source/cleaner are constructed even when master metricsSystemEnable=false.
There was a problem hiding this comment.
@SteNicholas made a change to the approach where users can pass the labels required with the metrics and they are used as tags instead of appid. This helps avoid cardinality constraints. Could you please help with a review for this approach. Thanks!
| } | ||
| if (lifecycleManager.clientMetricsEnabled) { | ||
| lifecycleManager.clientSource.incCounter( | ||
| CelebornClientSource.REVIVE_FAIL_COUNT, |
There was a problem hiding this comment.
REVIVE_FAIL_COUNT is incremented here by changePartitions.size (per-partition), but in LifecycleManager.handleRevive it's incremented by +1 per batch on the unregistered/stage-ended paths, while REVIVE_REQUEST_COUNT is incremented by partitionIds.size. Mixing per-partition and per-batch units in one series makes a fail-rate uninterpretable. Separately, SHUFFLE_DATA_LOST_COUNT is incremented both in LifecycleManager.handleMapPartitionEnd and in ReducePartitionCommitHandler.stageEnd — please confirm those are disjoint events and not double-counting the same lost shuffle. Pick one unit per metric.
| new util.HashMap[String, ClientMetric]( | ||
| pbHeartbeatFromApplication.getClientMetricsMap.asScala.map { case (name, pbMetric) => | ||
| val metricType = pbMetric.getType match { | ||
| case PbMetricType.COUNTER => MetricType.Counter |
There was a problem hiding this comment.
fromPb uses case PbMetricType.COUNTER => Counter; case _ => Gauge, silently coercing UNRECOGNIZED/any future enum value to Gauge. With version skew, a newer client's counter would be decoded on an older master as a gauge → routed to updateGauge (last-value) instead of updateCounter (delta), i.e. silently wrong semantics. At minimum logWarning on the default branch; better, handle GAUGE/UNRECOGNIZED explicitly.
|
Thanks @SteNicholas for the review. Still working on improving this PR. Will take into account all the updated you mentioned. Thanks! |
What changes were proposed in this pull request?
Adding client side metrics for Celeborn via heartbeat to master.
Why are the changes needed?
These changes help increase observability for Celeborn clients.
Does this PR resolve a correctness bug?
Does this PR introduce any user-facing change?
How was this patch tested?