Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
ef8cee4
feat(metadata): add internal RWMutex for MetadataInfo and fix json se…
jieguo-coder Jun 3, 2026
605d0b2
feat(metadata): add lock protection for global registry and instances…
jieguo-coder Jun 3, 2026
abf50a7
feat(metadata): fix concurrency safety in listener callbacks and exte…
jieguo-coder Jun 3, 2026
8bc0aaa
style: format import blocks using dubbo-go imports-formatter
jieguo-coder Jun 3, 2026
ea83f92
style: format metadata.go and report_instance.go to pass CI
jieguo-coder Jun 3, 2026
0048c16
fix(metadata): resolve data races pointed out in code review
jieguo-coder Jun 4, 2026
e65f694
test: fix failing tests and improve unit test coverage
jieguo-coder Jun 4, 2026
a7bad03
test: fix testifylint error by avoiding require in goroutines
jieguo-coder Jun 4, 2026
3cf2c5a
fix(metadata): deep copy ServiceInfo to prevent write-on-read data ra…
jieguo-coder Jun 7, 2026
9bf63a0
chore: trigger CI re-run for codecov gpg issue
jieguo-coder Jun 7, 2026
21cedcc
chore: resolve merge conflicts with develop and fix code formatting
jieguo-coder Jun 7, 2026
1bd679e
refactor(metadata): implement no-lock helper to fix data race and avo…
jieguo-coder Jun 11, 2026
c77d53a
chore: resolve merge conflict with #3369, integrating deterministic f…
jieguo-coder Jun 11, 2026
f8d3248
test: fix compilation error in instance changed listener tests by add…
jieguo-coder Jun 11, 2026
d3f571d
fix(metadata): add RLock around global map lookup in RemoveService an…
jieguo-coder Jun 13, 2026
3bf11e4
chore: resolve merge conflicts with #3370, keeping DeepCopy for concu…
jieguo-coder Jun 13, 2026
11cc932
chore: resolve test file conflict by keeping both listener and cache …
jieguo-coder Jun 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions metadata/info/metadata_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
)

import (
Expand Down Expand Up @@ -63,6 +64,7 @@ type MetadataInfo struct {
Services map[string]*ServiceInfo `json:"services,omitempty" hessian:"services"`
exportedServiceURLs map[string][]*common.URL `hessian:"-"` // server exported service urls
subscribedServiceURLs map[string][]*common.URL `hessian:"-"` // client subscribed service urls
mu sync.RWMutex `json:"-" hessian:"-"`
}

func NewAppMetadataInfo(app string) *MetadataInfo {
Expand Down Expand Up @@ -95,6 +97,9 @@ func (info *MetadataInfo) JavaClassName() string {

// AddService add provider service info to MetadataInfo
func (info *MetadataInfo) AddService(url *common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

service := NewServiceInfoWithURL(url)
info.Services[service.GetMatchKey()] = service
addUrl(info.exportedServiceURLs, url)
Expand Down Expand Up @@ -131,22 +136,34 @@ func deleteItem(slice []*common.URL, index int) []*common.URL {
}

func (info *MetadataInfo) RemoveService(url *common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

service := NewServiceInfoWithURL(url)
delete(info.Services, service.GetMatchKey())
removeUrl(info.exportedServiceURLs, url)
}

// AddSubscribeURL client subscribe a service url
func (info *MetadataInfo) AddSubscribeURL(url *common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

addUrl(info.subscribedServiceURLs, url)
}

// RemoveSubscribeURL client unsubscribe a service url
func (info *MetadataInfo) RemoveSubscribeURL(url *common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

removeUrl(info.subscribedServiceURLs, url)
}

func (info *MetadataInfo) GetExportedServiceURLs() []*common.URL {
info.mu.RLock()
defer info.mu.RUnlock()

res := make([]*common.URL, 0)
for _, urls := range info.exportedServiceURLs {
res = append(res, urls...)
Expand All @@ -155,13 +172,28 @@ func (info *MetadataInfo) GetExportedServiceURLs() []*common.URL {
}

func (info *MetadataInfo) GetSubscribedURLs() []*common.URL {
info.mu.RLock()
defer info.mu.RUnlock()

res := make([]*common.URL, 0)
for _, urls := range info.subscribedServiceURLs {
res = append(res, urls...)
}
return res
}

// GetServices returns a copy of the Services map for safe iteration by external callers.
func (info *MetadataInfo) GetServices() map[string]*ServiceInfo {
info.mu.RLock()
defer info.mu.RUnlock()

cp := make(map[string]*ServiceInfo, len(info.Services))
for k, v := range info.Services {
cp[k] = v
}
return cp
}

// ServiceInfo the information of service
type ServiceInfo struct {
Name string `json:"name,omitempty" hessian:"name"`
Expand Down
15 changes: 14 additions & 1 deletion metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
// Package metadata collects and exposes information of all services for service discovery purpose.
package metadata

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metadata/info"
)

var (
registryMetadataInfo = make(map[string]*info.MetadataInfo)
registryMetadataInfo = make(map[string]*info.MetadataInfo)
registryMetadataLock sync.RWMutex
metadataService MetadataService = &DefaultMetadataService{metadataMap: registryMetadataInfo}
)

Expand All @@ -34,25 +39,33 @@ func GetMetadataService() MetadataService {
}

func GetMetadataInfo(registryId string) *info.MetadataInfo {
registryMetadataLock.RLock()
defer registryMetadataLock.RUnlock()
return registryMetadataInfo[registryId]
}

func AddService(registryId string, url *common.URL) {
registryMetadataLock.Lock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] 这里仍然裸读 registryMetadataInfo。这个 PR 让 AddService/AddSubscribeURL/GetMetadataInfo 通过 registryMetadataLock 保护同一个全局 map,但 RemoveService/RemoveSubscribeURL 还在没有 RLock 的情况下做 map lookup;当服务导出/订阅与注销并发发生时,Add 分支可能正在写入 registryMetadataInfo,Remove 分支同时读会继续触发 concurrent map read/write。这里需要先在 registryMetadataLock 下取出 metadataInfo,再释放全局锁后调用 metadataInfo.RemoveService,和 AddService 的锁粒度保持一致。

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我已经更新了 metadata/metadata.go 中的 RemoveServiceRemoveSubscribeURL。现在,它们会获取 registryMetadataLock.RLock(),执行映射查找,并在调用实例自身的方法之前立即释放 RUnlock()

全局锁粒度最小化,并与 AddService 完全对称,在不引入任何死锁的情况下解决并发读写风险

if _, exist := registryMetadataInfo[registryId]; !exist {
registryMetadataInfo[registryId] = info.NewMetadataInfo(
url.GetParam(constant.ApplicationKey, ""),
url.GetParam(constant.ApplicationTagKey, ""),
)
}
registryMetadataLock.Unlock()

registryMetadataInfo[registryId].AddService(url)
}

func AddSubscribeURL(registryId string, url *common.URL) {
registryMetadataLock.Lock()
if _, exist := registryMetadataInfo[registryId]; !exist {
registryMetadataInfo[registryId] = info.NewMetadataInfo(
url.GetParam(constant.ApplicationKey, ""),
url.GetParam(constant.ApplicationTagKey, ""),
)
}
registryMetadataLock.Unlock()

registryMetadataInfo[registryId].AddSubscribeURL(url)
}
Comment thread
Alanxtl marked this conversation as resolved.
2 changes: 1 addition & 1 deletion metadata/metadata_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (mtsV2 *MetadataServiceV2) GetMetadataInfo(ctx context.Context, req *triple
return &tripleapi.MetadataInfoV2{
App: metadataInfo.App,
Version: metadataInfo.Revision,
Services: convertV2(metadataInfo.Services),
Services: convertV2(metadataInfo.GetServices()),
}, err
}

Expand Down
4 changes: 2 additions & 2 deletions metadata/report/nacos/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func newNacosMetadataReport(f fields) *nacosMetadataReport {
}

func Test_nacosMetadataReport_GetAppMetadata(t *testing.T) {
mi := info.MetadataInfo{
mi := &info.MetadataInfo{
App: "GetAppMetadata",
}
data, _ := json.Marshal(mi)
Expand All @@ -198,7 +198,7 @@ func Test_nacosMetadataReport_GetAppMetadata(t *testing.T) {
application: "dubbo",
revision: "revision",
},
want: &mi,
want: mi,
wantErr: false,
},
}
Expand Down
18 changes: 16 additions & 2 deletions metadata/report_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package metadata

import (
"sync"
"time"
)

Expand All @@ -38,7 +39,8 @@ import (
)

var (
instances = make(map[string]report.MetadataReport)
instances = make(map[string]report.MetadataReport)
instancesMu sync.RWMutex
)

func addMetadataReport(registryId string, url *common.URL) error {
Expand All @@ -47,11 +49,19 @@ func addMetadataReport(registryId string, url *common.URL) error {
logger.Warnf("no metadata report factory of protocol %s found, please check if the metadata report factory is imported", url.Protocol)
return nil
}
instancesMu.Lock()
instances[registryId] = &DelegateMetadataReport{instance: fac.CreateMetadataReport(url)}
instancesMu.Unlock()
Comment on lines +60 to +62
return nil
}

func GetMetadataReport() report.MetadataReport {
instancesMu.RLock()
defer instancesMu.RUnlock()
return getMetadataReportUnsafe()
}

func getMetadataReportUnsafe() report.MetadataReport {
for _, v := range instances {
return v
}
Expand All @@ -62,13 +72,17 @@ func GetMetadataReportByRegistry(registry string) report.MetadataReport {
if len(registry) == 0 {
registry = constant.DefaultKey
}
instancesMu.RLock()
defer instancesMu.RUnlock()
if r, ok := instances[registry]; ok {
return r
}
return GetMetadataReport()
return getMetadataReportUnsafe()
}

func GetMetadataReports() []report.MetadataReport {
instancesMu.RLock()
defer instancesMu.RUnlock()
reports := make([]report.MetadataReport, len(instances))
index := 0
for _, r := range instances {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error
continue
}
instance.SetServiceMetadata(metadataInfo)
for _, service := range metadataInfo.Services {
for _, service := range metadataInfo.GetServices() {
matchKey := service.GetMatchKey()
if serviceToRevisionServices[matchKey] == nil {
serviceToRevisionServices[matchKey] = make(map[string]*info.ServiceInfo)
Expand Down Expand Up @@ -203,8 +203,11 @@ func toInstanceServiceURLs(instance registry.ServiceInstance, serviceInfo *info.

// AddListenerAndNotify add notify listener and notify to listen service event
func (lstn *ServiceInstancesChangedListenerImpl) AddListenerAndNotify(serviceKey string, notify registry.NotifyListener) {
lstn.mutex.Lock()
lstn.listeners[serviceKey] = notify
urls := lstn.serviceUrls[serviceKey]
lstn.mutex.Unlock()

for _, url := range urls {
notify.Notify(&registry.ServiceEvent{
Action: remoting.EventTypeAdd,
Expand All @@ -215,7 +218,9 @@ func (lstn *ServiceInstancesChangedListenerImpl) AddListenerAndNotify(serviceKey

// RemoveListener remove notify listener
func (lstn *ServiceInstancesChangedListenerImpl) RemoveListener(serviceKey string) {
lstn.mutex.Lock()
delete(lstn.listeners, serviceKey)
lstn.mutex.Unlock()
Comment thread
Alanxtl marked this conversation as resolved.
Outdated
}

// GetServiceNames return all listener service names
Expand Down
Loading