diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java index 4b319f12f6f..e87ce9ad02d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.broker.metrics; +import io.opentelemetry.api.common.AttributeKey; + public class BrokerMetricsConstant { public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter"; @@ -64,4 +66,21 @@ public class BrokerMetricsConstant { public static final String LABEL_LANGUAGE = "language"; public static final String LABEL_VERSION = "version"; public static final String LABEL_CONSUME_MODE = "consume_mode"; + + // Pre-built typed AttributeKey singletons. Use these in AttributesBuilder.put() + // on hot paths to avoid allocating a fresh InternalAttributeKeyImpl per call. + public static final AttributeKey LABEL_CLUSTER_NAME_KEY = AttributeKey.stringKey(LABEL_CLUSTER_NAME); + public static final AttributeKey LABEL_NODE_TYPE_KEY = AttributeKey.stringKey(LABEL_NODE_TYPE); + public static final AttributeKey LABEL_NODE_ID_KEY = AttributeKey.stringKey(LABEL_NODE_ID); + public static final AttributeKey LABEL_AGGREGATION_KEY = AttributeKey.stringKey(LABEL_AGGREGATION); + public static final AttributeKey LABEL_PROCESSOR_KEY = AttributeKey.stringKey(LABEL_PROCESSOR); + public static final AttributeKey LABEL_TOPIC_KEY = AttributeKey.stringKey(LABEL_TOPIC); + public static final AttributeKey LABEL_INVOCATION_STATUS_KEY = AttributeKey.stringKey(LABEL_INVOCATION_STATUS); + public static final AttributeKey LABEL_IS_RETRY_KEY = AttributeKey.booleanKey(LABEL_IS_RETRY); + public static final AttributeKey LABEL_IS_SYSTEM_KEY = AttributeKey.booleanKey(LABEL_IS_SYSTEM); + public static final AttributeKey LABEL_CONSUMER_GROUP_KEY = AttributeKey.stringKey(LABEL_CONSUMER_GROUP); + public static final AttributeKey LABEL_MESSAGE_TYPE_KEY = AttributeKey.stringKey(LABEL_MESSAGE_TYPE); + public static final AttributeKey LABEL_LANGUAGE_KEY = AttributeKey.stringKey(LABEL_LANGUAGE); + public static final AttributeKey LABEL_VERSION_KEY = AttributeKey.stringKey(LABEL_VERSION); + public static final AttributeKey LABEL_CONSUME_MODE_KEY = AttributeKey.stringKey(LABEL_CONSUME_MODE); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index 835e9e98576..78aed06dbb7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -72,6 +72,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -101,19 +102,20 @@ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_MESSAGE_TYPE_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE_KEY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_TYPE; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_VERSION; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_VERSION_KEY; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.OPEN_TELEMETRY_METER_NAME; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE_KEY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING; public class BrokerMetricsManager { @@ -164,6 +166,12 @@ public class BrokerMetricsManager { private LongCounter rollBackMessagesTotal = new NopLongCounter(); private LongHistogram transactionFinishLatency = new NopLongHistogram(); + private final ConcurrentHashMap topicAttributesCache = new ConcurrentHashMap<>(); + private volatile String lastTopicName; + private volatile String lastTopicMsgType; + private volatile boolean lastTopicIsSystem; + private volatile Attributes lastTopicAttributes; + private final RemotingMetricsManager remotingMetricsManager; private final PopMetricsManager popMetricsManager; @@ -195,12 +203,32 @@ public AttributesBuilder newAttributesBuilder() { return attributesBuilder; } + public Attributes getOrBuildTopicAttributes(String topic, String messageType, boolean isSystem) { + Attributes lastAttrs = this.lastTopicAttributes; + if (lastAttrs != null && isSystem == this.lastTopicIsSystem && topic.equals(this.lastTopicName) && messageType.equals(this.lastTopicMsgType)) { + return lastAttrs; + } + String cacheKey = topic + '|' + messageType + '|' + isSystem; + Attributes attrs = topicAttributesCache.computeIfAbsent(cacheKey, k -> + newAttributesBuilder() + .put(LABEL_TOPIC_KEY, topic) + .put(LABEL_MESSAGE_TYPE_KEY, messageType) + .put(LABEL_IS_SYSTEM_KEY, isSystem) + .build() + ); + this.lastTopicName = topic; + this.lastTopicMsgType = messageType; + this.lastTopicIsSystem = isSystem; + this.lastTopicAttributes = attrs; + return attrs; + } + private Attributes buildLagAttributes(ConsumerLagCalculator.BaseCalculateResult result) { AttributesBuilder attributesBuilder = newAttributesBuilder(); - attributesBuilder.put(LABEL_CONSUMER_GROUP, result.group); - attributesBuilder.put(LABEL_TOPIC, result.topic); - attributesBuilder.put(LABEL_IS_RETRY, result.isRetry); - attributesBuilder.put(LABEL_IS_SYSTEM, isSystem(result.topic, result.group)); + attributesBuilder.put(LABEL_CONSUMER_GROUP_KEY, result.group); + attributesBuilder.put(LABEL_TOPIC_KEY, result.topic); + attributesBuilder.put(LABEL_IS_RETRY_KEY, result.isRetry); + attributesBuilder.put(LABEL_IS_SYSTEM_KEY, isSystem(result.topic, result.group)); return attributesBuilder.build(); } @@ -571,18 +599,18 @@ private void initStatsMetrics() { .setDescription("Request processor watermark") .ofLongs() .buildWithCallback(measurement -> { - measurement.record(brokerController.getSendThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "send").build()); - measurement.record(brokerController.getAsyncPutThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "async_put").build()); - measurement.record(brokerController.getPullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "pull").build()); - measurement.record(brokerController.getAckThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "ack").build()); - measurement.record(brokerController.getQueryThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "query_message").build()); - measurement.record(brokerController.getClientManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "client_manager").build()); - measurement.record(brokerController.getHeartbeatThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "heartbeat").build()); - measurement.record(brokerController.getLitePullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "lite_pull").build()); - measurement.record(brokerController.getEndTransactionThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "transaction").build()); - measurement.record(brokerController.getConsumerManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "consumer_manager").build()); - measurement.record(brokerController.getAdminBrokerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "admin").build()); - measurement.record(brokerController.getReplyThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR, "reply").build()); + measurement.record(brokerController.getSendThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "send").build()); + measurement.record(brokerController.getAsyncPutThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "async_put").build()); + measurement.record(brokerController.getPullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "pull").build()); + measurement.record(brokerController.getAckThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "ack").build()); + measurement.record(brokerController.getQueryThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "query_message").build()); + measurement.record(brokerController.getClientManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "client_manager").build()); + measurement.record(brokerController.getHeartbeatThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "heartbeat").build()); + measurement.record(brokerController.getLitePullThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "lite_pull").build()); + measurement.record(brokerController.getEndTransactionThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "transaction").build()); + measurement.record(brokerController.getConsumerManagerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "consumer_manager").build()); + measurement.record(brokerController.getAdminBrokerThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "admin").build()); + measurement.record(brokerController.getReplyThreadPoolQueue().size(), newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "reply").build()); }); brokerPermission = brokerMeter.gaugeBuilder(GAUGE_BROKER_PERMISSION) @@ -662,9 +690,9 @@ private void initConnectionMetrics() { }); metricsMap.forEach((attr, count) -> { Attributes attributes = newAttributesBuilder() - .put(LABEL_LANGUAGE, attr.language.name().toLowerCase()) - .put(LABEL_VERSION, MQVersion.getVersionDesc(attr.version).toLowerCase()) - .put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING) + .put(LABEL_LANGUAGE_KEY, attr.language.name().toLowerCase()) + .put(LABEL_VERSION_KEY, MQVersion.getVersionDesc(attr.version).toLowerCase()) + .put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING) .build(); measurement.record(count, attributes); }); @@ -688,12 +716,12 @@ private void initConnectionMetrics() { }); metricsMap.forEach((attr, count) -> { Attributes attributes = newAttributesBuilder() - .put(LABEL_CONSUMER_GROUP, attr.group) - .put(LABEL_LANGUAGE, attr.language.name().toLowerCase()) - .put(LABEL_VERSION, MQVersion.getVersionDesc(attr.version).toLowerCase()) - .put(LABEL_CONSUME_MODE, attr.consumeMode.getTypeCN().toLowerCase()) - .put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING) - .put(LABEL_IS_SYSTEM, isSystemGroup(attr.group)) + .put(LABEL_CONSUMER_GROUP_KEY, attr.group) + .put(LABEL_LANGUAGE_KEY, attr.language.name().toLowerCase()) + .put(LABEL_VERSION_KEY, MQVersion.getVersionDesc(attr.version).toLowerCase()) + .put(LABEL_CONSUME_MODE_KEY, attr.consumeMode.getTypeCN().toLowerCase()) + .put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING) + .put(LABEL_IS_SYSTEM_KEY, isSystemGroup(attr.group)) .build(); measurement.record(count, attributes); }); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java index 1fb6e892bf6..36a256f2e91 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java @@ -45,8 +45,8 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP; -import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP_KEY; +import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC_KEY; import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL; import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL; import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL; @@ -181,8 +181,8 @@ public void incPopReviveCkPutCount(PopCheckPoint checkPoint, PutMessageStatus st public void incPopRevivePutCount(String group, String topic, PopReviveMessageType messageType, PutMessageStatus status, int num) { Attributes attributes = this.newAttributesBuilder() - .put(LABEL_CONSUMER_GROUP, group) - .put(LABEL_TOPIC, topic) + .put(LABEL_CONSUMER_GROUP_KEY, group) + .put(LABEL_TOPIC_KEY, topic) .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name()) .put(LABEL_PUT_STATUS, status.name()) .build(); @@ -201,8 +201,8 @@ public void incPopReviveGetCount(String group, String topic, PopReviveMessageTyp int num) { AttributesBuilder builder = this.newAttributesBuilder(); Attributes attributes = builder - .put(LABEL_CONSUMER_GROUP, group) - .put(LABEL_TOPIC, topic) + .put(LABEL_CONSUMER_GROUP_KEY, group) + .put(LABEL_TOPIC_KEY, topic) .put(LABEL_QUEUE_ID, queueId) .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name()) .build(); @@ -212,8 +212,8 @@ public void incPopReviveGetCount(String group, String topic, PopReviveMessageTyp public void incPopReviveRetryMessageCount(PopCheckPoint checkPoint, PutMessageStatus status) { AttributesBuilder builder = this.newAttributesBuilder(); Attributes attributes = builder - .put(LABEL_CONSUMER_GROUP, checkPoint.getCId()) - .put(LABEL_TOPIC, checkPoint.getTopic()) + .put(LABEL_CONSUMER_GROUP_KEY, checkPoint.getCId()) + .put(LABEL_TOPIC_KEY, checkPoint.getTopic()) .put(LABEL_PUT_STATUS, status.name()) .build(); this.popReviveRetryMessageTotal.add(1, attributes); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java index f9b3e4c6fa4..db9999b78f4 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.remoting.metrics; +import io.opentelemetry.api.common.AttributeKey; + public class RemotingMetricsConstant { public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency"; public static final String LABEL_PROTOCOL_TYPE = "protocol_type"; @@ -24,6 +26,14 @@ public class RemotingMetricsConstant { public static final String LABEL_IS_LONG_POLLING = "is_long_polling"; public static final String LABEL_RESULT = "result"; + // Pre-built typed AttributeKey singletons. Use these in AttributesBuilder.put() + // on hot paths to avoid allocating a fresh InternalAttributeKeyImpl per call. + public static final AttributeKey LABEL_PROTOCOL_TYPE_KEY = AttributeKey.stringKey(LABEL_PROTOCOL_TYPE); + public static final AttributeKey LABEL_REQUEST_CODE_KEY = AttributeKey.stringKey(LABEL_REQUEST_CODE); + public static final AttributeKey LABEL_RESPONSE_CODE_KEY = AttributeKey.stringKey(LABEL_RESPONSE_CODE); + public static final AttributeKey LABEL_IS_LONG_POLLING_KEY = AttributeKey.booleanKey(LABEL_IS_LONG_POLLING); + public static final AttributeKey LABEL_RESULT_KEY = AttributeKey.stringKey(LABEL_RESULT); + public static final String PROTOCOL_TYPE_REMOTING = "remoting"; public static final String RESULT_ONEWAY = "oneway"; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java index 5da06dcb5ba..bae41d0bf3b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java @@ -30,20 +30,30 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import org.apache.rocketmq.common.Pair; import org.apache.rocketmq.common.metrics.NopLongHistogram; +import org.apache.rocketmq.remoting.common.RemotingHelper; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.HISTOGRAM_RPC_LATENCY; -import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING_KEY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE_KEY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE_KEY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE_KEY; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT_KEY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_CANCELED; +import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_ONEWAY; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_SUCCESS; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED; public class RemotingMetricsManager { private LongHistogram rpcLatency = new NopLongHistogram(); private Supplier attributesBuilderSupplier; + private final ConcurrentHashMap attributesCache = new ConcurrentHashMap<>(); + private volatile long lastAttrsCacheKey; + private volatile Attributes lastCachedAttrs; public RemotingMetricsManager() { } @@ -53,7 +63,7 @@ public AttributesBuilder newAttributesBuilder() { return Attributes.builder(); } return this.attributesBuilderSupplier.get() - .put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING); + .put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING); } public void initMetrics(Meter meter, Supplier attributesBuilderSupplier) { @@ -86,6 +96,44 @@ public List> getMetricsView() { return Lists.newArrayList(new Pair<>(selector, viewBuilder)); } + public Attributes getOrBuildAttributes(int requestCode, int responseCode, + boolean isLongPolling, String result) { + int resultIdx; + if (RESULT_SUCCESS.equals(result)) resultIdx = 0; + else if (RESULT_ONEWAY.equals(result)) resultIdx = 1; + else if (RESULT_WRITE_CHANNEL_FAILED.equals(result)) resultIdx = 2; + else if (RESULT_CANCELED.equals(result)) resultIdx = 3; + else resultIdx = -1; + + if (resultIdx < 0) { + return buildAttributes(requestCode, responseCode, isLongPolling, result); + } + + long key = ((long) requestCode << 19) + | ((long) (responseCode & 0xFFFF) << 3) + | (isLongPolling ? 4L : 0L) + | resultIdx; + Attributes cached = this.lastCachedAttrs; + if (cached != null && key == this.lastAttrsCacheKey) { + return cached; + } + Attributes attrs = attributesCache.computeIfAbsent(key, + k -> buildAttributes(requestCode, responseCode, isLongPolling, result)); + this.lastAttrsCacheKey = key; + this.lastCachedAttrs = attrs; + return attrs; + } + + private Attributes buildAttributes(int requestCode, int responseCode, + boolean isLongPolling, String result) { + return newAttributesBuilder() + .put(LABEL_IS_LONG_POLLING_KEY, isLongPolling) + .put(LABEL_REQUEST_CODE_KEY, RemotingHelper.getRequestCodeDesc(requestCode)) + .put(LABEL_RESPONSE_CODE_KEY, RemotingHelper.getResponseCodeDesc(responseCode)) + .put(LABEL_RESULT_KEY, result) + .build(); + } + public String getWriteAndFlushResult(Future future) { String result = RESULT_SUCCESS; if (future.isCancelled()) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java index c6a97fe441b..45c5c14fc66 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java @@ -30,8 +30,25 @@ @ChannelHandler.Sharable public class RemotingCodeDistributionHandler extends ChannelDuplexHandler { + /** + * Immutable holder for a cached code→adder pair. Published via a single volatile + * write so racing threads always observe a consistent pair (never a mixed code from + * one update with an adder from another). + */ + private static final class CachedCounter { + final int code; + final LongAdder adder; + + CachedCounter(int code, LongAdder adder) { + this.code = code; + this.adder = adder; + } + } + private final ConcurrentMap inboundDistribution; private final ConcurrentMap outboundDistribution; + private volatile CachedCounter lastIn; + private volatile CachedCounter lastOut; public RemotingCodeDistributionHandler() { inboundDistribution = new ConcurrentHashMap<>(); @@ -39,12 +56,24 @@ public RemotingCodeDistributionHandler() { } private void countInbound(int requestCode) { + CachedCounter cached = lastIn; + if (cached != null && cached.code == requestCode) { + cached.adder.increment(); + return; + } LongAdder item = inboundDistribution.computeIfAbsent(requestCode, k -> new LongAdder()); + lastIn = new CachedCounter(requestCode, item); item.increment(); } private void countOutbound(int responseCode) { + CachedCounter cached = lastOut; + if (cached != null && cached.code == responseCode) { + cached.adder.increment(); + return; + } LongAdder item = outboundDistribution.computeIfAbsent(responseCode, k -> new LongAdder()); + lastOut = new CachedCounter(responseCode, item); item.increment(); }