From 905411fb757c1c5e076bc0e97149de8d5f1a3fbd Mon Sep 17 00:00:00 2001 From: AmandeepSingh285 Date: Thu, 2 Jul 2026 08:50:44 +0000 Subject: [PATCH 1/6] Rebased with main --- .../client/ApplicationHeartbeater.scala | 8 +- .../client/CelebornClientSource.scala | 72 ++++++++++++ .../client/ChangePartitionManager.scala | 6 + .../celeborn/client/LifecycleManager.scala | 53 ++++++++- .../commit/ReducePartitionCommitHandler.scala | 5 +- .../client/CelebornClientSourceSuite.scala | 91 +++++++++++++++ .../client/WorkerStatusTrackerSuite.scala | 24 ++++ common/src/main/proto/TransportMessages.proto | 11 ++ .../apache/celeborn/common/CelebornConf.scala | 10 ++ .../common/metrics/ClientMetric.scala | 26 +++++ .../metrics/source/AbstractSource.scala | 6 + .../protocol/message/ControlMessages.scala | 25 ++++- .../celeborn/common/util/UtilsSuite.scala | 26 ++++- .../master/ApplicationMetricsSource.scala | 106 ++++++++++++++++++ .../service/deploy/master/Master.scala | 15 ++- .../ApplicationMetricsSourceSuite.scala | 64 +++++++++++ 16 files changed, 536 insertions(+), 12 deletions(-) create mode 100644 client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala create mode 100644 client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala create mode 100644 common/src/main/scala/org/apache/celeborn/common/metrics/ClientMetric.scala create mode 100644 master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala create mode 100644 master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index e91e236b6c4..d2423e5ac6e 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.MasterClient import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.metrics.ClientMetric import org.apache.celeborn.common.protocol.PbReviseLostShufflesResponse import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, CheckQuotaResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ReviseLostShuffles, ZERO_UUID} import org.apache.celeborn.common.protocol.message.StatusCode @@ -42,7 +43,9 @@ class ApplicationHeartbeater( (Long, Long, Map[String, java.lang.Long], Map[String, java.lang.Long])), workerStatusTracker: WorkerStatusTracker, registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean], - cancelAllActiveStages: String => Unit) extends Logging { + cancelAllActiveStages: String => Unit, + clientMetrics: () => util.Map[String, ClientMetric] = + () => new util.HashMap[String, ClientMetric]()) extends Logging { private var stopped = false private val reviseLostShuffles = conf.reviseLostShufflesEnabled @@ -85,7 +88,8 @@ class ApplicationHeartbeater( tmpApplicationFallbackCounts.asJava, workerStatusTracker.getNeedCheckedWorkers().toList.asJava, ZERO_UUID, - true) + true, + clientMetrics()) val response = requestHeartbeat(appHeartbeat) if (response.statusCode == StatusCode.SUCCESS) { logDebug("Successfully send app heartbeat.") diff --git a/client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala b/client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala new file mode 100644 index 00000000000..9da668d3dc4 --- /dev/null +++ b/client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.client + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.metrics.{ClientMetric, MetricType} +import org.apache.celeborn.common.metrics.source.{AbstractSource, Role} + +/** + * Metrics source for the Celeborn client + */ +class CelebornClientSource(conf: CelebornConf) extends AbstractSource(conf, Role.CLIENT) { + override val sourceName = "client" + + import CelebornClientSource._ + + addCounter(REGISTER_SHUFFLE_COUNT) + addCounter(REGISTER_SHUFFLE_FAIL_COUNT) + addCounter(UNREGISTER_SHUFFLE_COUNT) + addCounter(REVIVE_REQUEST_COUNT) + addCounter(REVIVE_FAIL_COUNT) + addCounter(SLOT_RESERVATION_FAIL_COUNT) + addCounter(SHUFFLE_FETCH_FAILURE_COUNT) + addCounter(SHUFFLE_DATA_LOST_COUNT) + + def getMetricsSnapshot(): Map[String, ClientMetric] = { + val counterMetrics = counters().map(c => + c.name -> ClientMetric(c.counter.getCount, MetricType.Counter)) + val gaugeMetrics = gauges().map(g => + g.name -> ClientMetric(g.gauge.getValue.asInstanceOf[Number].longValue(), MetricType.Gauge)) + (counterMetrics ++ gaugeMetrics).toMap + } + + // start cleaner thread + startCleaner() +} + +object CelebornClientSource { + // worker health + val EXCLUDED_WORKER_COUNT = "ClientExcludedWorkerCount" + val SHUTTING_WORKER_COUNT = "ClientShuttingWorkerCount" + + // shuffle lifecycle + val ACTIVE_SHUFFLE_COUNT = "ClientActiveShuffleCount" + val REGISTER_SHUFFLE_COUNT = "ClientRegisterShuffleCount" + val REGISTER_SHUFFLE_FAIL_COUNT = "ClientRegisterShuffleFailCount" + val UNREGISTER_SHUFFLE_COUNT = "ClientUnregisterShuffleCount" + + // write path + val REVIVE_REQUEST_COUNT = "ClientReviveRequestCount" + val REVIVE_FAIL_COUNT = "ClientReviveFailCount" + val SLOT_RESERVATION_FAIL_COUNT = "ClientSlotReservationFailCount" + + // data integrity + val SHUFFLE_FETCH_FAILURE_COUNT = "ClientShuffleFetchFailureCount" + val SHUFFLE_DATA_LOST_COUNT = "ClientShuffleDataLostCount" +} diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala index 71096a952fb..ae8f28a802f 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala @@ -23,6 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, Schedu import scala.collection.JavaConverters._ +import org.apache.celeborn.client.CelebornClientSource import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging @@ -282,6 +283,11 @@ class ChangePartitionManager( None, lifecycleManager.workerStatusTracker.workerAvailableByLocation(req.oldPartition)))) } + if (lifecycleManager.clientMetricsEnabled) { + lifecycleManager.clientSource.incCounter( + CelebornClientSource.REVIVE_FAIL_COUNT, + changePartitions.size) + } } val candidates = new util.HashSet[WorkerInfo]() diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index f9508cfe6c8..f718550716a 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -46,6 +46,7 @@ import org.apache.celeborn.common.client.{ApplicationInfoProvider, MasterClient} import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier} import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo} +import org.apache.celeborn.common.metrics.ClientMetric import org.apache.celeborn.common.metrics.source.Role import org.apache.celeborn.common.network.protocol.{SerdeVersion, TransportMessagesHelper} import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo @@ -222,8 +223,21 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends } private val masterClient = new MasterClient(masterRpcEnvInUse, conf, false) + val clientSource = new CelebornClientSource(conf) + private[client] val clientMetricsEnabled = conf.metricsSystemEnable && conf.clientMetricsEnabled val commitManager = new CommitManager(appUniqueId, conf, this) val workerStatusTracker = new WorkerStatusTracker(conf, this) + if (clientMetricsEnabled) { + clientSource.addGauge(CelebornClientSource.ACTIVE_SHUFFLE_COUNT) { () => + registeredShuffle.size + } + clientSource.addGauge(CelebornClientSource.EXCLUDED_WORKER_COUNT) { () => + workerStatusTracker.excludedWorkers.size + } + clientSource.addGauge(CelebornClientSource.SHUTTING_WORKER_COUNT) { () => + workerStatusTracker.shuttingWorkers.size + } + } private val heartbeater = new ApplicationHeartbeater( appUniqueId, @@ -236,7 +250,13 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends }, workerStatusTracker, registeredShuffle, - reason => cancelAllActiveStages(reason)) + reason => cancelAllActiveStages(reason), + () => + if (clientMetricsEnabled) { + clientSource.getMetricsSnapshot().asJava + } else { + new util.HashMap[String, ClientMetric]() + }) private def resetFallbackCounts(counts: ConcurrentHashMap[String, java.lang.Long]) : Map[String, java.lang.Long] = { val fallbackCounts = new util.HashMap[String, java.lang.Long]() @@ -727,6 +747,13 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // Reply to all RegisterShuffle request for current shuffle id. def replyRegisterShuffle(response: RegisterShuffleResponse): Unit = { + if (clientMetricsEnabled) { + if (response.status == StatusCode.SUCCESS) { + clientSource.incCounter(CelebornClientSource.REGISTER_SHUFFLE_COUNT) + } else { + clientSource.incCounter(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT) + } + } registeringShuffleRequest.synchronized { val serializedMsg: Option[ByteBuffer] = partitionType match { case PartitionType.REDUCE => @@ -888,9 +915,15 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends serdeVersion: SerdeVersion): Unit = { val contextWrapper = ChangeLocationsCallContext(context, partitionIds.size(), serdeVersion) + if (clientMetricsEnabled) { + clientSource.incCounter(CelebornClientSource.REVIVE_REQUEST_COUNT, partitionIds.size()) + } // If shuffle not registered, reply ShuffleNotRegistered and return if (!registeredShuffle.contains(shuffleId)) { logError(s"[handleRevive] shuffle $shuffleId not registered!") + if (clientMetricsEnabled) { + clientSource.incCounter(CelebornClientSource.REVIVE_FAIL_COUNT) + } contextWrapper.reply( -1, StatusCode.SHUFFLE_UNREGISTERED, @@ -902,6 +935,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends s"[handleRevive] shuffle $shuffleId, $mapIds, $partitionIds, $oldEpochs, $oldPartitions, $causes") if (commitManager.isStageEnd(shuffleId)) { logError(s"[handleRevive] shuffle $shuffleId stage ended!") + if (clientMetricsEnabled) { + clientSource.incCounter(CelebornClientSource.REVIVE_FAIL_COUNT) + } contextWrapper.reply( -1, StatusCode.STAGE_ENDED, @@ -1115,6 +1151,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends if (invokeReportTaskShuffleFetchFailurePreCheck(taskId)) { logInfo(s"handle fetch failure for appShuffleId $appShuffleId shuffleId $shuffleId") ret = invokeAppShuffleTrackerCallback(appShuffleId) + if (ret && clientMetricsEnabled) { + clientSource.incCounter(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT) + } shuffleIds.put(appShuffleIdentifier, (shuffleId, false)) } else { logInfo( @@ -1246,6 +1285,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends context.reply(MapperEndResponse(StatusCode.SUCCESS, serdeVersion)) case false => logError(s"Failed $message, reply ${StatusCode.SHUFFLE_DATA_LOST}.") + if (clientMetricsEnabled) { + clientSource.incCounter(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT) + } context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST, serdeVersion)) } } @@ -1611,6 +1653,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // [[releasePartitionLocation]]. Now in the slots are all the successful partition // locations. logWarning(s"Reserve buffers for $shuffleId still fail after retrying, clear buffers.") + if (clientMetricsEnabled) { + clientSource.incCounter(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT) + } destroySlotsWithRetry(shuffleId, slots) } else { logInfo(s"Reserve buffer success for shuffleId $shuffleId") @@ -1830,6 +1875,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // if unregister shuffle not success, wait next turn if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) { unregisterShuffleTime.remove(shuffleId) + if (clientMetricsEnabled) { + clientSource.incCounter(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT) + } } } } else { @@ -1841,6 +1889,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) { shuffleIdsToRemove.foreach { shuffleId: Integer => unregisterShuffleTime.remove(shuffleId) + if (clientMetricsEnabled) { + clientSource.incCounter(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT) + } } } } diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala index b3e7aa90ab5..20ad426820c 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala @@ -29,7 +29,7 @@ import com.google.common.base.Preconditions.checkState import com.google.common.cache.{Cache, CacheBuilder} import com.google.common.collect.Sets -import org.apache.celeborn.client.{ClientUtils, LifecycleManager, ShuffleCommittedInfo, WorkerStatusTracker} +import org.apache.celeborn.client.{CelebornClientSource, ClientUtils, LifecycleManager, ShuffleCommittedInfo, WorkerStatusTracker} import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers} import org.apache.celeborn.common.{CelebornConf, CommitMetadata} @@ -222,6 +222,9 @@ class ReducePartitionCommitHandler( } else { logError(s"Failed to handle stageEnd for $shuffleId, lost file!") dataLostShuffleSet.add(shuffleId) + if (conf.metricsSystemEnable && conf.clientMetricsEnabled) { + lifecycleManager.clientSource.incCounter(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT) + } // record in stageEndShuffleSet setStageEnd(shuffleId) } diff --git a/client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala b/client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala new file mode 100644 index 00000000000..5d06e309873 --- /dev/null +++ b/client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.client + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.metrics.MetricType + +class CelebornClientSourceSuite extends CelebornFunSuite { + + test("counters are declared, increment, and emit with role=Client") { + val source = new CelebornClientSource(new CelebornConf()) + + source.incCounter(CelebornClientSource.REGISTER_SHUFFLE_COUNT) + source.incCounter(CelebornClientSource.REGISTER_SHUFFLE_COUNT) + source.incCounter(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT) + source.incCounter(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT, 3) + source.incCounter(CelebornClientSource.REVIVE_REQUEST_COUNT, 5) + source.incCounter(CelebornClientSource.REVIVE_FAIL_COUNT, 2) + source.incCounter(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT) + source.incCounter(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT) + source.incCounter(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT) + + val metrics = source.getMetrics + assert(metrics.contains("""metrics_ClientRegisterShuffleCount_Count""")) + assert(metrics.contains("""role="Client"""")) + + val snapshot = source.getMetricsSnapshot() + assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_COUNT).value == 2) + assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_COUNT).metricType == MetricType.Counter) + assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT).value == 1) + assert(snapshot(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT).value == 3) + assert(snapshot(CelebornClientSource.REVIVE_REQUEST_COUNT).value == 5) + assert(snapshot(CelebornClientSource.REVIVE_FAIL_COUNT).value == 2) + assert(snapshot(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT).value == 1) + assert(snapshot(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT).value == 1) + assert(snapshot(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT).value == 1) + } + + test("gauges registered on the source are reflected in metrics and snapshot") { + val source = new CelebornClientSource(new CelebornConf()) + val excluded = new AtomicInteger(0) + + source.addGauge(CelebornClientSource.EXCLUDED_WORKER_COUNT) { () => excluded.get() } + + assert(source.getMetricsSnapshot()(CelebornClientSource.EXCLUDED_WORKER_COUNT).value == 0) + + excluded.set(5) + val metrics = source.getMetrics + assert(metrics.contains("metrics_ClientExcludedWorkerCount_Value")) + val snapshot = source.getMetricsSnapshot() + assert(snapshot(CelebornClientSource.EXCLUDED_WORKER_COUNT).value == 5) + assert(snapshot(CelebornClientSource.EXCLUDED_WORKER_COUNT).metricType == MetricType.Gauge) + } + + test("getMetricsSnapshot includes both counters and gauges with correct types") { + val source = new CelebornClientSource(new CelebornConf()) + source.addGauge(CelebornClientSource.ACTIVE_SHUFFLE_COUNT) { () => 7 } + source.incCounter(CelebornClientSource.REGISTER_SHUFFLE_COUNT) + + val snapshot = source.getMetricsSnapshot() + assert(snapshot.contains(CelebornClientSource.REGISTER_SHUFFLE_COUNT)) + assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_COUNT).metricType == MetricType.Counter) + assert(snapshot.contains(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT)) + assert(snapshot.contains(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT)) + assert(snapshot.contains(CelebornClientSource.REVIVE_REQUEST_COUNT)) + assert(snapshot.contains(CelebornClientSource.REVIVE_FAIL_COUNT)) + assert(snapshot.contains(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT)) + assert(snapshot.contains(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT)) + assert(snapshot.contains(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT)) + assert(snapshot(CelebornClientSource.ACTIVE_SHUFFLE_COUNT).value == 7) + assert(snapshot(CelebornClientSource.ACTIVE_SHUFFLE_COUNT).metricType == MetricType.Gauge) + } +} diff --git a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala index 90a1d53bbdf..8369d6b15d9 100644 --- a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala +++ b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala @@ -27,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer import org.junit.Assert import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.CelebornConf.{CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT, CLIENT_SHUFFLE_DYNAMIC_RESOURCE_ENABLED} import org.apache.celeborn.common.meta.WorkerInfo @@ -159,6 +160,29 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite { errors.get()) } + test("recordWorkerFailure increments client worker-excluded counter and gauge") { + val celebornConf = new CelebornConf() + val lifecycleManager = new LifecycleManager("app-metrics-test", celebornConf) + try { + val statusTracker = lifecycleManager.workerStatusTracker + val source = lifecycleManager.clientSource + + val failed = new ShuffleFailedWorkers() + val now = System.currentTimeMillis() + failed.put(mock("host1"), (StatusCode.WORKER_UNRESPONSIVE, now)) + failed.put(mock("host2"), (StatusCode.WORKER_UNRESPONSIVE, now)) + statusTracker.recordWorkerFailure(failed) + + val snapshot = source.getMetricsSnapshot() + Assert.assertEquals(2L, snapshot(CelebornClientSource.EXCLUDED_WORKER_COUNT).value) + + // re-recording already-excluded workers does not change the gauge + statusTracker.recordWorkerFailure(failed) + } finally { + lifecycleManager.stop() + } + } + private def buildResponse( excludedWorkerHosts: Array[String], unknownWorkerHosts: Array[String], diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index a813a9e5015..741cbe05b17 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -501,6 +501,17 @@ message PbHeartbeatFromApplication { int64 applicationCount = 9; map applicationFallbackCounts = 10; PbHeartbeatInfo heartbeatInfo = 11; + map clientMetrics = 12; +} + +enum PbMetricType { + GAUGE = 0; + COUNTER = 1; +} + +message PbClientMetric { + int64 value = 1; + PbMetricType type = 2; } message PbHeartbeatFromApplicationResponse { diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index ea6b819fcfa..fccd299f22a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -910,6 +910,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se // ////////////////////////////////////////////////////// def metricsConf: Option[String] = get(METRICS_CONF) def metricsSystemEnable: Boolean = get(METRICS_ENABLED) + def clientMetricsEnabled: Boolean = get(CLIENT_METRICS_ENABLED) def metricsSampleRate: Double = get(METRICS_SAMPLE_RATE) def metricsSlidingWindowSize: Int = get(METRICS_SLIDING_WINDOW_SIZE) def metricsCollectCriticalEnabled: Boolean = get(METRICS_COLLECT_CRITICAL_ENABLED) @@ -5965,6 +5966,15 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(true) + val CLIENT_METRICS_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.client.metrics.enabled") + .categories("metrics") + .doc("When true, the LifecycleManager collects client-side metrics. " + + "Requires `celeborn.metrics.enabled` to also be true.") + .version("0.7.0") + .booleanConf + .createWithDefault(false) + val METRICS_SAMPLE_RATE: ConfigEntry[Double] = buildConf("celeborn.metrics.sample.rate") .categories("metrics") diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/ClientMetric.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/ClientMetric.scala new file mode 100644 index 00000000000..af47a5c0cc3 --- /dev/null +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/ClientMetric.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.metrics + +sealed trait MetricType +object MetricType { + case object Gauge extends MetricType + case object Counter extends MetricType +} + +case class ClientMetric(value: Long, metricType: MetricType) diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index e7b71e5ec23..fd40e6b9481 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -74,6 +74,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) Map("instance" -> s"${Utils.localHostName(conf)}:${conf.masterHttpPort}") case Role.WORKER => Map("instance" -> s"${Utils.localHostName(conf)}:${conf.workerHttpPort}") + case Role.CLIENT => + Map("instance" -> Utils.localHostName(conf)) case _ => Map.empty } val staticLabels: Map[String, String] = labelsWithCustomizedLabels(Map.empty) @@ -248,6 +250,10 @@ abstract class AbstractSource(conf: CelebornConf, role: String) namedGauges.containsKey(metricNameWithCustomizedLabels(name, labels)) } + def counterExists(name: String, labels: Map[String, String]): Boolean = { + namedCounters.containsKey(metricNameWithCustomizedLabels(name, labels)) + } + def needSample(): Boolean = { if (metricsSampleRate >= 1) { true diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index f1e34aa54f8..237fc0c2191 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -29,6 +29,7 @@ import org.roaringbitmap.RoaringBitmap import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus} +import org.apache.celeborn.common.metrics.{ClientMetric, MetricType} import org.apache.celeborn.common.network.protocol.{SerdeVersion, TransportMessage} import org.apache.celeborn.common.protocol._ import org.apache.celeborn.common.protocol.MessageType._ @@ -389,7 +390,9 @@ object ControlMessages extends Logging { applicationFallbackCounts: util.Map[String, java.lang.Long], needCheckedWorkerList: util.List[WorkerInfo], override var requestId: String = ZERO_UUID, - shouldResponse: Boolean = false) extends MasterRequestMessage + shouldResponse: Boolean = false, + clientMetrics: util.Map[String, ClientMetric] = new util.HashMap[String, ClientMetric]()) + extends MasterRequestMessage case class HeartbeatFromApplicationResponse( statusCode: StatusCode, @@ -868,7 +871,8 @@ object ControlMessages extends Logging { applicationFallbackCounts, needCheckedWorkerList, requestId, - shouldResponse) => + shouldResponse, + clientMetrics) => val payload = PbHeartbeatFromApplication.newBuilder() .setAppId(appId) .setRequestId(requestId) @@ -881,6 +885,13 @@ object ControlMessages extends Logging { .addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map( PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava) .setShouldResponse(shouldResponse) + .putAllClientMetrics(clientMetrics.asScala.map { case (name, metric) => + val pbType = metric.metricType match { + case MetricType.Counter => PbMetricType.COUNTER + case MetricType.Gauge => PbMetricType.GAUGE + } + name -> PbClientMetric.newBuilder().setValue(metric.value).setType(pbType).build() + }.asJava) .build().toByteArray new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION, payload) @@ -1365,7 +1376,15 @@ object ControlMessages extends Logging { pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala .map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava), pbHeartbeatFromApplication.getRequestId, - pbHeartbeatFromApplication.getShouldResponse) + pbHeartbeatFromApplication.getShouldResponse, + new util.HashMap[String, ClientMetric]( + pbHeartbeatFromApplication.getClientMetricsMap.asScala.map { case (name, pbMetric) => + val metricType = pbMetric.getType match { + case PbMetricType.COUNTER => MetricType.Counter + case _ => MetricType.Gauge + } + name -> ClientMetric(pbMetric.getValue, metricType) + }.asJava)) case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE => val pbHeartbeatFromApplicationResponse = diff --git a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala index c83a83b95a7..cd69de29697 100644 --- a/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/util/UtilsSuite.scala @@ -28,9 +28,10 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.client.{MasterEndpointResolver, StaticMasterEndpointResolver} import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.identity.DefaultIdentityProvider +import org.apache.celeborn.common.metrics.{ClientMetric, MetricType} import org.apache.celeborn.common.network.protocol.SerdeVersion import org.apache.celeborn.common.protocol.{PartitionLocation, PbReviseLostShuffles, PbReviseLostShufflesResponse, TransportModuleConstants} -import org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse, MapperEnd, ReviseLostShuffles, ReviseLostShufflesResponse} +import org.apache.celeborn.common.protocol.message.ControlMessages.{GetReducerFileGroupResponse, HeartbeatFromApplication, MapperEnd, ReviseLostShuffles, ReviseLostShufflesResponse} import org.apache.celeborn.common.protocol.message.StatusCode class UtilsSuite extends CelebornFunSuite { @@ -259,6 +260,29 @@ class UtilsSuite extends CelebornFunSuite { assert(set.size == 0) } + test("HeartbeatFromApplication carries client metrics through pb serde") { + val clientMetrics = new util.HashMap[String, ClientMetric]() + clientMetrics.put("ClientRegisterShuffleCount", ClientMetric(5L, MetricType.Counter)) + clientMetrics.put("ClientExcludedWorkerCount", ClientMetric(2L, MetricType.Gauge)) + + val heartbeat = HeartbeatFromApplication( + "app-1", + 100L, + 10L, + 3L, + 1L, + new util.HashMap[String, java.lang.Long](), + new util.HashMap[String, java.lang.Long](), + new util.ArrayList(), + shouldResponse = true, + clientMetrics = clientMetrics) + + val heartbeatTrans = Utils.fromTransportMessage(Utils.toTransportMessage(heartbeat)) + .asInstanceOf[HeartbeatFromApplication] + + assert(heartbeatTrans.clientMetrics == clientMetrics) + } + test("validate number of client/server netty threads") { val celebornConf = new CelebornConf() celebornConf.set("celeborn.io.maxDefaultNettyThreads", "100") diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala new file mode 100644 index 00000000000..405c74edc03 --- /dev/null +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master + +import java.util.{Map => JMap} +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.internal.Logging +import org.apache.celeborn.common.metrics.{ClientMetric, MetricType} +import org.apache.celeborn.common.metrics.source.{AbstractSource, Role} +import org.apache.celeborn.common.util.JavaUtils + +/** + * Holds the client-side metrics that applications report in their heartbeat and re-exposes them on + * the master's Prometheus endpoint, labeled by `applicationId`. Both the registrations and any + * cached state are dropped when the application is terminated. + */ +class ApplicationMetricsSource(conf: CelebornConf) + extends AbstractSource(conf, Role.MASTER) with Logging { + override val sourceName = "application" + + // applicationId -> (metricName -> latest reported gauge value) + private val appGaugeCache = + JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, java.lang.Long]]() + + // applicationId -> (metricName -> last reported counter value, used to compute deltas) + private val appCounterPrev = + JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, java.lang.Long]]() + + startCleaner() + + def updateApplicationMetrics(appId: String, metrics: JMap[String, ClientMetric]): Unit = { + if (metrics.isEmpty) return + metrics.asScala.foreach { case (name, metric) => + val labels = Map(applicationLabel -> appId) + metric.metricType match { + case MetricType.Gauge => updateGauge(appId, name, labels, metric.value) + case MetricType.Counter => updateCounter(appId, name, labels, metric.value) + } + } + } + + private def updateGauge( + appId: String, + name: String, + labels: Map[String, String], + value: Long): Unit = { + val cache = appGaugeCache.computeIfAbsent(appId, _ => JavaUtils.newConcurrentHashMap()) + cache.put(name, value) + if (!gaugeExists(name, labels)) { + addGauge(name, labels) { () => + Option(appGaugeCache.get(appId)) + .flatMap(m => Option(m.get(name))) + .map(_.longValue()) + .getOrElse(0L) + } + } + } + + private def updateCounter( + appId: String, + name: String, + labels: Map[String, String], + newValue: Long): Unit = { + val prev = appCounterPrev.computeIfAbsent(appId, _ => JavaUtils.newConcurrentHashMap()) + if (!counterExists(name, labels)) { + addCounter(name, labels) + } + val prevValue = prev.getOrDefault(name, 0L) + val delta = newValue - prevValue + if (delta > 0) { + incCounter(name, delta, labels) + } + prev.put(name, newValue) + } + + def removeApplicationMetrics(appId: String): Unit = { + val labels = Map(applicationLabel -> appId) + val gaugeCache = appGaugeCache.remove(appId) + if (gaugeCache != null) { + gaugeCache.keySet().asScala.foreach(name => removeGauge(name, labels)) + } + val counterPrev = appCounterPrev.remove(appId) + if (counterPrev != null) { + counterPrev.keySet().asScala.foreach(name => removeCounter(name, labels)) + } + } +} diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 6b25552060a..73e3c0ac078 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -41,7 +41,7 @@ import org.apache.celeborn.common.exception.CelebornException import org.apache.celeborn.common.identity.UserIdentifier import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{DiskInfo, WorkerInfo, WorkerStatus} -import org.apache.celeborn.common.metrics.MetricsSystem +import org.apache.celeborn.common.metrics.{ClientMetric, MetricsSystem} import org.apache.celeborn.common.metrics.source.{JVMCPUSource, JVMSource, ResourceConsumptionSource, Role, SystemMiscSource, ThreadPoolSource} import org.apache.celeborn.common.network.CelebornRackResolver import org.apache.celeborn.common.network.protocol.{TransportMessage, TransportMessagesHelper} @@ -75,12 +75,14 @@ private[celeborn] class Master( new ResourceConsumptionSource(conf, Role.MASTER) private val threadPoolSource = ThreadPoolSource(conf, Role.MASTER) private val masterSource = new MasterSource(conf) + private val applicationMetricsSource = new ApplicationMetricsSource(conf) private val jvmSource = new JVMSource(conf, Role.MASTER) private val jvmCpuSource = new JVMCPUSource(conf, Role.MASTER) private val systemMiscSource = new SystemMiscSource(conf, Role.MASTER) metricsSystem.registerSource(resourceConsumptionSource) metricsSystem.registerSource(masterSource) + metricsSystem.registerSource(applicationMetricsSource) metricsSystem.registerSource(threadPoolSource) metricsSystem.registerSource(jvmSource) metricsSystem.registerSource(jvmCpuSource) @@ -477,7 +479,8 @@ private[celeborn] class Master( applicationFallbackCounts, needCheckedWorkerList, requestId, - shouldResponse) => + shouldResponse, + clientMetrics) => logDebug(s"Received heartbeat from app $appId") checkAuth(context, appId) executeWithLeaderChecker( @@ -493,7 +496,8 @@ private[celeborn] class Master( applicationFallbackCounts, needCheckedWorkerList, requestId, - shouldResponse)) + shouldResponse, + clientMetrics)) case pbRegisterWorker: PbRegisterWorker => val requestId = pbRegisterWorker.getRequestId @@ -1151,6 +1155,7 @@ private[celeborn] class Master( workersAssignedToApp.remove(appId) statusSystem.handleAppLost(appId, requestId) quotaManager.handleAppLost(appId) + applicationMetricsSource.removeApplicationMetrics(appId) logInfo(s"Removed application $appId") if (remoteStorageDirs.isDefined) { checkAndCleanExpiredAppDirsOnDFS(appId) @@ -1228,7 +1233,8 @@ private[celeborn] class Master( applicationFallbackCounts: util.Map[String, java.lang.Long], needCheckedWorkerList: util.List[WorkerInfo], requestId: String, - shouldResponse: Boolean): Unit = { + shouldResponse: Boolean, + clientMetrics: util.Map[String, ClientMetric]): Unit = { statusSystem.handleAppHeartbeat( appId, totalWritten, @@ -1239,6 +1245,7 @@ private[celeborn] class Master( applicationFallbackCounts, System.currentTimeMillis(), requestId) + applicationMetricsSource.updateApplicationMetrics(appId, clientMetrics) gaugeShuffleFallbackCounts() val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w => statusSystem.workersMap.containsKey(w.toUniqueId)).asJava diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala new file mode 100644 index 00000000000..d450965c653 --- /dev/null +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.service.deploy.master + +import java.util.{HashMap => JHashMap} + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.CelebornConf + +class ApplicationMetricsSourceSuite extends CelebornFunSuite { + + private def metricsOf(app: String, value: Long): JHashMap[String, java.lang.Long] = { + val map = new JHashMap[String, java.lang.Long]() + map.put("ClientRegisterShuffleCount", java.lang.Long.valueOf(value)) + map + } + + test("client metrics are emitted as gauges labeled by applicationId") { + val source = new ApplicationMetricsSource(new CelebornConf()) + + source.updateApplicationMetrics("app-1", metricsOf("app-1", 3)) + source.updateApplicationMetrics("app-2", metricsOf("app-2", 7)) + + val metrics = source.getMetrics + assert(metrics.contains("metrics_ClientRegisterShuffleCount_Value")) + assert(metrics.contains("""applicationId="app-1"""")) + assert(metrics.contains("""applicationId="app-2"""")) + assert(metrics.contains("""role="Master"""")) + } + + test("latest reported value is reflected and removed on application lost") { + val source = new ApplicationMetricsSource(new CelebornConf()) + + source.updateApplicationMetrics("app-1", metricsOf("app-1", 1)) + source.updateApplicationMetrics("app-1", metricsOf("app-1", 42)) + assert(source.gauges().exists(g => + g.labels.get("applicationId").contains("app-1") && + g.gauge.getValue.asInstanceOf[Number].longValue() == 42L)) + + source.removeApplicationMetrics("app-1") + assert(!source.gauges().exists(_.labels.get("applicationId").contains("app-1"))) + } + + test("empty metrics map registers nothing") { + val source = new ApplicationMetricsSource(new CelebornConf()) + source.updateApplicationMetrics("app-1", new JHashMap[String, java.lang.Long]()) + assert(source.gauges().isEmpty) + } +} From a31fe318fdaebace5f179f5593840ad3a800edb8 Mon Sep 17 00:00:00 2001 From: AmandeepSingh285 Date: Tue, 16 Jun 2026 17:00:08 +0000 Subject: [PATCH 2/6] Update --- .../org/apache/celeborn/client/ChangePartitionManager.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala index ae8f28a802f..66c924d22a9 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala @@ -23,7 +23,6 @@ import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, Schedu import scala.collection.JavaConverters._ -import org.apache.celeborn.client.CelebornClientSource import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging From 57e588a03b1342a0dc35e4b2c4c8a9ce16743c46 Mon Sep 17 00:00:00 2001 From: AmandeepSingh285 Date: Wed, 17 Jun 2026 11:10:14 +0000 Subject: [PATCH 3/6] Update --- .../celeborn/client/WorkerStatusTrackerSuite.scala | 2 ++ docs/configuration/metrics.md | 1 + .../deploy/master/ApplicationMetricsSourceSuite.scala | 9 +++++---- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala index 8369d6b15d9..30a6c43907a 100644 --- a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala +++ b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala @@ -162,6 +162,8 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite { test("recordWorkerFailure increments client worker-excluded counter and gauge") { val celebornConf = new CelebornConf() + celebornConf.set(CelebornConf.METRICS_ENABLED.key, "true") + celebornConf.set(CelebornConf.CLIENT_METRICS_ENABLED.key, "true") val lifecycleManager = new LifecycleManager("app-metrics-test", celebornConf) try { val statusTracker = lifecycleManager.workerStatusTracker diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md index 148c49f0740..1fbbca38965 100644 --- a/docs/configuration/metrics.md +++ b/docs/configuration/metrics.md @@ -19,6 +19,7 @@ license: | | Key | Default | isDynamic | Description | Since | Deprecated | | --- | ------- | --------- | ----------- | ----- | ---------- | +| celeborn.client.metrics.enabled | false | false | When true, the LifecycleManager collects client-side metrics. Requires `celeborn.metrics.enabled` to also be true. | 0.7.0 | | | celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | | | celeborn.metrics.collectPerfCritical.enabled | false | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 | | | celeborn.metrics.conf | <undefined> | false | Custom metrics configuration file path. Default use `metrics.properties` in classpath. | 0.3.0 | | diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala index d450965c653..e8642f78fa1 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala @@ -21,12 +21,13 @@ import java.util.{HashMap => JHashMap} import org.apache.celeborn.CelebornFunSuite import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.metrics.{ClientMetric, MetricType} class ApplicationMetricsSourceSuite extends CelebornFunSuite { - private def metricsOf(app: String, value: Long): JHashMap[String, java.lang.Long] = { - val map = new JHashMap[String, java.lang.Long]() - map.put("ClientRegisterShuffleCount", java.lang.Long.valueOf(value)) + private def metricsOf(app: String, value: Long): JHashMap[String, ClientMetric] = { + val map = new JHashMap[String, ClientMetric]() + map.put("ClientRegisterShuffleCount", ClientMetric(value, MetricType.Gauge)) map } @@ -58,7 +59,7 @@ class ApplicationMetricsSourceSuite extends CelebornFunSuite { test("empty metrics map registers nothing") { val source = new ApplicationMetricsSource(new CelebornConf()) - source.updateApplicationMetrics("app-1", new JHashMap[String, java.lang.Long]()) + source.updateApplicationMetrics("app-1", new JHashMap[String, ClientMetric]()) assert(source.gauges().isEmpty) } } From 438f541fd344fe7b9e0d35f58e2ba4f7bb2a7a16 Mon Sep 17 00:00:00 2001 From: AmandeepSingh285 Date: Thu, 18 Jun 2026 11:45:16 +0000 Subject: [PATCH 4/6] Updated client side metrics flow --- .../client/ApplicationHeartbeater.scala | 6 +- .../client/CelebornClientSource.scala | 28 ++- .../client/ChangePartitionManager.scala | 8 +- .../celeborn/client/LifecycleManager.scala | 67 +++--- .../commit/ReducePartitionCommitHandler.scala | 4 +- .../client/WorkerStatusTrackerSuite.scala | 2 +- common/src/main/proto/TransportMessages.proto | 1 + .../apache/celeborn/common/CelebornConf.scala | 38 ++++ .../metrics/source/AbstractSource.scala | 74 +++++++ .../protocol/message/ControlMessages.scala | 27 ++- .../metrics/source/CelebornSourceSuite.scala | 100 +++++++++ docs/configuration/client.md | 1 + docs/configuration/metrics.md | 3 + .../master/ApplicationMetricsSource.scala | 90 +++----- .../service/deploy/master/Master.scala | 14 +- .../ApplicationMetricsSourceSuite.scala | 205 ++++++++++++++++-- 16 files changed, 514 insertions(+), 154 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala index d2423e5ac6e..0e5c3d8bff3 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ApplicationHeartbeater.scala @@ -49,6 +49,8 @@ class ApplicationHeartbeater( private var stopped = false private val reviseLostShuffles = conf.reviseLostShufflesEnabled + private val appMetricLabels: util.Map[String, String] = + conf.clientMetricsAppLabels.asJava // Use independent app heartbeat threads to avoid being blocked by other operations. private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs @@ -89,7 +91,9 @@ class ApplicationHeartbeater( workerStatusTracker.getNeedCheckedWorkers().toList.asJava, ZERO_UUID, true, - clientMetrics()) + if (appMetricLabels.isEmpty) new util.HashMap[String, ClientMetric]() + else clientMetrics(), + appMetricLabels) val response = requestHeartbeat(appHeartbeat) if (response.statusCode == StatusCode.SUCCESS) { logDebug("Successfully send app heartbeat.") diff --git a/client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala b/client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala index 9da668d3dc4..ca513ba43c3 100644 --- a/client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala +++ b/client/src/main/scala/org/apache/celeborn/client/CelebornClientSource.scala @@ -17,6 +17,8 @@ package org.apache.celeborn.client +import java.util.concurrent.ConcurrentHashMap + import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.metrics.{ClientMetric, MetricType} import org.apache.celeborn.common.metrics.source.{AbstractSource, Role} @@ -29,6 +31,9 @@ class CelebornClientSource(conf: CelebornConf) extends AbstractSource(conf, Role import CelebornClientSource._ + // Tracks previous counter values so we can send deltas to the master. + private val counterPrev = new ConcurrentHashMap[String, java.lang.Long]() + addCounter(REGISTER_SHUFFLE_COUNT) addCounter(REGISTER_SHUFFLE_FAIL_COUNT) addCounter(UNREGISTER_SHUFFLE_COUNT) @@ -39,34 +44,35 @@ class CelebornClientSource(conf: CelebornConf) extends AbstractSource(conf, Role addCounter(SHUFFLE_DATA_LOST_COUNT) def getMetricsSnapshot(): Map[String, ClientMetric] = { - val counterMetrics = counters().map(c => - c.name -> ClientMetric(c.counter.getCount, MetricType.Counter)) + // Counters: compute delta since last snapshot + val counterMetrics = counters().flatMap { c => + val current = c.counter.getCount + val prev = Option(counterPrev.put(c.name, current)).map(_.longValue()).getOrElse(0L) + val delta = current - prev + if (delta > 0) Some(c.name -> ClientMetric(delta, MetricType.Counter)) + else None + } + // Gauges: send the latest value as-is. val gaugeMetrics = gauges().map(g => g.name -> ClientMetric(g.gauge.getValue.asInstanceOf[Number].longValue(), MetricType.Gauge)) (counterMetrics ++ gaugeMetrics).toMap } - // start cleaner thread - startCleaner() + def start(): Unit = startCleaner() + + def stop(): Unit = metricsCleaner.shutdown() } object CelebornClientSource { - // worker health val EXCLUDED_WORKER_COUNT = "ClientExcludedWorkerCount" val SHUTTING_WORKER_COUNT = "ClientShuttingWorkerCount" - - // shuffle lifecycle val ACTIVE_SHUFFLE_COUNT = "ClientActiveShuffleCount" val REGISTER_SHUFFLE_COUNT = "ClientRegisterShuffleCount" val REGISTER_SHUFFLE_FAIL_COUNT = "ClientRegisterShuffleFailCount" val UNREGISTER_SHUFFLE_COUNT = "ClientUnregisterShuffleCount" - - // write path val REVIVE_REQUEST_COUNT = "ClientReviveRequestCount" val REVIVE_FAIL_COUNT = "ClientReviveFailCount" val SLOT_RESERVATION_FAIL_COUNT = "ClientSlotReservationFailCount" - - // data integrity val SHUFFLE_FETCH_FAILURE_COUNT = "ClientShuffleFetchFailureCount" val SHUFFLE_DATA_LOST_COUNT = "ClientShuffleDataLostCount" } diff --git a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala index 66c924d22a9..68382af520d 100644 --- a/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/ChangePartitionManager.scala @@ -282,11 +282,9 @@ class ChangePartitionManager( None, lifecycleManager.workerStatusTracker.workerAvailableByLocation(req.oldPartition)))) } - if (lifecycleManager.clientMetricsEnabled) { - lifecycleManager.clientSource.incCounter( - CelebornClientSource.REVIVE_FAIL_COUNT, - changePartitions.size) - } + lifecycleManager.incClientMetric( + CelebornClientSource.REVIVE_FAIL_COUNT, + changePartitions.size) } val candidates = new util.HashSet[WorkerInfo]() diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index f718550716a..39fc71d3308 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -223,20 +223,25 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends } private val masterClient = new MasterClient(masterRpcEnvInUse, conf, false) - val clientSource = new CelebornClientSource(conf) - private[client] val clientMetricsEnabled = conf.metricsSystemEnable && conf.clientMetricsEnabled + private val clientMetricsEnabled = conf.metricsSystemEnable && conf.clientMetricsEnabled + val clientSource: Option[CelebornClientSource] = + if (clientMetricsEnabled) Some(new CelebornClientSource(conf)) else None + + @inline private[client] def incClientMetric(name: String, delta: Long = 1L): Unit = + clientSource.foreach(_.incCounter(name, delta)) val commitManager = new CommitManager(appUniqueId, conf, this) val workerStatusTracker = new WorkerStatusTracker(conf, this) - if (clientMetricsEnabled) { - clientSource.addGauge(CelebornClientSource.ACTIVE_SHUFFLE_COUNT) { () => + clientSource.foreach { source => + source.addGauge(CelebornClientSource.ACTIVE_SHUFFLE_COUNT) { () => registeredShuffle.size } - clientSource.addGauge(CelebornClientSource.EXCLUDED_WORKER_COUNT) { () => + source.addGauge(CelebornClientSource.EXCLUDED_WORKER_COUNT) { () => workerStatusTracker.excludedWorkers.size } - clientSource.addGauge(CelebornClientSource.SHUTTING_WORKER_COUNT) { () => + source.addGauge(CelebornClientSource.SHUTTING_WORKER_COUNT) { () => workerStatusTracker.shuttingWorkers.size } + source.start() } private val heartbeater = new ApplicationHeartbeater( @@ -252,11 +257,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends registeredShuffle, reason => cancelAllActiveStages(reason), () => - if (clientMetricsEnabled) { - clientSource.getMetricsSnapshot().asJava - } else { - new util.HashMap[String, ClientMetric]() - }) + clientSource.map(_.getMetricsSnapshot().asJava) + .getOrElse(new util.HashMap[String, ClientMetric]())) private def resetFallbackCounts(counts: ConcurrentHashMap[String, java.lang.Long]) : Map[String, java.lang.Long] = { val fallbackCounts = new util.HashMap[String, java.lang.Long]() @@ -747,13 +749,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // Reply to all RegisterShuffle request for current shuffle id. def replyRegisterShuffle(response: RegisterShuffleResponse): Unit = { - if (clientMetricsEnabled) { - if (response.status == StatusCode.SUCCESS) { - clientSource.incCounter(CelebornClientSource.REGISTER_SHUFFLE_COUNT) - } else { - clientSource.incCounter(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT) - } - } + incClientMetric( + if (response.status == StatusCode.SUCCESS) CelebornClientSource.REGISTER_SHUFFLE_COUNT + else CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT) registeringShuffleRequest.synchronized { val serializedMsg: Option[ByteBuffer] = partitionType match { case PartitionType.REDUCE => @@ -915,15 +913,11 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends serdeVersion: SerdeVersion): Unit = { val contextWrapper = ChangeLocationsCallContext(context, partitionIds.size(), serdeVersion) - if (clientMetricsEnabled) { - clientSource.incCounter(CelebornClientSource.REVIVE_REQUEST_COUNT, partitionIds.size()) - } + incClientMetric(CelebornClientSource.REVIVE_REQUEST_COUNT, partitionIds.size()) // If shuffle not registered, reply ShuffleNotRegistered and return if (!registeredShuffle.contains(shuffleId)) { logError(s"[handleRevive] shuffle $shuffleId not registered!") - if (clientMetricsEnabled) { - clientSource.incCounter(CelebornClientSource.REVIVE_FAIL_COUNT) - } + incClientMetric(CelebornClientSource.REVIVE_FAIL_COUNT, partitionIds.size()) contextWrapper.reply( -1, StatusCode.SHUFFLE_UNREGISTERED, @@ -935,9 +929,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends s"[handleRevive] shuffle $shuffleId, $mapIds, $partitionIds, $oldEpochs, $oldPartitions, $causes") if (commitManager.isStageEnd(shuffleId)) { logError(s"[handleRevive] shuffle $shuffleId stage ended!") - if (clientMetricsEnabled) { - clientSource.incCounter(CelebornClientSource.REVIVE_FAIL_COUNT) - } + incClientMetric(CelebornClientSource.REVIVE_FAIL_COUNT, partitionIds.size()) contextWrapper.reply( -1, StatusCode.STAGE_ENDED, @@ -1151,9 +1143,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends if (invokeReportTaskShuffleFetchFailurePreCheck(taskId)) { logInfo(s"handle fetch failure for appShuffleId $appShuffleId shuffleId $shuffleId") ret = invokeAppShuffleTrackerCallback(appShuffleId) - if (ret && clientMetricsEnabled) { - clientSource.incCounter(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT) - } + if (ret) incClientMetric(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT) shuffleIds.put(appShuffleIdentifier, (shuffleId, false)) } else { logInfo( @@ -1285,9 +1275,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends context.reply(MapperEndResponse(StatusCode.SUCCESS, serdeVersion)) case false => logError(s"Failed $message, reply ${StatusCode.SHUFFLE_DATA_LOST}.") - if (clientMetricsEnabled) { - clientSource.incCounter(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT) - } + incClientMetric(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT) context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST, serdeVersion)) } } @@ -1653,9 +1641,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // [[releasePartitionLocation]]. Now in the slots are all the successful partition // locations. logWarning(s"Reserve buffers for $shuffleId still fail after retrying, clear buffers.") - if (clientMetricsEnabled) { - clientSource.incCounter(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT) - } + incClientMetric(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT) destroySlotsWithRetry(shuffleId, slots) } else { logInfo(s"Reserve buffer success for shuffleId $shuffleId") @@ -1875,9 +1861,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends // if unregister shuffle not success, wait next turn if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) { unregisterShuffleTime.remove(shuffleId) - if (clientMetricsEnabled) { - clientSource.incCounter(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT) - } + incClientMetric(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT) } } } else { @@ -1889,9 +1873,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) { shuffleIdsToRemove.foreach { shuffleId: Integer => unregisterShuffleTime.remove(shuffleId) - if (clientMetricsEnabled) { - clientSource.incCounter(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT) - } + incClientMetric(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT) } } } @@ -2101,6 +2083,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends */ override def stop(): Unit = { heartbeater.stop() + clientSource.foreach(_.stop()) super.stop() } diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala index 20ad426820c..41fcece5710 100644 --- a/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala +++ b/client/src/main/scala/org/apache/celeborn/client/commit/ReducePartitionCommitHandler.scala @@ -222,9 +222,7 @@ class ReducePartitionCommitHandler( } else { logError(s"Failed to handle stageEnd for $shuffleId, lost file!") dataLostShuffleSet.add(shuffleId) - if (conf.metricsSystemEnable && conf.clientMetricsEnabled) { - lifecycleManager.clientSource.incCounter(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT) - } + lifecycleManager.incClientMetric(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT) // record in stageEndShuffleSet setStageEnd(shuffleId) } diff --git a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala index 30a6c43907a..338b04375b6 100644 --- a/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala +++ b/client/src/test/scala/org/apache/celeborn/client/WorkerStatusTrackerSuite.scala @@ -167,7 +167,7 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite { val lifecycleManager = new LifecycleManager("app-metrics-test", celebornConf) try { val statusTracker = lifecycleManager.workerStatusTracker - val source = lifecycleManager.clientSource + val source = lifecycleManager.clientSource.get val failed = new ShuffleFailedWorkers() val now = System.currentTimeMillis() diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 741cbe05b17..535b64e881c 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -502,6 +502,7 @@ message PbHeartbeatFromApplication { map applicationFallbackCounts = 10; PbHeartbeatInfo heartbeatInfo = 11; map clientMetrics = 12; + map metricLabels = 13; } enum PbMetricType { diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index fccd299f22a..28a5a53272c 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -911,12 +911,17 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def metricsConf: Option[String] = get(METRICS_CONF) def metricsSystemEnable: Boolean = get(METRICS_ENABLED) def clientMetricsEnabled: Boolean = get(CLIENT_METRICS_ENABLED) + def masterClientMetricsEnabled: Boolean = get(MASTER_CLIENT_METRICS_ENABLED) + def masterClientMetricsRemovedAppRetentionMs: Long = + get(MASTER_CLIENT_METRICS_REMOVED_APP_RETENTION) def metricsSampleRate: Double = get(METRICS_SAMPLE_RATE) def metricsSlidingWindowSize: Int = get(METRICS_SLIDING_WINDOW_SIZE) def metricsCollectCriticalEnabled: Boolean = get(METRICS_COLLECT_CRITICAL_ENABLED) def metricsCapacity: Int = get(METRICS_CAPACITY) def metricsExtraLabels: Map[String, String] = get(METRICS_EXTRA_LABELS).map(Utils.parseKeyValuePair).toMap + def clientMetricsAppLabels: Map[String, String] = + get(CLIENT_METRICS_APP_LABELS).map(Utils.parseKeyValuePair).toMap def metricsWorkerAppTopResourceConsumptionCount: Int = get(METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT) def metricsWorkerAppTopResourceConsumptionBytesWrittenThreshold: Long = @@ -5975,6 +5980,25 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val MASTER_CLIENT_METRICS_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.metrics.master.clientMetrics.enabled") + .categories("metrics") + .doc("When true, the master exposes client-side metrics forwarded in application " + + "heartbeats on its Prometheus endpoint.") + .version("0.7.0") + .booleanConf + .createWithDefault(false) + + val MASTER_CLIENT_METRICS_REMOVED_APP_RETENTION: ConfigEntry[Long] = + buildConf("celeborn.metrics.master.clientMetrics.removedApp.retentionMs") + .categories("metrics") + .doc("How long to retain removed application IDs in the client metrics source to " + + "reject late heartbeats after an application is lost. Entries older than this are " + + "periodically evicted.") + .version("0.7.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("5min") + val METRICS_SAMPLE_RATE: ConfigEntry[Double] = buildConf("celeborn.metrics.sample.rate") .categories("metrics") @@ -6021,6 +6045,20 @@ object CelebornConf extends Logging { "Allowed pattern is: `=[,=]*`") .createWithDefault(Seq.empty) + val CLIENT_METRICS_APP_LABELS: ConfigEntry[Seq[String]] = + buildConf("celeborn.client.metrics.appLabels") + .categories("client", "metrics") + .doc("Custom metric labels sent from the client in each application heartbeat and applied " + + "to client metrics exposed on the master's Prometheus endpoint. " + + "Labels' pattern is: `=[,=]*`; e.g. `env=prod,version=1`") + .version("0.7.0") + .stringConf + .toSequence + .checkValue( + labels => labels.map(_ => Try(Utils.parseKeyValuePair(_))).forall(_.isSuccess), + "Allowed pattern is: `=[,=]*`") + .createWithDefault(Seq.empty) + val METRICS_WORKER_APP_TOP_RESOURCE_CONSUMPTION_COUNT: ConfigEntry[Int] = buildConf("celeborn.metrics.worker.app.topResourceConsumption.count") .categories("metrics") diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala index fd40e6b9481..f907edff6ba 100644 --- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala +++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/AbstractSource.scala @@ -19,6 +19,7 @@ package org.apache.celeborn.common.metrics.source import java.util.{Map => JMap} import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import scala.collection.mutable @@ -48,6 +49,12 @@ case class NamedHistogram(name: String, histogram: Histogram, labels: Map[String case class NamedTimer(name: String, timer: Timer, labels: Map[String, String]) extends MetricLabels +case class AppMetricDetails[T]( + name: String, + handle: T, + originalLabels: Map[String, String], + additionalDetails: java.util.Set[String]) + abstract class AbstractSource(conf: CelebornConf, role: String) extends Source with Logging { override val metricRegistry = new MetricRegistry() @@ -101,6 +108,12 @@ abstract class AbstractSource(conf: CelebornConf, role: String) protected val namedHistogram: ConcurrentHashMap[String, NamedHistogram] = JavaUtils.newConcurrentHashMap[String, NamedHistogram]() + protected val namedGaugesWithDetails: ConcurrentHashMap[String, AppMetricDetails[AtomicLong]] = + JavaUtils.newConcurrentHashMap[String, AppMetricDetails[AtomicLong]]() + + protected val namedCountersWithDetails: ConcurrentHashMap[String, AppMetricDetails[Counter]] = + JavaUtils.newConcurrentHashMap[String, AppMetricDetails[Counter]]() + def addTimerMetrics(namedTimer: NamedTimer): Unit = { val timerMetricsString = getTimerMetrics(namedTimer) timerMetrics.add(timerMetricsString) @@ -214,6 +227,45 @@ abstract class AbstractSource(conf: CelebornConf, role: String) labelsWithCustomizedLabels(labels))) } + protected def addOrUpdateGaugeForApp( + name: String, + labels: Map[String, String], + appId: String, + value: Long): Unit = { + val details = namedGaugesWithDetails.computeIfAbsent( + metricNameWithCustomizedLabels(name, labels), + (_: String) => { + val holder = new AtomicLong() + addGauge(name, labels)(() => holder.get()) + AppMetricDetails(name, holder, labels, ConcurrentHashMap.newKeySet[String]()) + }) + details.additionalDetails.add(appId) + details.handle.set(value) + } + + protected def addOrUpdateCounterForApp( + name: String, + labels: Map[String, String], + appId: String, + delta: Long): Unit = { + if (delta <= 0) { + return + } + val metricKey = metricNameWithCustomizedLabels(name, labels) + val details = namedCountersWithDetails.computeIfAbsent( + metricKey, + (_: String) => { + addCounter(name, labels) + AppMetricDetails( + name, + namedCounters.get(metricKey).counter, + labels, + ConcurrentHashMap.newKeySet[String]()) + }) + details.additionalDetails.add(appId) + details.handle.inc(delta) + } + def counters(): List[NamedCounter] = { namedCounters.values().asScala.toList } @@ -283,6 +335,26 @@ abstract class AbstractSource(conf: CelebornConf, role: String) metricNameWithLabel } + protected def removeAppFromMetrics(appId: String): Unit = { + removeAppFromTracked(namedGaugesWithDetails, appId)(d => removeGauge(d.name, d.originalLabels)) + removeAppFromTracked(namedCountersWithDetails, appId)(d => + removeCounter(d.name, d.originalLabels)) + } + + private def removeAppFromTracked[T]( + tracked: ConcurrentHashMap[String, AppMetricDetails[T]], + appId: String)(deregister: AppMetricDetails[T] => Unit): Unit = { + val iter = tracked.entrySet().iterator() + while (iter.hasNext) { + val details = iter.next().getValue + details.additionalDetails.remove(appId) + if (details.additionalDetails.isEmpty) { + iter.remove() + deregister(details) + } + } + } + override def sample[T](metricsName: String, key: String)(f: => T): T = { sample(metricsName, key, Map.empty[String, String])(f) } @@ -697,6 +769,8 @@ abstract class AbstractSource(conf: CelebornConf, role: String) metricsCleaner.shutdown() namedCounters.clear() namedGauges.clear() + namedGaugesWithDetails.clear() + namedCountersWithDetails.clear() namedMeters.clear() namedTimers.clear() timerMetrics.clear() diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 237fc0c2191..a7c793bdd05 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -391,7 +391,8 @@ object ControlMessages extends Logging { needCheckedWorkerList: util.List[WorkerInfo], override var requestId: String = ZERO_UUID, shouldResponse: Boolean = false, - clientMetrics: util.Map[String, ClientMetric] = new util.HashMap[String, ClientMetric]()) + clientMetrics: util.Map[String, ClientMetric] = new util.HashMap[String, ClientMetric](), + metricLabels: util.Map[String, String] = new util.HashMap[String, String]()) extends MasterRequestMessage case class HeartbeatFromApplicationResponse( @@ -872,7 +873,8 @@ object ControlMessages extends Logging { needCheckedWorkerList, requestId, shouldResponse, - clientMetrics) => + clientMetrics, + metricLabels) => val payload = PbHeartbeatFromApplication.newBuilder() .setAppId(appId) .setRequestId(requestId) @@ -892,6 +894,7 @@ object ControlMessages extends Logging { } name -> PbClientMetric.newBuilder().setValue(metric.value).setType(pbType).build() }.asJava) + .putAllMetricLabels(metricLabels) .build().toByteArray new TransportMessage(MessageType.HEARTBEAT_FROM_APPLICATION, payload) @@ -1378,13 +1381,19 @@ object ControlMessages extends Logging { pbHeartbeatFromApplication.getRequestId, pbHeartbeatFromApplication.getShouldResponse, new util.HashMap[String, ClientMetric]( - pbHeartbeatFromApplication.getClientMetricsMap.asScala.map { case (name, pbMetric) => - val metricType = pbMetric.getType match { - case PbMetricType.COUNTER => MetricType.Counter - case _ => MetricType.Gauge - } - name -> ClientMetric(pbMetric.getValue, metricType) - }.asJava)) + pbHeartbeatFromApplication.getClientMetricsMap.asScala.flatMap { + case (name, pbMetric) => + pbMetric.getType match { + case PbMetricType.COUNTER => + Some(name -> ClientMetric(pbMetric.getValue, MetricType.Counter)) + case PbMetricType.GAUGE => + Some(name -> ClientMetric(pbMetric.getValue, MetricType.Gauge)) + case unknown => + logWarning(s"Unknown PbMetricType $unknown for metric $name, skipping") + None + } + }.asJava), + new util.HashMap[String, String](pbHeartbeatFromApplication.getMetricLabelsMap)) case HEARTBEAT_FROM_APPLICATION_RESPONSE_VALUE => val pbHeartbeatFromApplicationResponse = diff --git a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala index 110d0b1ebe5..f0a939c4b57 100644 --- a/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/metrics/source/CelebornSourceSuite.scala @@ -22,6 +22,51 @@ import org.apache.celeborn.common.CelebornConf class CelebornSourceSuite extends CelebornFunSuite { + private class TestSource extends AbstractSource(new CelebornConf(), Role.MASTER) { + override def sourceName: String = "testSource" + + def updateGauge( + name: String, + labels: Map[String, String], + appId: String, + value: Long): Unit = + addOrUpdateGaugeForApp(name, labels, appId, value) + + def updateCounter( + name: String, + labels: Map[String, String], + appId: String, + delta: Long): Unit = + addOrUpdateCounterForApp(name, labels, appId, delta) + + def removeApp(appId: String): Unit = removeAppFromMetrics(appId) + + def trackedGaugeCount: Int = namedGaugesWithDetails.size() + + def trackedCounterCount: Int = namedCountersWithDetails.size() + } + + private def hasLabels( + actual: Map[String, String], + expected: Map[String, String]): Boolean = + expected.forall { case (key, value) => actual.get(key).contains(value) } + + private def gaugeValue( + source: AbstractSource, + labels: Map[String, String], + name: String): Option[Long] = + source.gauges() + .find(g => g.name == name && hasLabels(g.labels, labels)) + .map(_.gauge.getValue.asInstanceOf[Number].longValue()) + + private def counterValue( + source: AbstractSource, + labels: Map[String, String], + name: String): Option[Long] = + source.counters() + .find(c => c.name == name && hasLabels(c.labels, labels)) + .map(_.counter.getCount) + test("test histogram") { val conf = new CelebornConf() @@ -133,4 +178,59 @@ class CelebornSourceSuite extends CelebornFunSuite { assert(metrics.contains( s"""metrics_Counter_Count{instance="$instance",role="Master",user="metric"} 2""")) } + + test("dynamic app gauge reuses existing gauge registration and updates value") { + val source = new TestSource() + val labels = Map("user" -> "metric") + + source.updateGauge("DynamicGauge", labels, "app-1", 1L) + source.updateGauge("DynamicGauge", labels, "app-1", 42L) + + assert(source.trackedGaugeCount == 1) + assert(source.gauges().count(g => g.name == "DynamicGauge" && hasLabels(g.labels, labels)) == 1) + assert(gaugeValue(source, labels, "DynamicGauge").contains(42L)) + assert(source.getMetrics.contains("""metrics_DynamicGauge_Value""")) + assert(source.getMetrics.contains("""user="metric"""")) + } + + test("dynamic app counter reuses existing counter registration and accumulates deltas") { + val source = new TestSource() + val labels = Map("user" -> "metric") + + source.updateCounter("DynamicCounter", labels, "app-1", 10L) + source.updateCounter("DynamicCounter", labels, "app-1", 5L) + source.updateCounter("DynamicCounter", labels, "app-1", 0L) + source.updateCounter("DynamicCounter", labels, "app-1", -1L) + + assert(source.trackedCounterCount == 1) + assert(source.counters().count(c => + c.name == "DynamicCounter" && hasLabels(c.labels, labels)) == 1) + assert(counterValue(source, labels, "DynamicCounter").contains(15L)) + } + + test("dynamic app metrics are removed only after all contributing apps are removed") { + val source = new TestSource() + val labels = Map("user" -> "metric") + + source.updateGauge("DynamicGauge", labels, "app-1", 1L) + source.updateGauge("DynamicGauge", labels, "app-2", 2L) + source.updateCounter("DynamicCounter", labels, "app-1", 10L) + source.updateCounter("DynamicCounter", labels, "app-2", 5L) + + source.removeApp("app-1") + + assert(source.trackedGaugeCount == 1) + assert(source.trackedCounterCount == 1) + assert(gaugeValue(source, labels, "DynamicGauge").contains(2L)) + assert(counterValue(source, labels, "DynamicCounter").contains(15L)) + + source.removeApp("app-2") + + assert(source.trackedGaugeCount == 0) + assert(source.trackedCounterCount == 0) + assert(gaugeValue(source, labels, "DynamicGauge").isEmpty) + assert(counterValue(source, labels, "DynamicCounter").isEmpty) + assert(!source.gaugeExists("DynamicGauge", labels)) + assert(!source.counterExists("DynamicCounter", labels)) + } } diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 5d15c6859fb..74b816cc025 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -50,6 +50,7 @@ license: | | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate | | celeborn.client.flink.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use flink built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use flink built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above. | 0.6.0 | | | celeborn.client.inputStream.creation.window | 16 | false | Window size that CelebornShuffleReader pre-creates CelebornInputStreams, for coalesced scenario where multiple Partitions are read | 0.5.1 | | +| celeborn.client.metrics.appLabels | | false | Custom metric labels sent from the client in each application heartbeat and applied to client metrics re-exposed on the master's Prometheus endpoint. Labels' pattern is: `=[,=]*`; e.g. `env=prod,version=1` | 0.7.0 | | | celeborn.client.mr.pushData.max | 32m | false | Max size for a push data sent from mr client. | 0.4.0 | | | celeborn.client.partition.reader.checkpoint.enabled | false | false | Whether or not checkpoint reads when re-creating a partition reader. Setting to true minimizes the amount of unnecessary reads during partition read retries | 0.6.0 | | | celeborn.client.partition.reader.waitLog.threshold | 60s | false | The threshold in milliseconds for logging partition read wait time. Log messages will be generated when wait time exceeds multiples of this threshold. | 0.6.2 | | diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md index 1fbbca38965..70fe432d190 100644 --- a/docs/configuration/metrics.md +++ b/docs/configuration/metrics.md @@ -19,6 +19,7 @@ license: | | Key | Default | isDynamic | Description | Since | Deprecated | | --- | ------- | --------- | ----------- | ----- | ---------- | +| celeborn.client.metrics.appLabels | | false | Custom metric labels sent from the client in each application heartbeat and applied to client metrics re-exposed on the master's Prometheus endpoint. Labels' pattern is: `=[,=]*`; e.g. `env=prod,version=1` | 0.7.0 | | | celeborn.client.metrics.enabled | false | false | When true, the LifecycleManager collects client-side metrics. Requires `celeborn.metrics.enabled` to also be true. | 0.7.0 | | | celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | | | celeborn.metrics.collectPerfCritical.enabled | false | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 | | @@ -29,6 +30,8 @@ license: | | celeborn.metrics.json.pretty.enabled | true | false | When true, view metrics in json pretty format | 0.4.0 | | | celeborn.metrics.loggerSink.output.enabled | false | false | Whether to output scraped metrics to the logger. This config will have effect if you enabled logger sink.If you will not scrape metrics periodically, do add `org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties. | 0.6.0 | | | celeborn.metrics.loggerSink.scrape.interval | 30min | false | The interval of logger sink to scrape its own metrics. This config will have effect if you enabled logger sink. If you will not scrape metrics periodically, do add `org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties. | 0.6.0 | | +| celeborn.metrics.master.clientMetrics.enabled | false | false | When true, the master re-exposes client-side metrics forwarded in application heartbeats on its Prometheus endpoint. | 0.7.0 | | +| celeborn.metrics.master.clientMetrics.removedApp.retentionMs | 5min | false | How long to retain removed application IDs in the client metrics source to reject late heartbeats after an application is lost. Entries older than this are periodically evicted. | 0.7.0 | | | celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context path of prometheus metrics HTTP server. | 0.4.0 | | | celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | | | celeborn.metrics.timer.slidingWindow.size | 4096 | false | The sliding window size of timer metric. | 0.2.0 | | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala index 405c74edc03..ed2cf5d56c2 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSource.scala @@ -18,7 +18,7 @@ package org.apache.celeborn.service.deploy.master import java.util.{Map => JMap} -import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -26,81 +26,55 @@ import org.apache.celeborn.common.CelebornConf import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.metrics.{ClientMetric, MetricType} import org.apache.celeborn.common.metrics.source.{AbstractSource, Role} -import org.apache.celeborn.common.util.JavaUtils +import org.apache.celeborn.common.util.{JavaUtils, Utils} -/** - * Holds the client-side metrics that applications report in their heartbeat and re-exposes them on - * the master's Prometheus endpoint, labeled by `applicationId`. Both the registrations and any - * cached state are dropped when the application is terminated. - */ class ApplicationMetricsSource(conf: CelebornConf) extends AbstractSource(conf, Role.MASTER) with Logging { override val sourceName = "application" - // applicationId -> (metricName -> latest reported gauge value) - private val appGaugeCache = - JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, java.lang.Long]]() + private val masterClientMetricsEnabled = conf.masterClientMetricsEnabled + private val removedAppRetentionMs = conf.masterClientMetricsRemovedAppRetentionMs - // applicationId -> (metricName -> last reported counter value, used to compute deltas) - private val appCounterPrev = - JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, java.lang.Long]]() + // Tracking applications that have been terminated + private val removedAppIds = + JavaUtils.newConcurrentHashMap[String, java.lang.Long]() - startCleaner() + startRemovedAppCleaner() - def updateApplicationMetrics(appId: String, metrics: JMap[String, ClientMetric]): Unit = { - if (metrics.isEmpty) return - metrics.asScala.foreach { case (name, metric) => - val labels = Map(applicationLabel -> appId) - metric.metricType match { - case MetricType.Gauge => updateGauge(appId, name, labels, metric.value) - case MetricType.Counter => updateCounter(appId, name, labels, metric.value) + private def startRemovedAppCleaner(): Unit = { + val cleanTask: Runnable = new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + val cutoff = System.currentTimeMillis() - removedAppRetentionMs + removedAppIds.entrySet().asScala.foreach { entry => + if (entry.getValue < cutoff) { + removedAppIds.remove(entry.getKey, entry.getValue) + } + } } } + metricsCleaner.scheduleWithFixedDelay(cleanTask, 10, 10, TimeUnit.MINUTES) } - private def updateGauge( + def updateApplicationMetrics( appId: String, - name: String, - labels: Map[String, String], - value: Long): Unit = { - val cache = appGaugeCache.computeIfAbsent(appId, _ => JavaUtils.newConcurrentHashMap()) - cache.put(name, value) - if (!gaugeExists(name, labels)) { - addGauge(name, labels) { () => - Option(appGaugeCache.get(appId)) - .flatMap(m => Option(m.get(name))) - .map(_.longValue()) - .getOrElse(0L) - } + metricLabels: Map[String, String], + metrics: JMap[String, ClientMetric]): Unit = { + if (!masterClientMetricsEnabled || metricLabels.isEmpty || removedAppIds.containsKey(appId)) { + return } - } - private def updateCounter( - appId: String, - name: String, - labels: Map[String, String], - newValue: Long): Unit = { - val prev = appCounterPrev.computeIfAbsent(appId, _ => JavaUtils.newConcurrentHashMap()) - if (!counterExists(name, labels)) { - addCounter(name, labels) - } - val prevValue = prev.getOrDefault(name, 0L) - val delta = newValue - prevValue - if (delta > 0) { - incCounter(name, delta, labels) + metrics.asScala.foreach { case (name, metric) => + metric.metricType match { + case MetricType.Gauge => + addOrUpdateGaugeForApp(name, metricLabels, appId, metric.value) + case MetricType.Counter => + addOrUpdateCounterForApp(name, metricLabels, appId, metric.value) + } } - prev.put(name, newValue) } def removeApplicationMetrics(appId: String): Unit = { - val labels = Map(applicationLabel -> appId) - val gaugeCache = appGaugeCache.remove(appId) - if (gaugeCache != null) { - gaugeCache.keySet().asScala.foreach(name => removeGauge(name, labels)) - } - val counterPrev = appCounterPrev.remove(appId) - if (counterPrev != null) { - counterPrev.keySet().asScala.foreach(name => removeCounter(name, labels)) - } + removedAppIds.put(appId, System.currentTimeMillis()) + removeAppFromMetrics(appId) } } diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 73e3c0ac078..a0a1bfa89eb 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -480,7 +480,8 @@ private[celeborn] class Master( needCheckedWorkerList, requestId, shouldResponse, - clientMetrics) => + clientMetrics, + metricLabels) => logDebug(s"Received heartbeat from app $appId") checkAuth(context, appId) executeWithLeaderChecker( @@ -497,7 +498,8 @@ private[celeborn] class Master( needCheckedWorkerList, requestId, shouldResponse, - clientMetrics)) + clientMetrics, + metricLabels)) case pbRegisterWorker: PbRegisterWorker => val requestId = pbRegisterWorker.getRequestId @@ -1234,7 +1236,8 @@ private[celeborn] class Master( needCheckedWorkerList: util.List[WorkerInfo], requestId: String, shouldResponse: Boolean, - clientMetrics: util.Map[String, ClientMetric]): Unit = { + clientMetrics: util.Map[String, ClientMetric], + metricLabels: util.Map[String, String]): Unit = { statusSystem.handleAppHeartbeat( appId, totalWritten, @@ -1245,7 +1248,10 @@ private[celeborn] class Master( applicationFallbackCounts, System.currentTimeMillis(), requestId) - applicationMetricsSource.updateApplicationMetrics(appId, clientMetrics) + applicationMetricsSource.updateApplicationMetrics( + appId, + metricLabels.asScala.toMap, + clientMetrics) gaugeShuffleFallbackCounts() val unknownWorkers = needCheckedWorkerList.asScala.filterNot(w => statusSystem.workersMap.containsKey(w.toUniqueId)).asJava diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala index e8642f78fa1..c1c1af53f09 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala @@ -25,41 +25,206 @@ import org.apache.celeborn.common.metrics.{ClientMetric, MetricType} class ApplicationMetricsSourceSuite extends CelebornFunSuite { - private def metricsOf(app: String, value: Long): JHashMap[String, ClientMetric] = { + private def enabledConf(): CelebornConf = { + val c = new CelebornConf() + c.set(CelebornConf.MASTER_CLIENT_METRICS_ENABLED, true) + c + } + + private def gaugeMetrics(value: Long): JHashMap[String, ClientMetric] = { val map = new JHashMap[String, ClientMetric]() map.put("ClientRegisterShuffleCount", ClientMetric(value, MetricType.Gauge)) map } - test("client metrics are emitted as gauges labeled by applicationId") { + private def counterMetrics(value: Long): JHashMap[String, ClientMetric] = { + val map = new JHashMap[String, ClientMetric]() + map.put("ClientRegisterShuffleCount", ClientMetric(value, MetricType.Counter)) + map + } + + private def update( + source: ApplicationMetricsSource, + metrics: JHashMap[String, ClientMetric], + labels: Map[String, String] = Map.empty, + appId: String = "app-1"): Unit = + source.updateApplicationMetrics(appId, labels, metrics) + + private def gaugeValue( + source: ApplicationMetricsSource, + labels: Map[String, String], + name: String = "ClientRegisterShuffleCount"): Option[Long] = + source.gauges() + .find(g => g.name == name && hasLabels(g.labels, labels)) + .map(_.gauge.getValue.asInstanceOf[Number].longValue()) + + private def counterValue( + source: ApplicationMetricsSource, + labels: Map[String, String], + name: String = "ClientRegisterShuffleCount"): Option[Long] = + source.counters() + .find(c => c.name == name && hasLabels(c.labels, labels)) + .map(_.counter.getCount) + + private def hasLabels( + actual: Map[String, String], + expected: Map[String, String]): Boolean = + expected.forall { case (key, value) => actual.get(key).contains(value) } + + test("masterClientMetrics disabled: updateApplicationMetrics is a no-op") { val source = new ApplicationMetricsSource(new CelebornConf()) - source.updateApplicationMetrics("app-1", metricsOf("app-1", 3)) - source.updateApplicationMetrics("app-2", metricsOf("app-2", 7)) + update(source, gaugeMetrics(5), Map("team" -> "data-eng")) + + assert(source.gauges().isEmpty) + assert(source.counters().isEmpty) + } + + test("no custom labels: metrics are not reported") { + val source = new ApplicationMetricsSource(enabledConf()) + + update(source, gaugeMetrics(3)) + + assert(source.gauges().isEmpty) + assert(source.counters().isEmpty) + } + + test("client labels are used as metric labels") { + val source = new ApplicationMetricsSource(enabledConf()) + + update(source, gaugeMetrics(5), Map("team" -> "data-eng")) val metrics = source.getMetrics - assert(metrics.contains("metrics_ClientRegisterShuffleCount_Value")) - assert(metrics.contains("""applicationId="app-1"""")) - assert(metrics.contains("""applicationId="app-2"""")) - assert(metrics.contains("""role="Master"""")) + assert(metrics.contains("""team="data-eng"""")) } - test("latest reported value is reflected and removed on application lost") { - val source = new ApplicationMetricsSource(new CelebornConf()) + test("gauge is updated to the latest reported value") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") - source.updateApplicationMetrics("app-1", metricsOf("app-1", 1)) - source.updateApplicationMetrics("app-1", metricsOf("app-1", 42)) - assert(source.gauges().exists(g => - g.labels.get("applicationId").contains("app-1") && - g.gauge.getValue.asInstanceOf[Number].longValue() == 42L)) + update(source, gaugeMetrics(1), labels) + update(source, gaugeMetrics(42), labels) + + assert(gaugeValue(source, labels).contains(42L)) + } + + test("gauge for a label set is updated by whichever app heartbeats last") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + + update(source, gaugeMetrics(3), labels, "app-1") + update(source, gaugeMetrics(7), labels, "app-2") + + assert(gaugeValue(source, labels).contains(7L)) + } + + test("counter accumulates deltas from heartbeats") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + + update(source, counterMetrics(10), labels) + update(source, counterMetrics(15), labels) + + assert(counterValue(source, labels).contains(25L)) + } + + test("zero or negative counter delta is ignored") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + + update(source, counterMetrics(10), labels) + update(source, counterMetrics(0), labels) + update(source, counterMetrics(-5), labels) + + assert(counterValue(source, labels).contains(10L)) + } + + test("counters from apps sharing a label set accumulate") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + + update(source, counterMetrics(10), labels, "app-1") + update(source, counterMetrics(5), labels, "app-2") + + assert(counterValue(source, labels).contains(15L)) + } + + test("counter deltas accumulate across sequential heartbeats") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + + update(source, counterMetrics(10), labels) + update(source, counterMetrics(20), labels) + update(source, counterMetrics(35), labels) + + assert(counterValue(source, labels).contains(65L)) + } + + test("counter labels appear in prometheus output") { + val source = new ApplicationMetricsSource(enabledConf()) + + update(source, counterMetrics(10), Map("team" -> "infra")) + + val metrics = source.getMetrics + assert(metrics.contains("""team="infra"""")) + } + + test("mixed gauge and counter in a single heartbeat") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + val map = new JHashMap[String, ClientMetric]() + map.put("ActiveShuffleCount", ClientMetric(3, MetricType.Gauge)) + map.put("RegisterShuffleCount", ClientMetric(10, MetricType.Counter)) + + source.updateApplicationMetrics("app-1", labels, map) + + assert(gaugeValue(source, labels, "ActiveShuffleCount").contains(3L)) + assert(counterValue(source, labels, "RegisterShuffleCount").contains(10L)) + } + + test("removing one app keeps metrics registered while another app still contributes") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + + update(source, gaugeMetrics(3), labels, "app-1") + update(source, gaugeMetrics(7), labels, "app-2") + update(source, counterMetrics(10), labels, "app-1") + update(source, counterMetrics(5), labels, "app-2") source.removeApplicationMetrics("app-1") - assert(!source.gauges().exists(_.labels.get("applicationId").contains("app-1"))) + + assert(gaugeValue(source, labels).contains(7L)) + assert(counterValue(source, labels).contains(15L)) } - test("empty metrics map registers nothing") { - val source = new ApplicationMetricsSource(new CelebornConf()) - source.updateApplicationMetrics("app-1", new JHashMap[String, ClientMetric]()) - assert(source.gauges().isEmpty) + test("removing the last contributing app deregisters tracked metrics") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + + update(source, gaugeMetrics(3), labels, "app-1") + update(source, gaugeMetrics(7), labels, "app-2") + update(source, counterMetrics(10), labels, "app-1") + update(source, counterMetrics(5), labels, "app-2") + + source.removeApplicationMetrics("app-1") + + assert(gaugeValue(source, labels).contains(7L)) + assert(counterValue(source, labels).contains(15L)) + + source.removeApplicationMetrics("app-2") + + assert(gaugeValue(source, labels).isEmpty) + assert(counterValue(source, labels).isEmpty) + } + + test("late heartbeats for removed apps are ignored") { + val source = new ApplicationMetricsSource(enabledConf()) + val labels = Map("team" -> "data-eng") + + update(source, gaugeMetrics(3), labels, "app-1") + source.removeApplicationMetrics("app-1") + update(source, gaugeMetrics(9), labels, "app-1") + + assert(gaugeValue(source, labels).isEmpty) } } From 20706270a00427bc8699d4b65824dc5373578ada Mon Sep 17 00:00:00 2001 From: AmandeepSingh285 Date: Thu, 2 Jul 2026 09:27:45 +0000 Subject: [PATCH 5/6] Update --- docs/configuration/client.md | 2 +- docs/configuration/metrics.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 74b816cc025..6527b1ee842 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -50,7 +50,7 @@ license: | | celeborn.client.flink.resultPartition.supportFloatingBuffer | true | false | Whether to support floating buffer for result partitions. | 0.3.0 | remote-shuffle.job.support-floating-buffer-per-output-gate | | celeborn.client.flink.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use flink built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use flink built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above. | 0.6.0 | | | celeborn.client.inputStream.creation.window | 16 | false | Window size that CelebornShuffleReader pre-creates CelebornInputStreams, for coalesced scenario where multiple Partitions are read | 0.5.1 | | -| celeborn.client.metrics.appLabels | | false | Custom metric labels sent from the client in each application heartbeat and applied to client metrics re-exposed on the master's Prometheus endpoint. Labels' pattern is: `=[,=]*`; e.g. `env=prod,version=1` | 0.7.0 | | +| celeborn.client.metrics.appLabels | | false | Custom metric labels sent from the client in each application heartbeat and applied to client metrics exposed on the master's Prometheus endpoint. Labels' pattern is: `=[,=]*`; e.g. `env=prod,version=1` | 0.7.0 | | | celeborn.client.mr.pushData.max | 32m | false | Max size for a push data sent from mr client. | 0.4.0 | | | celeborn.client.partition.reader.checkpoint.enabled | false | false | Whether or not checkpoint reads when re-creating a partition reader. Setting to true minimizes the amount of unnecessary reads during partition read retries | 0.6.0 | | | celeborn.client.partition.reader.waitLog.threshold | 60s | false | The threshold in milliseconds for logging partition read wait time. Log messages will be generated when wait time exceeds multiples of this threshold. | 0.6.2 | | diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md index 70fe432d190..ddacd586e8d 100644 --- a/docs/configuration/metrics.md +++ b/docs/configuration/metrics.md @@ -19,7 +19,7 @@ license: | | Key | Default | isDynamic | Description | Since | Deprecated | | --- | ------- | --------- | ----------- | ----- | ---------- | -| celeborn.client.metrics.appLabels | | false | Custom metric labels sent from the client in each application heartbeat and applied to client metrics re-exposed on the master's Prometheus endpoint. Labels' pattern is: `=[,=]*`; e.g. `env=prod,version=1` | 0.7.0 | | +| celeborn.client.metrics.appLabels | | false | Custom metric labels sent from the client in each application heartbeat and applied to client metrics exposed on the master's Prometheus endpoint. Labels' pattern is: `=[,=]*`; e.g. `env=prod,version=1` | 0.7.0 | | | celeborn.client.metrics.enabled | false | false | When true, the LifecycleManager collects client-side metrics. Requires `celeborn.metrics.enabled` to also be true. | 0.7.0 | | | celeborn.metrics.capacity | 4096 | false | The maximum number of metrics which a source can use to generate output strings. | 0.2.0 | | | celeborn.metrics.collectPerfCritical.enabled | false | false | It controls whether to collect metrics which may affect performance. When enable, Celeborn collects them. | 0.2.0 | | @@ -30,7 +30,7 @@ license: | | celeborn.metrics.json.pretty.enabled | true | false | When true, view metrics in json pretty format | 0.4.0 | | | celeborn.metrics.loggerSink.output.enabled | false | false | Whether to output scraped metrics to the logger. This config will have effect if you enabled logger sink.If you will not scrape metrics periodically, do add `org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties. | 0.6.0 | | | celeborn.metrics.loggerSink.scrape.interval | 30min | false | The interval of logger sink to scrape its own metrics. This config will have effect if you enabled logger sink. If you will not scrape metrics periodically, do add `org.apache.celeborn.common.metrics.sink.LoggerSink` to metrics.properties. | 0.6.0 | | -| celeborn.metrics.master.clientMetrics.enabled | false | false | When true, the master re-exposes client-side metrics forwarded in application heartbeats on its Prometheus endpoint. | 0.7.0 | | +| celeborn.metrics.master.clientMetrics.enabled | false | false | When true, the master exposes client-side metrics forwarded in application heartbeats on its Prometheus endpoint. | 0.7.0 | | | celeborn.metrics.master.clientMetrics.removedApp.retentionMs | 5min | false | How long to retain removed application IDs in the client metrics source to reject late heartbeats after an application is lost. Entries older than this are periodically evicted. | 0.7.0 | | | celeborn.metrics.prometheus.path | /metrics/prometheus | false | URI context path of prometheus metrics HTTP server. | 0.4.0 | | | celeborn.metrics.sample.rate | 1.0 | false | It controls if Celeborn collect timer metrics for some operations. Its value should be in [0.0, 1.0]. | 0.2.0 | | From fe1aa14acf1126c2ab63cf156784ca8d70fbf9e6 Mon Sep 17 00:00:00 2001 From: AmandeepSingh285 Date: Thu, 2 Jul 2026 12:43:39 +0000 Subject: [PATCH 6/6] Fix failing test --- .../celeborn/client/CelebornClientSourceSuite.scala | 8 +------- .../deploy/master/ApplicationMetricsSourceSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala b/client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala index 5d06e309873..8f764839938 100644 --- a/client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala +++ b/client/src/test/scala/org/apache/celeborn/client/CelebornClientSourceSuite.scala @@ -77,14 +77,8 @@ class CelebornClientSourceSuite extends CelebornFunSuite { val snapshot = source.getMetricsSnapshot() assert(snapshot.contains(CelebornClientSource.REGISTER_SHUFFLE_COUNT)) + assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_COUNT).value == 1) assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_COUNT).metricType == MetricType.Counter) - assert(snapshot.contains(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT)) - assert(snapshot.contains(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT)) - assert(snapshot.contains(CelebornClientSource.REVIVE_REQUEST_COUNT)) - assert(snapshot.contains(CelebornClientSource.REVIVE_FAIL_COUNT)) - assert(snapshot.contains(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT)) - assert(snapshot.contains(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT)) - assert(snapshot.contains(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT)) assert(snapshot(CelebornClientSource.ACTIVE_SHUFFLE_COUNT).value == 7) assert(snapshot(CelebornClientSource.ACTIVE_SHUFFLE_COUNT).metricType == MetricType.Gauge) } diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala index c1c1af53f09..403e6b422f6 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/ApplicationMetricsSourceSuite.scala @@ -33,7 +33,7 @@ class ApplicationMetricsSourceSuite extends CelebornFunSuite { private def gaugeMetrics(value: Long): JHashMap[String, ClientMetric] = { val map = new JHashMap[String, ClientMetric]() - map.put("ClientRegisterShuffleCount", ClientMetric(value, MetricType.Gauge)) + map.put("ClientActiveShuffleCount", ClientMetric(value, MetricType.Gauge)) map } @@ -53,7 +53,7 @@ class ApplicationMetricsSourceSuite extends CelebornFunSuite { private def gaugeValue( source: ApplicationMetricsSource, labels: Map[String, String], - name: String = "ClientRegisterShuffleCount"): Option[Long] = + name: String = "ClientActiveShuffleCount"): Option[Long] = source.gauges() .find(g => g.name == name && hasLabels(g.labels, labels)) .map(_.gauge.getValue.asInstanceOf[Number].longValue())