[ISSUE #10515] Fix TransactionMetricsFlushService busy spin bug #10517
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR reduces idle CPU usage in the transaction metrics flush loop and introduces key-string caching to cut down repeated string construction for consumer offsets and long-poll pull request keys.
Changes:
- Prevents a busy-spin loop in TransactionMetricsFlushService by always blocking between flush checks.
- Adds caching of topic@group keys in ConsumerOffsetManager.
- Adds caching of topic@queueId keys in PullRequestHoldService.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java | Blocks the service thread between flush attempts to avoid CPU spinning. |
| broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java | Introduces a cache for topic@group composite keys used across offset operations. |
| broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java | Introduces a cache for topic@queueId composite keys used in pull request holding. |
    long interval = brokerController.getBrokerConfig().getTransactionMetricFlushInterval();
    this.waitForRunning(interval);
    if (System.currentTimeMillis() - start > interval) {
Good point. Changed > to >= to avoid skipping persist when elapsed time equals the interval exactly.
    private String buildTopicGroupKey(String topic, String group) {
        ConcurrentHashMap<String, String> groupMap = topicGroupKeyCache.get(topic);
This file has been removed from the PR per reviewer qianye1001 feedback. Only TransactionMetricsFlushService busy spin fix remains.
    protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    public static final String TOPIC_GROUP_SEPARATOR = "@";

    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> topicGroupKeyCache = new ConcurrentHashMap<>();
This file has been removed from the PR per reviewer feedback.
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    topicGroupKeyCache.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(group, key);
This file has been removed from the PR per reviewer feedback.
    sb.append(TOPIC_QUEUEID_SEPARATOR);
    sb.append(queueId);
    return sb.toString();
    String[] keys = buildKeyCache.get(topic);
This file has been removed from the PR per reviewer feedback.
    int len = Math.max(queueId + 1, 16);
    keys = buildKeyCache.computeIfAbsent(topic, t -> new String[len]);
    if (queueId >= keys.length) {
        String[] grown = new String[queueId + 16];
This file has been removed from the PR per reviewer feedback.
    String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
    if (topic != null && queueId >= 0) {
        int len = Math.max(queueId + 1, 16);
        keys = buildKeyCache.computeIfAbsent(topic, t -> new String[len]);
This file has been removed from the PR per reviewer feedback.
        buildKeyCache.put(topic, grown);
        keys = grown;
    }
    keys[queueId] = key;
This file has been removed from the PR per reviewer feedback.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## develop #10517 +/- ##
=============================================
- Coverage 48.13% 48.08% -0.06%
+ Complexity 13379 13363 -16
=============================================
Files 1377 1377
Lines 100730 100731 +1
Branches 13012 13012
=============================================
- Hits 48491 48435 -56
- Misses 46300 46342 +42
- Partials 5939 5954 +15
RockteMQ-AI left a comment
Review by github-manager-bot
Summary
This PR introduces key-string caching in ConsumerOffsetManager and PullRequestHoldService to reduce repeated string concatenation, and fixes a busy-spin bug in TransactionMetricsFlushService. The busy-spin fix is a clear correctness improvement. The caching optimizations have valid intent but introduce thread-safety and NPE regression risks that need to be addressed before merge.
Findings
- [Critical] PullRequestHoldService.java:63-83: Thread-safety race in String[] cache. The buildKeyCache stores mutable String[] arrays. Two concurrent threads can both see keys == null from computeIfAbsent, each create a different array, and one overwrites the other via put(). Worse, the array growth path (new String[queueId + 16] + System.arraycopy + put) is not atomic: a reader thread can observe a partially-grown array. Since PullRequestHoldService is accessed by multiple pull-request handler threads concurrently, this is a real data race. Recommendation: Replace String[] with ConcurrentHashMap<Integer, String> per topic, or use AtomicReferenceArray with a safe publication pattern.
- [Critical] ConsumerOffsetManager.java:49 and PullRequestHoldService.java:63: NPE regression with null keys. Both ConcurrentHashMap.get() and ConcurrentMap.get() throw NullPointerException on null keys. The original code (topic + "@" + group and StringBuilder.append(topic)) silently produced strings like "null@group". If any caller passes a null topic/group (e.g., during error paths or uninitialized state), this becomes a runtime crash. Recommendation: Add explicit null guards that bypass the cache and fall back to concatenation, or validate at the call boundary.
- [Warning] PullRequestHoldService.java:75: Integer overflow in array sizing. queueId + 16 can overflow for large queueId values, producing a negative array size. While queue IDs are typically small, a malformed request or misconfiguration could trigger NegativeArraySizeException. Recommendation: Cap the array growth at a reasonable maximum (e.g., Math.min(queueId + 16, 1024)) and fall back to a map-based cache beyond that.
- [Warning] ConsumerOffsetManager.java:46: Unbounded cache growth. topicGroupKeyCache retains every (topic, group) pair forever. On brokers serving many ephemeral topics or short-lived consumer groups, this becomes a slow memory leak. Each entry is ~100 bytes (two String objects + CHM node), so 1M unique pairs ≈ 100MB. Recommendation: Consider a bounded cache (Caffeine with max size) or periodic cleanup aligned with topic/group lifecycle.
- [Info] TransactionMetricsFlushService.java:47-52: Busy-spin fix is correct and well-targeted. Moving waitForRunning(interval) before the if check ensures the thread always yields between iterations, eliminating the CPU waste described in the issue. The existing > vs >= timing edge case (noted by Copilot) is minor: flush cadence drift of one interval is acceptable for a metrics flush.
- [Info] ConsumerOffsetManager.java: The 2-level ConcurrentHashMap<String, ConcurrentHashMap<String, String>> design is sound for the common case where topic cardinality is moderate and group cardinality per topic is stable. The computeIfAbsent atomicity guarantee on the outer map is correct. The main concern is the unbounded growth noted above.
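The integer-overflow warning in the findings can be illustrated with a small sketch. The class and method names below (ArraySizingSketch, naiveGrownLength, boundedGrownLength) are hypothetical and are not part of RocketMQ; only the expression queueId + 16 comes from the reviewed code. The bounded variant is one possible guard, assuming the caller falls back to plain concatenation or a map when it returns -1.

```java
/** Hypothetical illustration of the array-sizing overflow; not RocketMQ code. */
final class ArraySizingSketch {

    /** Mirrors the sizing expression quoted in the finding. */
    static int naiveGrownLength(int queueId) {
        // int arithmetic wraps around, so this is negative near Integer.MAX_VALUE
        return queueId + 16;
    }

    /**
     * Assumed guard: returns the grown length, or -1 when the index is negative
     * or would need an array longer than maxLength.
     */
    static int boundedGrownLength(int queueId, int maxLength) {
        if (queueId < 0) {
            return -1;
        }
        // Widen to long before adding so the sum itself cannot overflow.
        long wanted = (long) queueId + 16;
        return wanted <= maxLength ? (int) wanted : -1;
    }
}
```

Note that wrapping an already-overflowed sum in Math.min still yields a negative number, so the addition has to be checked before any cap is applied.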
Suggestions
- Priority fix: Address the NPE regression. This is a behavioral change that can crash brokers on null inputs.
- Priority fix: Replace String[] in PullRequestHoldService with a thread-safe alternative.
- Consider: Adding a cache size cap or using Caffeine for both caches to prevent unbounded growth.
- Nit: The PR title mentions three changes. Consider splitting the TransactionMetricsFlushService busy-spin fix into a separate PR, since it's a standalone correctness fix that doesn't need to wait for the caching discussion.
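For reference, the two priority fixes could be combined roughly as follows. This is a minimal sketch of the reviewer's recommendation (a per-topic ConcurrentHashMap plus a null guard), not code from the PR; the caching files were later removed from the PR entirely. The class name QueueKeyCacheSketch is hypothetical, and the separator value "@" is assumed from the topic@queueId key format.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical thread-safe topic@queueId key cache; not RocketMQ code. */
final class QueueKeyCacheSketch {
    static final String TOPIC_QUEUEID_SEPARATOR = "@";

    // topic -> (queueId -> cached "topic@queueId" string)
    private final ConcurrentMap<String, ConcurrentMap<Integer, String>> cache =
        new ConcurrentHashMap<>();

    String buildKey(String topic, int queueId) {
        // ConcurrentHashMap rejects null keys, so bypass the cache and keep the
        // old concatenation behaviour (e.g. "null@3") for unusual inputs.
        if (topic == null || queueId < 0) {
            return topic + TOPIC_QUEUEID_SEPARATOR + queueId;
        }
        // computeIfAbsent is atomic per key on ConcurrentHashMap, so there is
        // no array to grow and no window in which a reader sees a partial copy.
        return cache
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
            .computeIfAbsent(queueId, q -> topic + TOPIC_QUEUEID_SEPARATOR + q);
    }
}
```

This sketch still grows without bound, so the cache-size concern raised in the findings would apply to it as well.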
Cross-repo Note
No protocol or API changes — this is purely internal broker optimization. No coordination needed with rocketmq-clients.
Automated review by github-manager-bot
I don't recommend fixing 1 and 2, there's no point; just fixing 3 is enough.
0cec077 to 3b695dc
RockteMQ-AI left a comment
Review by github-manager-bot
Summary
Fixes a busy-spin bug in TransactionMetricsFlushService.run() where waitForRunning() was only called inside the if branch, causing the while loop to spin without yielding when the flush interval had not elapsed.
Findings
- [Positive] TransactionMetricsFlushService.java: The fix correctly moves waitForRunning(interval) before the if check, ensuring the thread always sleeps between iterations. This eliminates the ~170 CPU samples observed in JFR on idle brokers.
- [Info] TransactionMetricsFlushService.java:46-48: The local variable interval is now read once per iteration and reused for both waitForRunning() and the comparison. Minor improvement over calling getTransactionMetricFlushInterval() multiple times.
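The difference between the two loop shapes can be sketched in isolation. The code below is a reconstruction from the description in this review, not the actual RocketMQ source: FlushLoopSketch and Stats are hypothetical names, Thread.sleep stands in for ServiceThread.waitForRunning, a counter stands in for persist(), and the loop runs for a fixed duration instead of until the service is stopped.

```java
/** Hypothetical reconstruction of the two loop shapes; not RocketMQ code. */
final class FlushLoopSketch {

    /** How often the loop body ran and how often it flushed. */
    static final class Stats {
        long iterations;
        long flushes;
    }

    // Stand-in for waitForRunning(interval): simply blocks the calling thread.
    private static void waitForRunning(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    /** Shape described as buggy: the wait sits only inside the if branch. */
    static Stats runBuggy(long intervalMillis, long durationMillis) {
        Stats stats = new Stats();
        long deadline = System.currentTimeMillis() + durationMillis;
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() < deadline) {
            stats.iterations++;
            if (System.currentTimeMillis() - start > intervalMillis) {
                start = System.currentTimeMillis();
                stats.flushes++; // persist() in the real service
                waitForRunning(intervalMillis);
            }
            // Interval not elapsed: falls through and loops again without blocking.
        }
        return stats;
    }

    /** Shape after the fix: always block first, then check the elapsed time. */
    static Stats runFixed(long intervalMillis, long durationMillis) {
        Stats stats = new Stats();
        long deadline = System.currentTimeMillis() + durationMillis;
        long start = System.currentTimeMillis();
        while (System.currentTimeMillis() < deadline) {
            stats.iterations++;
            waitForRunning(intervalMillis);
            if (System.currentTimeMillis() - start >= intervalMillis) {
                start = System.currentTimeMillis();
                stats.flushes++; // persist() in the real service
            }
        }
        return stats;
    }
}
```

Running both for the same duration should show the buggy shape executing far more iterations than the fixed one, since every iteration of the fixed loop blocks for the full interval.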
Suggestions
- Consider whether the start timestamp should be updated after waitForRunning() returns rather than after persist(). Currently, if persist() takes significant time, the next comparison may immediately evaluate to true. However, since waitForRunning(interval) already sleeps for the full interval, this is unlikely to cause issues in practice.
Verdict
LGTM. Clean, minimal fix for a real busy-spin bug.
Automated review by github-manager-bot
3b695dc to d59bbe1
RockteMQ-AI left a comment
Review by github-manager-bot (re-review)
Summary
Fixes a busy-spin bug in TransactionMetricsFlushService.run() where waitForRunning() was only called inside the if branch, causing the while loop to spin without yielding when the flush interval had not elapsed (~170 CPU samples in JFR on idle broker).
Findings
- [Positive] TransactionMetricsFlushService.java:44-50: The fix correctly moves waitForRunning(interval) before the if check, ensuring the thread always sleeps between iterations. The interval is now read once into a local variable, avoiding redundant config lookups.
- [Positive] The change from > to >= in the elapsed-time check is a minor improvement: it ensures the flush happens at exactly the interval boundary rather than strictly after it.
- [Info] No test coverage for this specific service thread. Consider adding a unit test that verifies persist() is called at the expected interval and that the thread properly yields during wait periods.
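One way to make the boundary behaviour testable without sleeping is to pull the elapsed-time check into a function that takes timestamps as parameters. The sketch below assumes such a refactoring; FlushDueSketch and its methods are hypothetical names, and the service in this PR performs the comparison inline.

```java
/** Hypothetical extraction of the elapsed-time check; not RocketMQ code. */
final class FlushDueSketch {

    /** Comparison before the change: strictly more than one interval must pass. */
    static boolean dueStrict(long nowMillis, long lastFlushMillis, long intervalMillis) {
        return nowMillis - lastFlushMillis > intervalMillis;
    }

    /** Comparison after the change: a flush is due once a full interval has passed. */
    static boolean dueInclusive(long nowMillis, long lastFlushMillis, long intervalMillis) {
        return nowMillis - lastFlushMillis >= intervalMillis;
    }
}
```

With an arbitrary interval of 3000 ms and a last flush at t = 1000, the two variants differ only at t = 4000: the strict check reports that no flush is due, while the inclusive check reports that one is.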
Verdict
Clean, focused bug fix. LGTM. ✅
Automated review by github-manager-bot
d59bbe1 to 901e77b
LGTM~
Which Issue(s) This PR Fixes
Fixes #10515
Brief Description
Fix a busy-spin bug in TransactionMetricsFlushService.run(). The waitForRunning() call was only inside the if branch, so when the flush interval had not elapsed the while loop spun without yielding, wasting ~170 CPU samples on an idle broker in JFR.
Move waitForRunning(interval) before the if check so the thread always sleeps between iterations.
How Did You Test This Change?