Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.broker.metrics;

import io.opentelemetry.api.common.AttributeKey;

public class BrokerMetricsConstant {
public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";

Expand Down Expand Up @@ -64,4 +66,21 @@ public class BrokerMetricsConstant {
public static final String LABEL_LANGUAGE = "language";
public static final String LABEL_VERSION = "version";
public static final String LABEL_CONSUME_MODE = "consume_mode";

// Pre-built typed AttributeKey singletons. Use these in AttributesBuilder.put()
// on hot paths to avoid allocating a fresh InternalAttributeKeyImpl per call.
Comment on lines +70 to +71
public static final AttributeKey<String> LABEL_CLUSTER_NAME_KEY = AttributeKey.stringKey(LABEL_CLUSTER_NAME);
public static final AttributeKey<String> LABEL_NODE_TYPE_KEY = AttributeKey.stringKey(LABEL_NODE_TYPE);
public static final AttributeKey<String> LABEL_NODE_ID_KEY = AttributeKey.stringKey(LABEL_NODE_ID);
public static final AttributeKey<String> LABEL_AGGREGATION_KEY = AttributeKey.stringKey(LABEL_AGGREGATION);
public static final AttributeKey<String> LABEL_PROCESSOR_KEY = AttributeKey.stringKey(LABEL_PROCESSOR);
public static final AttributeKey<String> LABEL_TOPIC_KEY = AttributeKey.stringKey(LABEL_TOPIC);
public static final AttributeKey<String> LABEL_INVOCATION_STATUS_KEY = AttributeKey.stringKey(LABEL_INVOCATION_STATUS);
public static final AttributeKey<Boolean> LABEL_IS_RETRY_KEY = AttributeKey.booleanKey(LABEL_IS_RETRY);
public static final AttributeKey<Boolean> LABEL_IS_SYSTEM_KEY = AttributeKey.booleanKey(LABEL_IS_SYSTEM);
public static final AttributeKey<String> LABEL_CONSUMER_GROUP_KEY = AttributeKey.stringKey(LABEL_CONSUMER_GROUP);
public static final AttributeKey<String> LABEL_MESSAGE_TYPE_KEY = AttributeKey.stringKey(LABEL_MESSAGE_TYPE);
public static final AttributeKey<String> LABEL_LANGUAGE_KEY = AttributeKey.stringKey(LABEL_LANGUAGE);
public static final AttributeKey<String> LABEL_VERSION_KEY = AttributeKey.stringKey(LABEL_VERSION);
public static final AttributeKey<String> LABEL_CONSUME_MODE_KEY = AttributeKey.stringKey(LABEL_CONSUME_MODE);
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,19 @@
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_TYPE;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_VERSION;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_VERSION_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.OPEN_TELEMETRY_METER_NAME;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE_KEY;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING;

public class BrokerMetricsManager {
Expand Down Expand Up @@ -197,10 +197,10 @@ public AttributesBuilder newAttributesBuilder() {

private Attributes buildLagAttributes(ConsumerLagCalculator.BaseCalculateResult result) {
AttributesBuilder attributesBuilder = newAttributesBuilder();
attributesBuilder.put(LABEL_CONSUMER_GROUP, result.group);
attributesBuilder.put(LABEL_TOPIC, result.topic);
attributesBuilder.put(LABEL_IS_RETRY, result.isRetry);
attributesBuilder.put(LABEL_IS_SYSTEM, isSystem(result.topic, result.group));
attributesBuilder.put(LABEL_CONSUMER_GROUP_KEY, result.group);
attributesBuilder.put(LABEL_TOPIC_KEY, result.topic);
attributesBuilder.put(LABEL_IS_RETRY_KEY, result.isRetry);
attributesBuilder.put(LABEL_IS_SYSTEM_KEY, isSystem(result.topic, result.group));
return attributesBuilder.build();
}

Expand Down Expand Up @@ -571,18 +571,18 @@ private void initStatsMetrics() {
.setDescription("Request processor watermark")
.ofLongs()
.buildWithCallback(measurement -> {
measurement.record(brokerController.getSendThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "send").build());
measurement.record(brokerController.getAsyncPutThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "async_put").build());
measurement.record(brokerController.getPullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "pull").build());
measurement.record(brokerController.getAckThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "ack").build());
measurement.record(brokerController.getQueryThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "query_message").build());
measurement.record(brokerController.getClientManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "client_manager").build());
measurement.record(brokerController.getHeartbeatThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "heartbeat").build());
measurement.record(brokerController.getLitePullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "lite_pull").build());
measurement.record(brokerController.getEndTransactionThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "transaction").build());
measurement.record(brokerController.getConsumerManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "consumer_manager").build());
measurement.record(brokerController.getAdminBrokerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "admin").build());
measurement.record(brokerController.getReplyThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "reply").build());
measurement.record(brokerController.getSendThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "send").build());
measurement.record(brokerController.getAsyncPutThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "async_put").build());
measurement.record(brokerController.getPullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "pull").build());
measurement.record(brokerController.getAckThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "ack").build());
measurement.record(brokerController.getQueryThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "query_message").build());
measurement.record(brokerController.getClientManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "client_manager").build());
measurement.record(brokerController.getHeartbeatThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "heartbeat").build());
measurement.record(brokerController.getLitePullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "lite_pull").build());
measurement.record(brokerController.getEndTransactionThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "transaction").build());
measurement.record(brokerController.getConsumerManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "consumer_manager").build());
measurement.record(brokerController.getAdminBrokerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "admin").build());
measurement.record(brokerController.getReplyThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "reply").build());
});

brokerPermission = brokerMeter.gaugeBuilder(GAUGE_BROKER_PERMISSION)
Expand Down Expand Up @@ -662,9 +662,9 @@ private void initConnectionMetrics() {
});
metricsMap.forEach((attr, count) -> {
Attributes attributes = newAttributesBuilder()
.put(LABEL_LANGUAGE, attr.language.name().toLowerCase())
.put(LABEL_VERSION, MQVersion.getVersionDesc(attr.version).toLowerCase())
.put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING)
.put(LABEL_LANGUAGE_KEY, attr.language.name().toLowerCase())
.put(LABEL_VERSION_KEY, MQVersion.getVersionDesc(attr.version).toLowerCase())
.put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING)
.build();
measurement.record(count, attributes);
});
Expand All @@ -688,12 +688,12 @@ private void initConnectionMetrics() {
});
metricsMap.forEach((attr, count) -> {
Attributes attributes = newAttributesBuilder()
.put(LABEL_CONSUMER_GROUP, attr.group)
.put(LABEL_LANGUAGE, attr.language.name().toLowerCase())
.put(LABEL_VERSION, MQVersion.getVersionDesc(attr.version).toLowerCase())
.put(LABEL_CONSUME_MODE, attr.consumeMode.getTypeCN().toLowerCase())
.put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING)
.put(LABEL_IS_SYSTEM, isSystemGroup(attr.group))
.put(LABEL_CONSUMER_GROUP_KEY, attr.group)
.put(LABEL_LANGUAGE_KEY, attr.language.name().toLowerCase())
.put(LABEL_VERSION_KEY, MQVersion.getVersionDesc(attr.version).toLowerCase())
.put(LABEL_CONSUME_MODE_KEY, attr.consumeMode.getTypeCN().toLowerCase())
.put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING)
.put(LABEL_IS_SYSTEM_KEY, isSystemGroup(attr.group))
.build();
measurement.record(count, attributes);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP_KEY;
import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC_KEY;
import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL;
import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL;
import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL;
Expand Down Expand Up @@ -181,8 +181,8 @@ public void incPopReviveCkPutCount(PopCheckPoint checkPoint, PutMessageStatus st
public void incPopRevivePutCount(String group, String topic, PopReviveMessageType messageType,
PutMessageStatus status, int num) {
Attributes attributes = this.newAttributesBuilder()
.put(LABEL_CONSUMER_GROUP, group)
.put(LABEL_TOPIC, topic)
.put(LABEL_CONSUMER_GROUP_KEY, group)
.put(LABEL_TOPIC_KEY, topic)
.put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
.put(LABEL_PUT_STATUS, status.name())
.build();
Expand All @@ -201,8 +201,8 @@ public void incPopReviveGetCount(String group, String topic, PopReviveMessageTyp
int num) {
AttributesBuilder builder = this.newAttributesBuilder();
Attributes attributes = builder
.put(LABEL_CONSUMER_GROUP, group)
.put(LABEL_TOPIC, topic)
.put(LABEL_CONSUMER_GROUP_KEY, group)
.put(LABEL_TOPIC_KEY, topic)
.put(LABEL_QUEUE_ID, queueId)
.put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
.build();
Expand All @@ -212,8 +212,8 @@ public void incPopReviveGetCount(String group, String topic, PopReviveMessageTyp
public void incPopReviveRetryMessageCount(PopCheckPoint checkPoint, PutMessageStatus status) {
AttributesBuilder builder = this.newAttributesBuilder();
Attributes attributes = builder
.put(LABEL_CONSUMER_GROUP, checkPoint.getCId())
.put(LABEL_TOPIC, checkPoint.getTopic())
.put(LABEL_CONSUMER_GROUP_KEY, checkPoint.getCId())
.put(LABEL_TOPIC_KEY, checkPoint.getTopic())
.put(LABEL_PUT_STATUS, status.name())
.build();
this.popReviveRetryMessageTotal.add(1, attributes);
Expand Down
2 changes: 2 additions & 0 deletions broker/src/main/resources/rmq.broker.logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

<configuration scan="true" scanPeriod="30 seconds">

<statusListener class="ch.qos.logback.core.status.NopStatusListener"/>

<appender name="DefaultSiftingAppender_inner" class="ch.qos.logback.classic.sift.SiftingAppender">
<discriminator>
<key>brokerContainerLogDir</key>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ public enum TopicMessageType {
MIXED("MIXED");

private final String value;
private final String metricsValue;

TopicMessageType(String value) {
this.value = value;
this.metricsValue = value.toLowerCase(java.util.Locale.ROOT);
}

public static Set<String> topicMessageTypeSet() {
Expand Down Expand Up @@ -67,6 +69,6 @@ public static TopicMessageType parseFromMessageProperty(Map<String, String> mess
}

public String getMetricsValue() {
return value.toLowerCase();
return metricsValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,12 @@ public static CompletableFuture<Void> convertChannelFutureToCompletableFuture(Ch
}

public static String getRequestCodeDesc(int code) {
return REQUEST_CODE_MAP.getOrDefault(code, String.valueOf(code));
String desc = REQUEST_CODE_MAP.get(code);
return desc != null ? desc : String.valueOf(code);
}

public static String getResponseCodeDesc(int code) {
return RESPONSE_CODE_MAP.getOrDefault(code, String.valueOf(code));
String desc = RESPONSE_CODE_MAP.get(code);
return desc != null ? desc : String.valueOf(code);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.remoting.metrics;

import io.opentelemetry.api.common.AttributeKey;

public class RemotingMetricsConstant {
public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency";
public static final String LABEL_PROTOCOL_TYPE = "protocol_type";
Expand All @@ -24,6 +26,14 @@ public class RemotingMetricsConstant {
public static final String LABEL_IS_LONG_POLLING = "is_long_polling";
public static final String LABEL_RESULT = "result";

// Pre-built typed AttributeKey singletons. Use these in AttributesBuilder.put()
// on hot paths to avoid allocating a fresh InternalAttributeKeyImpl per call.
Comment on lines +29 to +30
public static final AttributeKey<String> LABEL_PROTOCOL_TYPE_KEY = AttributeKey.stringKey(LABEL_PROTOCOL_TYPE);
public static final AttributeKey<String> LABEL_REQUEST_CODE_KEY = AttributeKey.stringKey(LABEL_REQUEST_CODE);
public static final AttributeKey<String> LABEL_RESPONSE_CODE_KEY = AttributeKey.stringKey(LABEL_RESPONSE_CODE);
public static final AttributeKey<Boolean> LABEL_IS_LONG_POLLING_KEY = AttributeKey.booleanKey(LABEL_IS_LONG_POLLING);
public static final AttributeKey<String> LABEL_RESULT_KEY = AttributeKey.stringKey(LABEL_RESULT);

public static final String PROTOCOL_TYPE_REMOTING = "remoting";

public static final String RESULT_ONEWAY = "oneway";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
import org.apache.rocketmq.common.metrics.NopLongHistogram;

import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.HISTOGRAM_RPC_LATENCY;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE_KEY;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING;

import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_CANCELED;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_SUCCESS;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;
Expand All @@ -53,7 +54,7 @@ public AttributesBuilder newAttributesBuilder() {
return Attributes.builder();
}
return this.attributesBuilderSupplier.get()
.put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING);
.put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING);
}

public void initMetrics(Meter meter, Supplier<AttributesBuilder> attributesBuilderSupplier) {
Expand Down
Loading