From a1709d15b5b7679d9ebcaf1b3582353a5e2d8f13 Mon Sep 17 00:00:00 2001 From: afterincomparableyum <224495379+afterincomparableyum@users.noreply.github.com> Date: Sat, 13 Jun 2026 10:42:01 -0500 Subject: [PATCH] [CELEBORN-2359] Deduplicate # HELP/# TYPE lines in PrometheusServlet output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AbstractSource emits a `# HELP` and `# TYPE` line for every metric sample. When a metric family has multiple label sets — like ActiveConnectionCount (per application), RequestSlotsFailedCount (per status code), per-user/app resource consumption, or FlushWorkingQueueSize (per mountpoint) — the same family is exported with repeated `# HELP` and `# TYPE` lines. This violates the Prometheus exposition format. The fix is to override PrometheusServlet.getMetricsSnapshot to keep only the first `# HELP` and `# TYPE` line per metric family while preserving all sample lines, producing valid Prometheus output. I also added a PrometheusServletSuite covering a multi-label-set counter and a distinct gauge. --- .../metrics/sink/PrometheusServlet.scala | 19 +++++ .../metrics/sink/PrometheusServletSuite.scala | 69 +++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 service/src/test/scala/org/apache/celeborn/common/metrics/sink/PrometheusServletSuite.scala diff --git a/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala index 27797c8fc98..40eace3b7bf 100644 --- a/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala +++ b/service/src/main/scala/org/apache/celeborn/common/metrics/sink/PrometheusServlet.scala @@ -37,4 +37,23 @@ class PrometheusServlet( servletPath, new ServletParams(_ => getMetricsSnapshot, "text/plain")) } + + override def getMetricsSnapshot: String = { + val raw = super.getMetricsSnapshot + val seen = scala.collection.mutable.HashSet[String]() + val header = "^#\\s+(TYPE|HELP)\\s+(\\S+)\\b.*$".r + val out = new StringBuilder(raw.length) + + raw.linesIterator.foreach { + case line @ header(kind, metric) => + val key = s"$kind\t$metric" + if (!seen.contains(key)) { + out.append(line).append('\n') + seen += key + } + case line => + out.append(line).append('\n') + } + out.result() + } } diff --git a/service/src/test/scala/org/apache/celeborn/common/metrics/sink/PrometheusServletSuite.scala b/service/src/test/scala/org/apache/celeborn/common/metrics/sink/PrometheusServletSuite.scala new file mode 100644 index 00000000000..8ed7da149d8 --- /dev/null +++ b/service/src/test/scala/org/apache/celeborn/common/metrics/sink/PrometheusServletSuite.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.celeborn.common.metrics.sink + +import java.util.Properties + +import com.codahale.metrics.MetricRegistry + +import org.apache.celeborn.CelebornFunSuite +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.metrics.source.{AbstractSource, Role} + +class PrometheusServletSuite extends CelebornFunSuite { + + private def newServlet(source: AbstractSource): PrometheusServlet = { + new PrometheusServlet( + new Properties(), + new MetricRegistry(), + Seq(source), + "/metrics/prometheus") + } + + test("getMetricsSnapshot deduplicates # HELP and # TYPE lines per metric family") { + val conf = new CelebornConf() + val source = new AbstractSource(conf, Role.WORKER) { + override def sourceName: String = "mockSource" + } + + val user1 = Map("user" -> "user1") + val user2 = Map("user" -> "user2") + source.addCounter("Counter", user1) + source.addCounter("Counter", user2) + source.incCounter("Counter", 1, user1) + source.incCounter("Counter", 2, user2) + + source.addGauge("Gauge1") { () => 1000 } + + val snapshot = newServlet(source).getMetricsSnapshot + val lines = snapshot.linesIterator.toList + + val counterName = "metrics_Counter_Count" + val gaugeName = "metrics_Gauge1_Value" + + assert(lines.count(_ == s"# HELP $counterName") == 1) + assert(lines.count(_ == s"# TYPE $counterName counter") == 1) + assert(lines.exists(_ == s"# TYPE $counterName counter")) + assert(lines.count(_.startsWith(s"$counterName{")) == 2) + assert(snapshot.contains("""user="user1"""")) + assert(snapshot.contains("""user="user2"""")) + assert(lines.count(_ == s"# HELP $gaugeName") == 1) + assert(lines.count(_ == s"# TYPE $gaugeName gauge") == 1) + assert(lines.exists(_.startsWith(s"$gaugeName")) && snapshot.contains("1000")) + } +}