Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.client.MasterClient
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.metrics.ClientMetric
import org.apache.celeborn.common.protocol.PbReviseLostShufflesResponse
import org.apache.celeborn.common.protocol.message.ControlMessages.{ApplicationLost, ApplicationLostResponse, CheckQuotaResponse, HeartbeatFromApplication, HeartbeatFromApplicationResponse, ReviseLostShuffles, ZERO_UUID}
import org.apache.celeborn.common.protocol.message.StatusCode
Expand All @@ -42,10 +43,14 @@ class ApplicationHeartbeater(
(Long, Long, Map[String, java.lang.Long], Map[String, java.lang.Long])),
workerStatusTracker: WorkerStatusTracker,
registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
cancelAllActiveStages: String => Unit) extends Logging {
cancelAllActiveStages: String => Unit,
clientMetrics: () => util.Map[String, ClientMetric] =
() => new util.HashMap[String, ClientMetric]()) extends Logging {

private var stopped = false
private val reviseLostShuffles = conf.reviseLostShufflesEnabled
private val appMetricLabels: util.Map[String, String] =
conf.clientMetricsAppLabels.asJava

// Use independent app heartbeat threads to avoid being blocked by other operations.
private val appHeartbeatIntervalMs = conf.appHeartbeatIntervalMs
Expand Down Expand Up @@ -85,7 +90,10 @@ class ApplicationHeartbeater(
tmpApplicationFallbackCounts.asJava,
workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
ZERO_UUID,
true)
true,
if (appMetricLabels.isEmpty) new util.HashMap[String, ClientMetric]()
else clientMetrics(),
appMetricLabels)
val response = requestHeartbeat(appHeartbeat)
if (response.statusCode == StatusCode.SUCCESS) {
logDebug("Successfully send app heartbeat.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.client

import java.util.concurrent.ConcurrentHashMap

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.{ClientMetric, MetricType}
import org.apache.celeborn.common.metrics.source.{AbstractSource, Role}

/**
* Metrics source for the Celeborn client
*/
class CelebornClientSource(conf: CelebornConf) extends AbstractSource(conf, Role.CLIENT) {
override val sourceName = "client"

import CelebornClientSource._

// Tracks previous counter values so we can send deltas to the master.
private val counterPrev = new ConcurrentHashMap[String, java.lang.Long]()

addCounter(REGISTER_SHUFFLE_COUNT)
addCounter(REGISTER_SHUFFLE_FAIL_COUNT)
addCounter(UNREGISTER_SHUFFLE_COUNT)
addCounter(REVIVE_REQUEST_COUNT)
addCounter(REVIVE_FAIL_COUNT)
addCounter(SLOT_RESERVATION_FAIL_COUNT)
addCounter(SHUFFLE_FETCH_FAILURE_COUNT)
addCounter(SHUFFLE_DATA_LOST_COUNT)

def getMetricsSnapshot(): Map[String, ClientMetric] = {
// Counters: compute delta since last snapshot
val counterMetrics = counters().flatMap { c =>
val current = c.counter.getCount
val prev = Option(counterPrev.put(c.name, current)).map(_.longValue()).getOrElse(0L)
val delta = current - prev
if (delta > 0) Some(c.name -> ClientMetric(delta, MetricType.Counter))
else None
}
// Gauges: send the latest value as-is.
val gaugeMetrics = gauges().map(g =>
g.name -> ClientMetric(g.gauge.getValue.asInstanceOf[Number].longValue(), MetricType.Gauge))
(counterMetrics ++ gaugeMetrics).toMap
}

def start(): Unit = startCleaner()

def stop(): Unit = metricsCleaner.shutdown()
}

object CelebornClientSource {
val EXCLUDED_WORKER_COUNT = "ClientExcludedWorkerCount"
val SHUTTING_WORKER_COUNT = "ClientShuttingWorkerCount"
val ACTIVE_SHUFFLE_COUNT = "ClientActiveShuffleCount"
val REGISTER_SHUFFLE_COUNT = "ClientRegisterShuffleCount"
val REGISTER_SHUFFLE_FAIL_COUNT = "ClientRegisterShuffleFailCount"
val UNREGISTER_SHUFFLE_COUNT = "ClientUnregisterShuffleCount"
val REVIVE_REQUEST_COUNT = "ClientReviveRequestCount"
val REVIVE_FAIL_COUNT = "ClientReviveFailCount"
val SLOT_RESERVATION_FAIL_COUNT = "ClientSlotReservationFailCount"
val SHUFFLE_FETCH_FAILURE_COUNT = "ClientShuffleFetchFailureCount"
val SHUFFLE_DATA_LOST_COUNT = "ClientShuffleDataLostCount"
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,9 @@ class ChangePartitionManager(
None,
lifecycleManager.workerStatusTracker.workerAvailableByLocation(req.oldPartition))))
}
lifecycleManager.incClientMetric(
CelebornClientSource.REVIVE_FAIL_COUNT,
changePartitions.size)
}

val candidates = new util.HashSet[WorkerInfo]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.celeborn.common.client.{ApplicationInfoProvider, MasterClient}
import org.apache.celeborn.common.identity.{IdentityProvider, UserIdentifier}
import org.apache.celeborn.common.internal.Logging
import org.apache.celeborn.common.meta.{ApplicationMeta, ShufflePartitionLocationInfo, WorkerInfo}
import org.apache.celeborn.common.metrics.ClientMetric
import org.apache.celeborn.common.metrics.source.Role
import org.apache.celeborn.common.network.protocol.{SerdeVersion, TransportMessagesHelper}
import org.apache.celeborn.common.network.sasl.registration.RegistrationInfo
Expand Down Expand Up @@ -222,8 +223,26 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}

private val masterClient = new MasterClient(masterRpcEnvInUse, conf, false)
private val clientMetricsEnabled = conf.metricsSystemEnable && conf.clientMetricsEnabled
val clientSource: Option[CelebornClientSource] =
if (clientMetricsEnabled) Some(new CelebornClientSource(conf)) else None

@inline private[client] def incClientMetric(name: String, delta: Long = 1L): Unit =
clientSource.foreach(_.incCounter(name, delta))
val commitManager = new CommitManager(appUniqueId, conf, this)
val workerStatusTracker = new WorkerStatusTracker(conf, this)
clientSource.foreach { source =>
source.addGauge(CelebornClientSource.ACTIVE_SHUFFLE_COUNT) { () =>
registeredShuffle.size
}
source.addGauge(CelebornClientSource.EXCLUDED_WORKER_COUNT) { () =>
workerStatusTracker.excludedWorkers.size
}
source.addGauge(CelebornClientSource.SHUTTING_WORKER_COUNT) { () =>
workerStatusTracker.shuttingWorkers.size
}
source.start()
}
private val heartbeater =
new ApplicationHeartbeater(
appUniqueId,
Expand All @@ -236,7 +255,10 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
},
workerStatusTracker,
registeredShuffle,
reason => cancelAllActiveStages(reason))
reason => cancelAllActiveStages(reason),
() =>
clientSource.map(_.getMetricsSnapshot().asJava)
.getOrElse(new util.HashMap[String, ClientMetric]()))
private def resetFallbackCounts(counts: ConcurrentHashMap[String, java.lang.Long])
: Map[String, java.lang.Long] = {
val fallbackCounts = new util.HashMap[String, java.lang.Long]()
Expand Down Expand Up @@ -727,6 +749,9 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends

// Reply to all RegisterShuffle request for current shuffle id.
def replyRegisterShuffle(response: RegisterShuffleResponse): Unit = {
incClientMetric(
if (response.status == StatusCode.SUCCESS) CelebornClientSource.REGISTER_SHUFFLE_COUNT
else CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT)
registeringShuffleRequest.synchronized {
val serializedMsg: Option[ByteBuffer] = partitionType match {
case PartitionType.REDUCE =>
Expand Down Expand Up @@ -888,9 +913,11 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
serdeVersion: SerdeVersion): Unit = {
val contextWrapper =
ChangeLocationsCallContext(context, partitionIds.size(), serdeVersion)
incClientMetric(CelebornClientSource.REVIVE_REQUEST_COUNT, partitionIds.size())
// If shuffle not registered, reply ShuffleNotRegistered and return
if (!registeredShuffle.contains(shuffleId)) {
logError(s"[handleRevive] shuffle $shuffleId not registered!")
incClientMetric(CelebornClientSource.REVIVE_FAIL_COUNT, partitionIds.size())
contextWrapper.reply(
-1,
StatusCode.SHUFFLE_UNREGISTERED,
Expand All @@ -902,6 +929,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
s"[handleRevive] shuffle $shuffleId, $mapIds, $partitionIds, $oldEpochs, $oldPartitions, $causes")
if (commitManager.isStageEnd(shuffleId)) {
logError(s"[handleRevive] shuffle $shuffleId stage ended!")
incClientMetric(CelebornClientSource.REVIVE_FAIL_COUNT, partitionIds.size())
contextWrapper.reply(
-1,
StatusCode.STAGE_ENDED,
Expand Down Expand Up @@ -1115,6 +1143,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
if (invokeReportTaskShuffleFetchFailurePreCheck(taskId)) {
logInfo(s"handle fetch failure for appShuffleId $appShuffleId shuffleId $shuffleId")
ret = invokeAppShuffleTrackerCallback(appShuffleId)
if (ret) incClientMetric(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT)
shuffleIds.put(appShuffleIdentifier, (shuffleId, false))
} else {
logInfo(
Expand Down Expand Up @@ -1246,6 +1275,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
context.reply(MapperEndResponse(StatusCode.SUCCESS, serdeVersion))
case false =>
logError(s"Failed $message, reply ${StatusCode.SHUFFLE_DATA_LOST}.")
incClientMetric(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT)
context.reply(MapperEndResponse(StatusCode.SHUFFLE_DATA_LOST, serdeVersion))
}
}
Expand Down Expand Up @@ -1611,6 +1641,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
// [[releasePartitionLocation]]. Now in the slots are all the successful partition
// locations.
logWarning(s"Reserve buffers for $shuffleId still fail after retrying, clear buffers.")
incClientMetric(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT)
destroySlotsWithRetry(shuffleId, slots)
} else {
logInfo(s"Reserve buffer success for shuffleId $shuffleId")
Expand Down Expand Up @@ -1830,6 +1861,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
// if unregister shuffle not success, wait next turn
if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) {
unregisterShuffleTime.remove(shuffleId)
incClientMetric(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT)
}
}
} else {
Expand All @@ -1841,6 +1873,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
if (StatusCode.SUCCESS == StatusCode.fromValue(unregisterShuffleResponse.getStatus)) {
shuffleIdsToRemove.foreach { shuffleId: Integer =>
unregisterShuffleTime.remove(shuffleId)
incClientMetric(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT)
}
}
}
Expand Down Expand Up @@ -2050,6 +2083,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
*/
override def stop(): Unit = {
heartbeater.stop()
clientSource.foreach(_.stop())
super.stop()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import com.google.common.base.Preconditions.checkState
import com.google.common.cache.{Cache, CacheBuilder}
import com.google.common.collect.Sets

import org.apache.celeborn.client.{ClientUtils, LifecycleManager, ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.{CelebornClientSource, ClientUtils, LifecycleManager, ShuffleCommittedInfo, WorkerStatusTracker}
import org.apache.celeborn.client.CommitManager.CommittedPartitionInfo
import org.apache.celeborn.client.LifecycleManager.{ShuffleAllocatedWorkers, ShuffleFailedWorkers}
import org.apache.celeborn.common.{CelebornConf, CommitMetadata}
Expand Down Expand Up @@ -222,6 +222,7 @@ class ReducePartitionCommitHandler(
} else {
logError(s"Failed to handle stageEnd for $shuffleId, lost file!")
dataLostShuffleSet.add(shuffleId)
lifecycleManager.incClientMetric(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT)
// record in stageEndShuffleSet
setStageEnd(shuffleId)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.client

import java.util.concurrent.atomic.AtomicInteger

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.metrics.MetricType

class CelebornClientSourceSuite extends CelebornFunSuite {

test("counters are declared, increment, and emit with role=Client") {
val source = new CelebornClientSource(new CelebornConf())

source.incCounter(CelebornClientSource.REGISTER_SHUFFLE_COUNT)
source.incCounter(CelebornClientSource.REGISTER_SHUFFLE_COUNT)
source.incCounter(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT)
source.incCounter(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT, 3)
source.incCounter(CelebornClientSource.REVIVE_REQUEST_COUNT, 5)
source.incCounter(CelebornClientSource.REVIVE_FAIL_COUNT, 2)
source.incCounter(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT)
source.incCounter(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT)
source.incCounter(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT)

val metrics = source.getMetrics
assert(metrics.contains("""metrics_ClientRegisterShuffleCount_Count"""))
assert(metrics.contains("""role="Client""""))

val snapshot = source.getMetricsSnapshot()
assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_COUNT).value == 2)
assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_COUNT).metricType == MetricType.Counter)
assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT).value == 1)
assert(snapshot(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT).value == 3)
assert(snapshot(CelebornClientSource.REVIVE_REQUEST_COUNT).value == 5)
assert(snapshot(CelebornClientSource.REVIVE_FAIL_COUNT).value == 2)
assert(snapshot(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT).value == 1)
assert(snapshot(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT).value == 1)
assert(snapshot(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT).value == 1)
}

test("gauges registered on the source are reflected in metrics and snapshot") {
val source = new CelebornClientSource(new CelebornConf())
val excluded = new AtomicInteger(0)

source.addGauge(CelebornClientSource.EXCLUDED_WORKER_COUNT) { () => excluded.get() }

assert(source.getMetricsSnapshot()(CelebornClientSource.EXCLUDED_WORKER_COUNT).value == 0)

excluded.set(5)
val metrics = source.getMetrics
assert(metrics.contains("metrics_ClientExcludedWorkerCount_Value"))
val snapshot = source.getMetricsSnapshot()
assert(snapshot(CelebornClientSource.EXCLUDED_WORKER_COUNT).value == 5)
assert(snapshot(CelebornClientSource.EXCLUDED_WORKER_COUNT).metricType == MetricType.Gauge)
}

test("getMetricsSnapshot includes both counters and gauges with correct types") {
val source = new CelebornClientSource(new CelebornConf())
source.addGauge(CelebornClientSource.ACTIVE_SHUFFLE_COUNT) { () => 7 }
source.incCounter(CelebornClientSource.REGISTER_SHUFFLE_COUNT)

val snapshot = source.getMetricsSnapshot()
assert(snapshot.contains(CelebornClientSource.REGISTER_SHUFFLE_COUNT))
assert(snapshot(CelebornClientSource.REGISTER_SHUFFLE_COUNT).metricType == MetricType.Counter)
assert(snapshot.contains(CelebornClientSource.REGISTER_SHUFFLE_FAIL_COUNT))
assert(snapshot.contains(CelebornClientSource.UNREGISTER_SHUFFLE_COUNT))
assert(snapshot.contains(CelebornClientSource.REVIVE_REQUEST_COUNT))
assert(snapshot.contains(CelebornClientSource.REVIVE_FAIL_COUNT))
assert(snapshot.contains(CelebornClientSource.SLOT_RESERVATION_FAIL_COUNT))
assert(snapshot.contains(CelebornClientSource.SHUFFLE_FETCH_FAILURE_COUNT))
assert(snapshot.contains(CelebornClientSource.SHUFFLE_DATA_LOST_COUNT))
assert(snapshot(CelebornClientSource.ACTIVE_SHUFFLE_COUNT).value == 7)
assert(snapshot(CelebornClientSource.ACTIVE_SHUFFLE_COUNT).metricType == MetricType.Gauge)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
import org.junit.Assert

import org.apache.celeborn.CelebornFunSuite
import org.apache.celeborn.client.LifecycleManager.ShuffleFailedWorkers
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{CLIENT_EXCLUDED_WORKER_EXPIRE_TIMEOUT, CLIENT_SHUFFLE_DYNAMIC_RESOURCE_ENABLED}
import org.apache.celeborn.common.meta.WorkerInfo
Expand Down Expand Up @@ -159,6 +160,31 @@ class WorkerStatusTrackerSuite extends CelebornFunSuite {
errors.get())
}

test("recordWorkerFailure increments client worker-excluded counter and gauge") {
val celebornConf = new CelebornConf()
celebornConf.set(CelebornConf.METRICS_ENABLED.key, "true")
celebornConf.set(CelebornConf.CLIENT_METRICS_ENABLED.key, "true")
val lifecycleManager = new LifecycleManager("app-metrics-test", celebornConf)
try {
val statusTracker = lifecycleManager.workerStatusTracker
val source = lifecycleManager.clientSource.get

val failed = new ShuffleFailedWorkers()
val now = System.currentTimeMillis()
failed.put(mock("host1"), (StatusCode.WORKER_UNRESPONSIVE, now))
failed.put(mock("host2"), (StatusCode.WORKER_UNRESPONSIVE, now))
statusTracker.recordWorkerFailure(failed)

val snapshot = source.getMetricsSnapshot()
Assert.assertEquals(2L, snapshot(CelebornClientSource.EXCLUDED_WORKER_COUNT).value)

// re-recording already-excluded workers does not change the gauge
statusTracker.recordWorkerFailure(failed)
} finally {
lifecycleManager.stop()
}
}

private def buildResponse(
excludedWorkerHosts: Array[String],
unknownWorkerHosts: Array[String],
Expand Down
Loading
Loading