Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
ef8cee4
feat(metadata): add internal RWMutex for MetadataInfo and fix json se…
jieguo-coder Jun 3, 2026
605d0b2
feat(metadata): add lock protection for global registry and instances…
jieguo-coder Jun 3, 2026
abf50a7
feat(metadata): fix concurrency safety in listener callbacks and exte…
jieguo-coder Jun 3, 2026
8bc0aaa
style: format import blocks using dubbo-go imports-formatter
jieguo-coder Jun 3, 2026
ea83f92
style: format metadata.go and report_instance.go to pass CI
jieguo-coder Jun 3, 2026
0048c16
fix(metadata): resolve data races pointed out in code review
jieguo-coder Jun 4, 2026
e65f694
test: fix failing tests and improve unit test coverage
jieguo-coder Jun 4, 2026
a7bad03
test: fix testifylint error by avoiding require in goroutines
jieguo-coder Jun 4, 2026
3cf2c5a
fix(metadata): deep copy ServiceInfo to prevent write-on-read data ra…
jieguo-coder Jun 7, 2026
9bf63a0
chore: trigger CI re-run for codecov gpg issue
jieguo-coder Jun 7, 2026
21cedcc
chore: resolve merge conflicts with develop and fix code formatting
jieguo-coder Jun 7, 2026
1bd679e
refactor(metadata): implement no-lock helper to fix data race and avo…
jieguo-coder Jun 11, 2026
c77d53a
chore: resolve merge conflict with #3369, integrating deterministic f…
jieguo-coder Jun 11, 2026
f8d3248
test: fix compilation error in instance changed listener tests by add…
jieguo-coder Jun 11, 2026
d3f571d
fix(metadata): add RLock around global map lookup in RemoveService an…
jieguo-coder Jun 13, 2026
3bf11e4
chore: resolve merge conflicts with #3370, keeping DeepCopy for concu…
jieguo-coder Jun 13, 2026
11cc932
chore: resolve test file conflict by keeping both listener and cache …
jieguo-coder Jun 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion metadata/info/metadata_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net/url"
"strconv"
"strings"
"sync"
)

import (
Expand Down Expand Up @@ -63,6 +64,7 @@ type MetadataInfo struct {
Services map[string]*ServiceInfo `json:"services,omitempty" hessian:"services"`
exportedServiceURLs map[string][]*common.URL `hessian:"-"` // server exported service urls
subscribedServiceURLs map[string][]*common.URL `hessian:"-"` // client subscribed service urls
mu sync.RWMutex `json:"-" hessian:"-"`
}

func NewAppMetadataInfo(app string) *MetadataInfo {
Expand Down Expand Up @@ -95,6 +97,15 @@ func (info *MetadataInfo) JavaClassName() string {

// AddService add provider service info to MetadataInfo
func (info *MetadataInfo) AddService(url *common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

info.addServiceWithoutLock(url)
}

// addServiceWithoutLock adds a service URL without acquiring the lock.
// The caller must hold info.mu.Lock() before calling this method.
func (info *MetadataInfo) addServiceWithoutLock(url *common.URL) {
service := NewServiceInfoWithURL(url)
info.Services[service.GetMatchKey()] = service
addUrl(info.exportedServiceURLs, url)
Expand Down Expand Up @@ -131,6 +142,9 @@ func deleteItem(slice []*common.URL, index int) []*common.URL {
}

func (info *MetadataInfo) RemoveService(url *common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

service := NewServiceInfoWithURL(url)
removeUrl(info.exportedServiceURLs, url)
if replacement := info.findExportedServiceURL(service.GetMatchKey()); replacement != nil {
Expand All @@ -142,15 +156,24 @@ func (info *MetadataInfo) RemoveService(url *common.URL) {

// AddSubscribeURL client subscribe a service url
func (info *MetadataInfo) AddSubscribeURL(url *common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

addUrl(info.subscribedServiceURLs, url)
}

// RemoveSubscribeURL client unsubscribe a service url
func (info *MetadataInfo) RemoveSubscribeURL(url *common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

removeUrl(info.subscribedServiceURLs, url)
}

func (info *MetadataInfo) GetExportedServiceURLs() []*common.URL {
info.mu.RLock()
defer info.mu.RUnlock()

res := make([]*common.URL, 0)
for _, urls := range info.exportedServiceURLs {
res = append(res, urls...)
Expand All @@ -159,18 +182,37 @@ func (info *MetadataInfo) GetExportedServiceURLs() []*common.URL {
}

func (info *MetadataInfo) GetSubscribedURLs() []*common.URL {
info.mu.RLock()
defer info.mu.RUnlock()

res := make([]*common.URL, 0)
for _, urls := range info.subscribedServiceURLs {
res = append(res, urls...)
}
return res
}

// GetServices returns a deep copy of the Services map for safe iteration by external callers.
// Each ServiceInfo is fully copied with lazy fields eagerly populated to prevent write-on-read races.
func (info *MetadataInfo) GetServices() map[string]*ServiceInfo {
info.mu.Lock()
defer info.mu.Unlock()

cp := make(map[string]*ServiceInfo, len(info.Services))
for k, v := range info.Services {
cp[k] = v.DeepCopy()
}
return cp
}

func (info *MetadataInfo) ReplaceExportedServices(urls []*common.URL) {
info.mu.Lock()
defer info.mu.Unlock()

info.Services = make(map[string]*ServiceInfo)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] 这里仍然绕过 MetadataInfo.mu 直接重置 ServicesexportedServiceURLs。本 PR 已经让 AddServiceRemoveServiceGetServicesGetExportedServiceURLs 通过同一个 mutex 保护这些字段,但 ReplaceExportedServicesservice_discovery_registry.go 调用时会无锁写 map;如果同时有 metadata 读取或实例变更处理在调用 GetServices/GetExportedServiceURLs,仍然可能触发 data race 或读到半重建状态。这里需要在函数入口持有写锁,并避免在持锁后直接调用会再次加锁的 AddService,可以拆出一个内部 no-lock helper 来复用写入逻辑。

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的,我现在使用了内部无锁辅助模式重构逻辑,并提取 addServiceWithoutLock 来处理核心映射更新。现在,ReplaceExportedServices 在入口获取 info.mu.Lock() 一次,并在循环中安全地重用辅助函数。

info.exportedServiceURLs = make(map[string][]*common.URL)
for _, serviceURL := range urls {
info.AddService(serviceURL)
info.addServiceWithoutLock(serviceURL)
}
}

Expand Down Expand Up @@ -283,3 +325,23 @@ func (si *ServiceInfo) GetServiceKey() string {
si.ServiceKey = common.ServiceKey(si.Name, si.Group, si.Version)
return si.ServiceKey
}

// DeepCopy returns a fully independent copy of ServiceInfo with lazy fields eagerly populated.
func (si *ServiceInfo) DeepCopy() *ServiceInfo {
params := make(map[string]string, len(si.Params))
for k, v := range si.Params {
params[k] = v
}
return &ServiceInfo{
Name: si.Name,
Group: si.Group,
Version: si.Version,
Protocol: si.Protocol,
Port: si.Port,
Path: si.Path,
Params: params,
ServiceKey: si.GetServiceKey(),
MatchKey: si.GetMatchKey(),
URL: si.URL,
}
}
21 changes: 21 additions & 0 deletions metadata/info/metadata_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,27 @@ func TestServiceInfoGetMatchKey(t *testing.T) {
assert.NotEmpty(t, si.GetMatchKey())
}

func TestMetadataInfoGetServices(t *testing.T) {
metadataInfo := &MetadataInfo{
Services: make(map[string]*ServiceInfo),
exportedServiceURLs: make(map[string][]*common.URL),
subscribedServiceURLs: make(map[string][]*common.URL),
}
url, _ := common.NewURL("dubbo://127.0.0.1:20000?application=foo&category=providers&check=false&dubbo=dubbo-go+v1.5.0&interface=com.foo.Bar&methods=GetPetByID%2CGetPetTypes&organization=Apache&owner=foo&revision=1.0.0&side=provider&version=1.0.0")
metadataInfo.AddService(url)

services := metadataInfo.GetServices()
require.Len(t, services, 1)
assert.NotEmpty(t, services)

// GetServices returns a copy: modifying the original does not affect the snapshot
metadataInfo.RemoveService(url)
assert.Len(t, services, 1)

// A fresh call reflects the removal
assert.Empty(t, metadataInfo.GetServices())
}

func TestServiceInfoJavaClassName(t *testing.T) {
assert.Equalf(t, "org.apache.dubbo.metadata.MetadataInfo", NewAppMetadataInfo("dubbo").JavaClassName(), "JavaClassName()")
}
21 changes: 18 additions & 3 deletions metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
// Package metadata collects and exposes information of all services for service discovery purpose.
package metadata

import (
"sync"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metadata/info"
)

var (
registryMetadataInfo = make(map[string]*info.MetadataInfo)
registryMetadataInfo = make(map[string]*info.MetadataInfo)
registryMetadataLock sync.RWMutex
metadataService MetadataService = &DefaultMetadataService{metadataMap: registryMetadataInfo}
)

Expand All @@ -34,27 +39,37 @@ func GetMetadataService() MetadataService {
}

func GetMetadataInfo(registryId string) *info.MetadataInfo {
registryMetadataLock.RLock()
defer registryMetadataLock.RUnlock()
return registryMetadataInfo[registryId]
}

func AddService(registryId string, url *common.URL) {
registryMetadataLock.Lock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] 这里仍然裸读 registryMetadataInfo。这个 PR 让 AddService/AddSubscribeURL/GetMetadataInfo 通过 registryMetadataLock 保护同一个全局 map,但 RemoveService/RemoveSubscribeURL 还在没有 RLock 的情况下做 map lookup;当服务导出/订阅与注销并发发生时,Add 分支可能正在写入 registryMetadataInfo,Remove 分支同时读会继续触发 concurrent map read/write。这里需要先在 registryMetadataLock 下取出 metadataInfo,再释放全局锁后调用 metadataInfo.RemoveService,和 AddService 的锁粒度保持一致。

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我已经更新了 metadata/metadata.go 中的 RemoveServiceRemoveSubscribeURL。现在,它们会获取 registryMetadataLock.RLock(),执行映射查找,并在调用实例自身的方法之前立即释放 RUnlock()

全局锁粒度最小化,并与 AddService 完全对称,在不引入任何死锁的情况下解决并发读写风险

if _, exist := registryMetadataInfo[registryId]; !exist {
registryMetadataInfo[registryId] = info.NewMetadataInfo(
url.GetParam(constant.ApplicationKey, ""),
url.GetParam(constant.ApplicationTagKey, ""),
)
}
registryMetadataInfo[registryId].AddService(url)
metaInfo := registryMetadataInfo[registryId]
registryMetadataLock.Unlock()

metaInfo.AddService(url)
}

func AddSubscribeURL(registryId string, url *common.URL) {
registryMetadataLock.Lock()
if _, exist := registryMetadataInfo[registryId]; !exist {
registryMetadataInfo[registryId] = info.NewMetadataInfo(
url.GetParam(constant.ApplicationKey, ""),
url.GetParam(constant.ApplicationTagKey, ""),
)
}
registryMetadataInfo[registryId].AddSubscribeURL(url)
metaInfo := registryMetadataInfo[registryId]
registryMetadataLock.Unlock()

metaInfo.AddSubscribeURL(url)
}

func RemoveService(registryId string, url *common.URL) {
Expand Down
8 changes: 7 additions & 1 deletion metadata/metadata_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func (mts *DefaultMetadataService) GetMetadataInfo(revision string) (*info.Metad
if revision == "" {
return nil, nil
}
registryMetadataLock.RLock()
defer registryMetadataLock.RUnlock()
for _, metadataInfo := range mts.metadataMap {
if metadataInfo.Revision == revision {
return metadataInfo, nil
Expand All @@ -106,6 +108,8 @@ func (mts *DefaultMetadataService) GetMetadataInfo(revision string) (*info.Metad

// GetExportedServiceURLs get exported service urls
func (mts *DefaultMetadataService) GetExportedServiceURLs() ([]*common.URL, error) {
registryMetadataLock.RLock()
defer registryMetadataLock.RUnlock()
urls := make([]*common.URL, 0)
for _, metadataInfo := range mts.metadataMap {
urls = append(urls, metadataInfo.GetExportedServiceURLs()...)
Expand All @@ -124,6 +128,8 @@ func (mts *DefaultMetadataService) GetMetadataServiceURL() (*common.URL, error)
}

func (mts *DefaultMetadataService) GetSubscribedURLs() ([]*common.URL, error) {
registryMetadataLock.RLock()
defer registryMetadataLock.RUnlock()
urls := make([]*common.URL, 0)
for _, metadataInfo := range mts.metadataMap {
urls = append(urls, metadataInfo.GetSubscribedURLs()...)
Expand Down Expand Up @@ -257,7 +263,7 @@ func (mtsV2 *MetadataServiceV2) GetMetadataInfo(ctx context.Context, req *triple
return &tripleapi.MetadataInfoV2{
App: metadataInfo.App,
Version: metadataInfo.Revision,
Services: convertV2(metadataInfo.Services),
Services: convertV2(metadataInfo.GetServices()),
}, err
}

Expand Down
33 changes: 33 additions & 0 deletions metadata/metadata_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package metadata

import (
"strconv"
"sync"
"testing"
)

Expand Down Expand Up @@ -362,3 +363,35 @@ func Test_serviceExporterExport(t *testing.T) {
require.NoError(t, err)
})
}

func TestDefaultMetadataServiceConcurrentReadAccess(t *testing.T) {
mts := &DefaultMetadataService{
metadataMap: newMetadataMap(),
}

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
urls, err := mts.GetExportedServiceURLs()
assert.NoError(t, err)
assert.NotEmpty(t, urls)
}()
wg.Add(1)
go func() {
defer wg.Done()
urls, err := mts.GetSubscribedURLs()
assert.NoError(t, err)
assert.NotEmpty(t, urls)
}()
wg.Add(1)
go func() {
defer wg.Done()
info, err := mts.GetMetadataInfo("1")
assert.NoError(t, err)
assert.NotNil(t, info)
}()
}
wg.Wait()
}
34 changes: 34 additions & 0 deletions metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package metadata

import (
"sync"
"testing"
)

Expand Down Expand Up @@ -179,3 +180,36 @@ func TestGetMetadataService(t *testing.T) {
})
}
}

func TestAddServiceConcurrent(t *testing.T) {
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
registryId := "concurrent-reg"
wg.Add(1)
go func(idx int) {
defer wg.Done()
url := common.NewURLWithOptions(
common.WithProtocol("dubbo"),
common.WithParamsValue(constant.ApplicationKey, "dubbo"),
common.WithParamsValue(constant.ApplicationTagKey, "v1"),
)
AddService(registryId, url)
}(i)
wg.Add(1)
go func(idx int) {
defer wg.Done()
url := common.NewURLWithOptions(
common.WithProtocol("dubbo"),
common.WithParamsValue(constant.ApplicationKey, "dubbo"),
common.WithParamsValue(constant.ApplicationTagKey, "v1"),
)
AddSubscribeURL(registryId, url)
}(i)
wg.Add(1)
go func(idx int) {
defer wg.Done()
_ = GetMetadataInfo(registryId)
}(i)
}
wg.Wait()
}
4 changes: 2 additions & 2 deletions metadata/report/nacos/report_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func newNacosMetadataReport(f fields) *nacosMetadataReport {
}

func Test_nacosMetadataReport_GetAppMetadata(t *testing.T) {
mi := info.MetadataInfo{
mi := &info.MetadataInfo{
App: "GetAppMetadata",
}
data, _ := json.Marshal(mi)
Expand All @@ -198,7 +198,7 @@ func Test_nacosMetadataReport_GetAppMetadata(t *testing.T) {
application: "dubbo",
revision: "revision",
},
want: &mi,
want: mi,
wantErr: false,
},
}
Expand Down
Loading
Loading