diff --git a/metadata/info/metadata_info.go b/metadata/info/metadata_info.go index 1714389998..8843fb0292 100644 --- a/metadata/info/metadata_info.go +++ b/metadata/info/metadata_info.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "sync" ) import ( @@ -65,6 +66,7 @@ type MetadataInfo struct { Services map[string]*ServiceInfo `json:"services,omitempty" hessian:"services"` exportedServiceURLs map[string][]*common.URL `hessian:"-"` // server exported service urls subscribedServiceURLs map[string][]*common.URL `hessian:"-"` // client subscribed service urls + mu sync.RWMutex `json:"-" hessian:"-"` } func NewAppMetadataInfo(app string) *MetadataInfo { @@ -97,6 +99,15 @@ func (info *MetadataInfo) JavaClassName() string { // AddService add provider service info to MetadataInfo func (info *MetadataInfo) AddService(url *common.URL) { + info.mu.Lock() + defer info.mu.Unlock() + + info.addServiceWithoutLock(url) +} + +// addServiceWithoutLock adds a service URL without acquiring the lock. +// The caller must hold info.mu.Lock() before calling this method. +func (info *MetadataInfo) addServiceWithoutLock(url *common.URL) { service := NewServiceInfoWithURL(url) info.Services[service.GetMatchKey()] = service addUrl(info.exportedServiceURLs, url) @@ -136,6 +147,9 @@ func deleteItem(slice []*common.URL, index int) []*common.URL { } func (info *MetadataInfo) RemoveService(url *common.URL) { + info.mu.Lock() + defer info.mu.Unlock() + service := NewServiceInfoWithURL(url) removeUrl(info.exportedServiceURLs, url) if replacement := info.findExportedServiceURL(service.GetMatchKey()); replacement != nil { @@ -147,15 +161,24 @@ func (info *MetadataInfo) RemoveService(url *common.URL) { // AddSubscribeURL client subscribe a service url func (info *MetadataInfo) AddSubscribeURL(url *common.URL) { + info.mu.Lock() + defer info.mu.Unlock() + addUrl(info.subscribedServiceURLs, url) } // RemoveSubscribeURL client unsubscribe a service url func (info *MetadataInfo) RemoveSubscribeURL(url *common.URL) { + info.mu.Lock() + defer info.mu.Unlock() + removeUrl(info.subscribedServiceURLs, url) } func (info *MetadataInfo) GetExportedServiceURLs() []*common.URL { + info.mu.RLock() + defer info.mu.RUnlock() + res := make([]*common.URL, 0) for _, urls := range info.exportedServiceURLs { res = append(res, urls...) @@ -164,6 +187,9 @@ func (info *MetadataInfo) GetExportedServiceURLs() []*common.URL { } func (info *MetadataInfo) GetSubscribedURLs() []*common.URL { + info.mu.RLock() + defer info.mu.RUnlock() + res := make([]*common.URL, 0) for _, urls := range info.subscribedServiceURLs { res = append(res, urls...) @@ -171,11 +197,27 @@ func (info *MetadataInfo) GetSubscribedURLs() []*common.URL { return res } +// GetServices returns a deep copy of the Services map for safe iteration by external callers. +// Each ServiceInfo is fully copied with lazy fields eagerly populated to prevent write-on-read races. +func (info *MetadataInfo) GetServices() map[string]*ServiceInfo { + info.mu.Lock() + defer info.mu.Unlock() + + cp := make(map[string]*ServiceInfo, len(info.Services)) + for k, v := range info.Services { + cp[k] = v.DeepCopy() + } + return cp +} + func (info *MetadataInfo) ReplaceExportedServices(urls []*common.URL) { + info.mu.Lock() + defer info.mu.Unlock() + info.Services = make(map[string]*ServiceInfo) info.exportedServiceURLs = make(map[string][]*common.URL) for _, serviceURL := range urls { - info.AddService(serviceURL) + info.addServiceWithoutLock(serviceURL) } } @@ -289,6 +331,26 @@ func (si *ServiceInfo) GetServiceKey() string { return si.ServiceKey } +// DeepCopy returns a fully independent copy of ServiceInfo with lazy fields eagerly populated. +func (si *ServiceInfo) DeepCopy() *ServiceInfo { + params := make(map[string]string, len(si.Params)) + for k, v := range si.Params { + params[k] = v + } + return &ServiceInfo{ + Name: si.Name, + Group: si.Group, + Version: si.Version, + Protocol: si.Protocol, + Port: si.Port, + Path: si.Path, + Params: params, + ServiceKey: si.GetServiceKey(), + MatchKey: si.GetMatchKey(), + URL: si.URL, + } +} + // toDescString returns a deterministic string representation of ServiceInfo // for revision calculation. Aligned with Java dubbo ServiceInfo.toDescString(). // diff --git a/metadata/info/metadata_info_test.go b/metadata/info/metadata_info_test.go index 242f28a392..59c99812ce 100644 --- a/metadata/info/metadata_info_test.go +++ b/metadata/info/metadata_info_test.go @@ -205,6 +205,27 @@ func TestServiceInfoGetMatchKey(t *testing.T) { assert.NotEmpty(t, si.GetMatchKey()) } +func TestMetadataInfoGetServices(t *testing.T) { + metadataInfo := &MetadataInfo{ + Services: make(map[string]*ServiceInfo), + exportedServiceURLs: make(map[string][]*common.URL), + subscribedServiceURLs: make(map[string][]*common.URL), + } + url, _ := common.NewURL("dubbo://127.0.0.1:20000?application=foo&category=providers&check=false&dubbo=dubbo-go+v1.5.0&interface=com.foo.Bar&methods=GetPetByID%2CGetPetTypes&organization=Apache&owner=foo&revision=1.0.0&side=provider&version=1.0.0") + metadataInfo.AddService(url) + + services := metadataInfo.GetServices() + require.Len(t, services, 1) + assert.NotEmpty(t, services) + + // GetServices returns a copy: modifying the original does not affect the snapshot + metadataInfo.RemoveService(url) + assert.Len(t, services, 1) + + // A fresh call reflects the removal + assert.Empty(t, metadataInfo.GetServices()) +} + func TestServiceInfoJavaClassName(t *testing.T) { assert.Equalf(t, "org.apache.dubbo.metadata.MetadataInfo", NewAppMetadataInfo("dubbo").JavaClassName(), "JavaClassName()") } diff --git a/metadata/metadata.go b/metadata/metadata.go index 15367cf1cf..6117864aa0 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -18,6 +18,10 @@ // Package metadata collects and exposes information of all services for service discovery purpose. package metadata +import ( + "sync" +) + import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" @@ -25,7 +29,8 @@ import ( ) var ( - registryMetadataInfo = make(map[string]*info.MetadataInfo) + registryMetadataInfo = make(map[string]*info.MetadataInfo) + registryMetadataLock sync.RWMutex metadataService MetadataService = &DefaultMetadataService{metadataMap: registryMetadataInfo} ) @@ -34,31 +39,44 @@ func GetMetadataService() MetadataService { } func GetMetadataInfo(registryId string) *info.MetadataInfo { + registryMetadataLock.RLock() + defer registryMetadataLock.RUnlock() return registryMetadataInfo[registryId] } func AddService(registryId string, url *common.URL) { + registryMetadataLock.Lock() if _, exist := registryMetadataInfo[registryId]; !exist { registryMetadataInfo[registryId] = info.NewMetadataInfo( url.GetParam(constant.ApplicationKey, ""), url.GetParam(constant.ApplicationTagKey, ""), ) } - registryMetadataInfo[registryId].AddService(url) + metaInfo := registryMetadataInfo[registryId] + registryMetadataLock.Unlock() + + metaInfo.AddService(url) } func AddSubscribeURL(registryId string, url *common.URL) { + registryMetadataLock.Lock() if _, exist := registryMetadataInfo[registryId]; !exist { registryMetadataInfo[registryId] = info.NewMetadataInfo( url.GetParam(constant.ApplicationKey, ""), url.GetParam(constant.ApplicationTagKey, ""), ) } - registryMetadataInfo[registryId].AddSubscribeURL(url) + metaInfo := registryMetadataInfo[registryId] + registryMetadataLock.Unlock() + + metaInfo.AddSubscribeURL(url) } func RemoveService(registryId string, url *common.URL) { + registryMetadataLock.RLock() metadataInfo, exist := registryMetadataInfo[registryId] + registryMetadataLock.RUnlock() + if !exist { return } @@ -66,7 +84,10 @@ func RemoveService(registryId string, url *common.URL) { } func RemoveSubscribeURL(registryId string, url *common.URL) { + registryMetadataLock.RLock() metadataInfo, exist := registryMetadataInfo[registryId] + registryMetadataLock.RUnlock() + if !exist { return } diff --git a/metadata/metadata_service.go b/metadata/metadata_service.go index 8cf7da13b7..353044c04b 100644 --- a/metadata/metadata_service.go +++ b/metadata/metadata_service.go @@ -95,6 +95,8 @@ func (mts *DefaultMetadataService) GetMetadataInfo(revision string) (*info.Metad if revision == "" { return nil, nil } + registryMetadataLock.RLock() + defer registryMetadataLock.RUnlock() for _, metadataInfo := range mts.metadataMap { if metadataInfo.Revision == revision { return metadataInfo, nil @@ -106,6 +108,8 @@ func (mts *DefaultMetadataService) GetMetadataInfo(revision string) (*info.Metad // GetExportedServiceURLs get exported service urls func (mts *DefaultMetadataService) GetExportedServiceURLs() ([]*common.URL, error) { + registryMetadataLock.RLock() + defer registryMetadataLock.RUnlock() urls := make([]*common.URL, 0) for _, metadataInfo := range mts.metadataMap { urls = append(urls, metadataInfo.GetExportedServiceURLs()...) @@ -124,6 +128,8 @@ func (mts *DefaultMetadataService) GetMetadataServiceURL() (*common.URL, error) } func (mts *DefaultMetadataService) GetSubscribedURLs() ([]*common.URL, error) { + registryMetadataLock.RLock() + defer registryMetadataLock.RUnlock() urls := make([]*common.URL, 0) for _, metadataInfo := range mts.metadataMap { urls = append(urls, metadataInfo.GetSubscribedURLs()...) @@ -257,7 +263,7 @@ func (mtsV2 *MetadataServiceV2) GetMetadataInfo(ctx context.Context, req *triple return &tripleapi.MetadataInfoV2{ App: metadataInfo.App, Version: metadataInfo.Revision, - Services: convertV2(metadataInfo.Services), + Services: convertV2(metadataInfo.GetServices()), Tag: metadataInfo.Tag, }, err } diff --git a/metadata/metadata_service_test.go b/metadata/metadata_service_test.go index 67a0bb5571..c0cf0d8a70 100644 --- a/metadata/metadata_service_test.go +++ b/metadata/metadata_service_test.go @@ -20,6 +20,7 @@ package metadata import ( "context" "strconv" + "sync" "testing" ) @@ -391,3 +392,35 @@ func Test_serviceExporterExport(t *testing.T) { require.NoError(t, err) }) } + +func TestDefaultMetadataServiceConcurrentReadAccess(t *testing.T) { + mts := &DefaultMetadataService{ + metadataMap: newMetadataMap(), + } + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + urls, err := mts.GetExportedServiceURLs() + assert.NoError(t, err) + assert.NotEmpty(t, urls) + }() + wg.Add(1) + go func() { + defer wg.Done() + urls, err := mts.GetSubscribedURLs() + assert.NoError(t, err) + assert.NotEmpty(t, urls) + }() + wg.Add(1) + go func() { + defer wg.Done() + info, err := mts.GetMetadataInfo("1") + assert.NoError(t, err) + assert.NotNil(t, info) + }() + } + wg.Wait() +} diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 2c8de5b279..da4446e29f 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -18,6 +18,7 @@ package metadata import ( + "sync" "testing" ) @@ -179,3 +180,36 @@ func TestGetMetadataService(t *testing.T) { }) } } + +func TestAddServiceConcurrent(t *testing.T) { + var wg sync.WaitGroup + for i := 0; i < 20; i++ { + registryId := "concurrent-reg" + wg.Add(1) + go func(idx int) { + defer wg.Done() + url := common.NewURLWithOptions( + common.WithProtocol("dubbo"), + common.WithParamsValue(constant.ApplicationKey, "dubbo"), + common.WithParamsValue(constant.ApplicationTagKey, "v1"), + ) + AddService(registryId, url) + }(i) + wg.Add(1) + go func(idx int) { + defer wg.Done() + url := common.NewURLWithOptions( + common.WithProtocol("dubbo"), + common.WithParamsValue(constant.ApplicationKey, "dubbo"), + common.WithParamsValue(constant.ApplicationTagKey, "v1"), + ) + AddSubscribeURL(registryId, url) + }(i) + wg.Add(1) + go func(idx int) { + defer wg.Done() + _ = GetMetadataInfo(registryId) + }(i) + } + wg.Wait() +} diff --git a/metadata/report/nacos/report_test.go b/metadata/report/nacos/report_test.go index 108e330bb5..1ce9687728 100644 --- a/metadata/report/nacos/report_test.go +++ b/metadata/report/nacos/report_test.go @@ -171,7 +171,7 @@ func newNacosMetadataReport(f fields) *nacosMetadataReport { } func Test_nacosMetadataReport_GetAppMetadata(t *testing.T) { - mi := info.MetadataInfo{ + mi := &info.MetadataInfo{ App: "GetAppMetadata", } data, _ := json.Marshal(mi) @@ -198,7 +198,7 @@ func Test_nacosMetadataReport_GetAppMetadata(t *testing.T) { application: "dubbo", revision: "revision", }, - want: &mi, + want: mi, wantErr: false, }, } diff --git a/metadata/report_instance.go b/metadata/report_instance.go index 55cf516df6..4d2354b877 100644 --- a/metadata/report_instance.go +++ b/metadata/report_instance.go @@ -19,6 +19,7 @@ package metadata import ( "sort" + "sync" "time" ) @@ -39,7 +40,8 @@ import ( ) var ( - instances = make(map[string]report.MetadataReport) + instances = make(map[string]report.MetadataReport) + instancesMu sync.RWMutex ) // ClearMetadataReportInstances resets the package-level instances map. @@ -54,7 +56,10 @@ func addMetadataReport(registryId string, url *common.URL) error { logger.Warnf("[Metadata] no metadata report factory of protocol %s found, please check if the metadata report factory is imported", url.Protocol) return nil } - instances[registryId] = &DelegateMetadataReport{instance: fac.CreateMetadataReport(url)} + mr := &DelegateMetadataReport{instance: fac.CreateMetadataReport(url)} + instancesMu.Lock() + instances[registryId] = mr + instancesMu.Unlock() return nil } @@ -63,6 +68,9 @@ func addMetadataReport(registryId string, url *common.URL) error { // it falls back to the lexicographically first registry id so the selection // is always stable across calls. func GetMetadataReport() report.MetadataReport { + instancesMu.RLock() + defer instancesMu.RUnlock() + if r, ok := instances[constant.DefaultKey]; ok { return r } @@ -89,6 +97,8 @@ func GetMetadataReportByRegistry(registry string) report.MetadataReport { if len(registry) == 0 { return GetMetadataReport() } + instancesMu.RLock() + defer instancesMu.RUnlock() if r, ok := instances[registry]; ok { return r } @@ -101,6 +111,8 @@ func GetMetadataReportByRegistry(registry string) report.MetadataReport { } func GetMetadataReports() []report.MetadataReport { + instancesMu.RLock() + defer instancesMu.RUnlock() reports := make([]report.MetadataReport, len(instances)) index := 0 for _, r := range instances { diff --git a/registry/servicediscovery/service_instances_changed_listener_impl.go b/registry/servicediscovery/service_instances_changed_listener_impl.go index 317cff056f..2933d3a234 100644 --- a/registry/servicediscovery/service_instances_changed_listener_impl.go +++ b/registry/servicediscovery/service_instances_changed_listener_impl.go @@ -145,7 +145,7 @@ func (lstn *ServiceInstancesChangedListenerImpl) OnEvent(e observer.Event) error continue } instance.SetServiceMetadata(metadataInfo) - for _, service := range metadataInfo.Services { + for _, service := range metadataInfo.GetServices() { matchKey := service.GetMatchKey() if serviceToRevisionServices[matchKey] == nil { serviceToRevisionServices[matchKey] = make(map[string]*info.ServiceInfo) @@ -215,8 +215,11 @@ func toInstanceServiceURLs(instance registry.ServiceInstance, serviceInfo *info. // AddListenerAndNotify add notify listener and notify to listen service event func (lstn *ServiceInstancesChangedListenerImpl) AddListenerAndNotify(serviceKey string, notify registry.NotifyListener) { + lstn.mutex.Lock() lstn.listeners[serviceKey] = notify urls := lstn.serviceUrls[serviceKey] + lstn.mutex.Unlock() + for _, url := range urls { notify.Notify(®istry.ServiceEvent{ Action: remoting.EventTypeAdd, @@ -227,6 +230,8 @@ func (lstn *ServiceInstancesChangedListenerImpl) AddListenerAndNotify(serviceKey // RemoveListener remove notify listener func (lstn *ServiceInstancesChangedListenerImpl) RemoveListener(serviceKey string) { + lstn.mutex.Lock() + defer lstn.mutex.Unlock() delete(lstn.listeners, serviceKey) } diff --git a/registry/servicediscovery/service_instances_changed_listener_impl_test.go b/registry/servicediscovery/service_instances_changed_listener_impl_test.go index 22f7582ae4..b0e3fb5b73 100644 --- a/registry/servicediscovery/service_instances_changed_listener_impl_test.go +++ b/registry/servicediscovery/service_instances_changed_listener_impl_test.go @@ -375,6 +375,18 @@ func (c *capturingNotifyListener) NotifyAll(events []*registry.ServiceEvent, cal } } +func TestServiceInstancesChangedListenerRemoveListener(t *testing.T) { + listener := NewServiceInstancesChangedListener(testApp, constant.DefaultKey, gxset.NewSet(testApp)).(*ServiceInstancesChangedListenerImpl) + notify := &capturingNotifyListener{} + key := common.MatchKey(testInterface, constant.TriProtocol) + + listener.AddListenerAndNotify(key, notify) + require.Len(t, listener.listeners, 1) + + listener.RemoveListener(key) + require.Empty(t, listener.listeners) +} + func TestGetMetadataInfo_CacheKeyFormat(t *testing.T) { // Ensure cache is initialized (normally done by NewServiceInstancesChangedListener) _ = NewServiceInstancesChangedListener(testApp, constant.DefaultKey, gxset.NewSet(testApp))