diff --git a/metadata/mapping/metadata/service_name_mapping.go b/metadata/mapping/metadata/service_name_mapping.go index 1346436b2c..717864da5f 100644 --- a/metadata/mapping/metadata/service_name_mapping.go +++ b/metadata/mapping/metadata/service_name_mapping.go @@ -18,7 +18,9 @@ package metadata import ( + "errors" "sync" + "time" ) import ( @@ -33,11 +35,17 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/metadata" "dubbo.apache.org/dubbo-go/v3/metadata/mapping" + "dubbo.apache.org/dubbo-go/v3/metadata/report" ) -const ( - DefaultGroup = "mapping" - retryTimes = 10 +const DefaultGroup = "mapping" + +// retry policy for mapping registration. These are vars rather than consts so they can be +// tuned (and made near-instant in tests). +var ( + retryTimes = 10 + retryBaseInterval = 100 * time.Millisecond + retryMaxInterval = 2 * time.Second ) func init() { @@ -73,19 +81,41 @@ func (d *ServiceNameMapping) Map(url *common.URL) error { return perrors.New("can not registering mapping to remote cause no metadata report instance found") } for _, metadataReport := range metadataReports { - var err error - for i := 0; i < retryTimes; i++ { - if err = metadataReport.RegisterServiceAppMapping(serviceInterface, DefaultGroup, appName); err == nil { - break - } - } - if err != nil { + if err := registerWithRetry(metadataReport, serviceInterface, DefaultGroup, appName); err != nil { return err } } return nil } +// registerWithRetry registers the interface-to-app mapping, retrying only on CAS conflicts +// (report.ErrMappingCASConflict) with exponential backoff. Any other error is returned +// immediately, since retrying it would not help. +func registerWithRetry(r report.MetadataReport, serviceInterface, group, appName string) error { + var err error + for i := 0; i < retryTimes; i++ { + err = r.RegisterServiceAppMapping(serviceInterface, group, appName) + if err == nil { + return nil + } + if !errors.Is(err, report.ErrMappingCASConflict) { + return err + } + time.Sleep(backoff(i)) + } + return err +} + +// backoff returns the delay before retry attempt i: retryBaseInterval*2^i capped at +// retryMaxInterval. +func backoff(attempt int) time.Duration { + d := retryBaseInterval << attempt + if d <= 0 || d > retryMaxInterval { + d = retryMaxInterval + } + return d +} + // Get will return the application-level services. If not found, the empty set will be returned. func (d *ServiceNameMapping) Get(url *common.URL, listener mapping.MappingListener) (*gxset.HashSet, error) { serviceInterface := url.GetParam(constant.InterfaceKey, "") diff --git a/metadata/mapping/metadata/service_name_mapping_concurrency_test.go b/metadata/mapping/metadata/service_name_mapping_concurrency_test.go new file mode 100644 index 0000000000..578bc828b6 --- /dev/null +++ b/metadata/mapping/metadata/service_name_mapping_concurrency_test.go @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metadata + +import ( + "fmt" + "sync" + "testing" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" + + "github.com/stretchr/testify/assert" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/metadata/info" + "dubbo.apache.org/dubbo-go/v3/metadata/mapping" + "dubbo.apache.org/dubbo-go/v3/metadata/report" +) + +// versionedStore is an in-memory key/value store with a per-key version, modeling the +// compare-and-swap primitive a real metadata center (etcd ModRevision, zk Stat.Version, +// nacos content MD5) provides. +type versionedStore struct { + mu sync.Mutex + data map[string]versionedEntry +} + +type versionedEntry struct { + val string + ver int64 +} + +func newVersionedStore() *versionedStore { + return &versionedStore{data: make(map[string]versionedEntry)} +} + +func (s *versionedStore) get(key string) (string, int64) { + s.mu.Lock() + defer s.mu.Unlock() + e := s.data[key] + return e.val, e.ver +} + +// cas writes val only if the current version equals ver, returning whether it was applied. +func (s *versionedStore) cas(key, val string, ver int64) bool { + s.mu.Lock() + defer s.mu.Unlock() + if s.data[key].ver != ver { + return false + } + s.data[key] = versionedEntry{val: val, ver: ver + 1} + return true +} + +// put writes unconditionally, modeling the old read-modify-write behavior without CAS. +func (s *versionedStore) put(key, val string) { + s.mu.Lock() + defer s.mu.Unlock() + s.data[key] = versionedEntry{val: val, ver: s.data[key].ver + 1} +} + +// stubReport satisfies the non-mapping parts of report.MetadataReport. +type stubReport struct{} + +func (stubReport) GetAppMetadata(string, string) (*info.MetadataInfo, error) { return nil, nil } +func (stubReport) PublishAppMetadata(string, string, *info.MetadataInfo) error { return nil } +func (stubReport) GetServiceAppMapping(string, string, mapping.MappingListener) (*gxset.HashSet, error) { + return nil, nil +} +func (stubReport) RemoveServiceAppMappingListener(string, string) error { return nil } + +// casReport registers mappings with optimistic concurrency against a versionedStore, exactly +// as the etcd/zk/nacos reports now do, returning report.ErrMappingCASConflict on conflict. +type casReport struct { + stubReport + store *versionedStore +} + +func (r *casReport) RegisterServiceAppMapping(key, group, value string) error { + full := group + "/" + key + old, ver := r.store.get(full) + merged, changed := report.MergeServiceAppMapping(old, value) + if !changed { + return nil + } + if !r.store.cas(full, merged, ver) { + return report.ErrMappingCASConflict + } + return nil +} + +func (r *casReport) GetServiceAppMapping(key, group string, _ mapping.MappingListener) (*gxset.HashSet, error) { + val, _ := r.store.get(group + "/" + key) + return report.DecodeServiceAppNames(val), nil +} + +// fastRetry sets a near-zero backoff and the given retry budget, returning a restore func. +func fastRetry(times int) func() { + ot, ob, om := retryTimes, retryBaseInterval, retryMaxInterval + retryTimes, retryBaseInterval, retryMaxInterval = times, 0, 0 + return func() { retryTimes, retryBaseInterval, retryMaxInterval = ot, ob, om } +} + +// TestNaiveReadModifyWriteLosesConcurrentUpdate reproduces the bug the issue is about: two +// providers that read the same value and write back unconditionally clobber each other. +func TestNaiveReadModifyWriteLosesConcurrentUpdate(t *testing.T) { + store := newVersionedStore() + const key = "mapping/Iface" + + // Both providers read the initial empty value... + oldA, _ := store.get(key) + oldB, _ := store.get(key) + // ...each merges its own app and writes back without a version check. + mergedA, _ := report.MergeServiceAppMapping(oldA, "appA") + store.put(key, mergedA) + mergedB, _ := report.MergeServiceAppMapping(oldB, "appB") + store.put(key, mergedB) + + val, _ := store.get(key) + got := report.DecodeServiceAppNames(val) + assert.False(t, got.Contains("appA"), "appA was silently lost by the second write") + assert.True(t, got.Contains("appB")) +} + +// TestCASRejectsConcurrentUpdate shows the fix: the second writer's CAS fails, and after a +// re-read both apps survive. +func TestCASRejectsConcurrentUpdate(t *testing.T) { + store := newVersionedStore() + const key = "mapping/Iface" + + oldA, verA := store.get(key) + oldB, verB := store.get(key) // both read version 0 + + mergedA, _ := report.MergeServiceAppMapping(oldA, "appA") + assert.True(t, store.cas(key, mergedA, verA)) // A wins, version -> 1 + + mergedB, _ := report.MergeServiceAppMapping(oldB, "appB") + assert.False(t, store.cas(key, mergedB, verB)) // B's stale version is rejected + + // B re-reads and retries; nothing is lost. + oldB2, verB2 := store.get(key) + mergedB2, _ := report.MergeServiceAppMapping(oldB2, "appB") + assert.True(t, store.cas(key, mergedB2, verB2)) + + val, _ := store.get(key) + got := report.DecodeServiceAppNames(val) + assert.True(t, got.Contains("appA")) + assert.True(t, got.Contains("appB")) +} + +// TestRegisterWithRetryConcurrentNoLostUpdate drives the real registerWithRetry loop with many +// writers racing on the same interface key, while readers concurrently read it. It asserts no +// app is lost and that a reader never observes the set shrink. Run with -race to also catch +// data races. +func TestRegisterWithRetryConcurrentNoLostUpdate(t *testing.T) { + defer fastRetry(10000)() + store := newVersionedStore() + r := &casReport{store: store} + + const writers = 200 + const readers = 20 + + // concurrent readers: every successful registration only appends, so a reader must never + // see the set get smaller or contain a malformed entry. + stop := make(chan struct{}) + var readerWg sync.WaitGroup + for i := 0; i < readers; i++ { + readerWg.Add(1) + go func() { + defer readerWg.Done() + prev := 0 + for { + select { + case <-stop: + return + default: + set, err := r.GetServiceAppMapping("Iface", DefaultGroup, nil) + assert.NoError(t, err) + assert.GreaterOrEqual(t, set.Size(), prev) + assert.False(t, set.Contains("")) + prev = set.Size() + } + } + }() + } + + var writerWg sync.WaitGroup + for i := 0; i < writers; i++ { + writerWg.Add(1) + go func(i int) { + defer writerWg.Done() + assert.NoError(t, registerWithRetry(r, "Iface", DefaultGroup, fmt.Sprintf("app-%d", i))) + }(i) + } + writerWg.Wait() + close(stop) + readerWg.Wait() + + val, _ := store.get(DefaultGroup + "/Iface") + got := report.DecodeServiceAppNames(val) + assert.Equal(t, writers, got.Size()) + for i := 0; i < writers; i++ { + assert.True(t, got.Contains(fmt.Sprintf("app-%d", i)), "app-%d was lost", i) + } +} diff --git a/metadata/mapping/metadata/service_name_mapping_test.go b/metadata/mapping/metadata/service_name_mapping_test.go index d1783289ae..c279ff4c07 100644 --- a/metadata/mapping/metadata/service_name_mapping_test.go +++ b/metadata/mapping/metadata/service_name_mapping_test.go @@ -97,11 +97,19 @@ func TestServiceNameMappingMap(t *testing.T) { err = ins.Map(serviceUrl) require.NoError(t, err) }) - t.Run("test error", func(t *testing.T) { - mockReport.On("RegisterServiceAppMapping").Return(errors.New("mock error")).Times(retryTimes) + t.Run("non-conflict error returns immediately", func(t *testing.T) { + // a generic error is not retriable, so RegisterServiceAppMapping is called exactly once + mockReport.On("RegisterServiceAppMapping").Return(errors.New("mock error")).Once() err = ins.Map(serviceUrl) require.Error(t, err, "test mapping error") }) + t.Run("CAS conflict retries up to retryTimes", func(t *testing.T) { + const conflictRetries = 3 + defer fastRetry(conflictRetries)() + mockReport.On("RegisterServiceAppMapping").Return(report.ErrMappingCASConflict).Times(conflictRetries) + err = ins.Map(serviceUrl) + require.Error(t, err, "conflict exhausts the retry budget") + }) mockReport.AssertExpectations(t) } diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go index a69cdcb768..182b18898d 100644 --- a/metadata/report/etcd/report.go +++ b/metadata/report/etcd/report.go @@ -19,6 +19,7 @@ package etcd import ( "encoding/json" + "fmt" "strings" ) @@ -79,34 +80,47 @@ func (e *etcdMetadataReport) PublishAppMetadata(application, revision string, in // RegisterServiceAppMapping map the specified Dubbo service interface to current Dubbo app name func (e *etcdMetadataReport) RegisterServiceAppMapping(key string, group string, value string) error { path := e.rootDir + constant.PathSeparator + group + constant.PathSeparator + key - oldVal, err := e.client.Get(path) + oldVal, rev, err := e.client.GetValAndRev(path) if perrors.Cause(err) == gxetcd.ErrKVPairNotFound { - return e.client.Put(path, value) + if cErr := e.client.Create(path, value); cErr != nil { + if perrors.Cause(cErr) == gxetcd.ErrCompareFail { + return fmt.Errorf("create mapping %s: %w", path, report.ErrMappingCASConflict) + } + return cErr + } + return nil } else if err != nil { return err } - if strings.Contains(oldVal, value) { + merged, changed := report.MergeServiceAppMapping(oldVal, value) + if !changed { return nil } - value = oldVal + constant.CommaSeparator + value - return e.client.Put(path, value) + if uErr := e.client.UpdateWithRev(path, merged, rev); uErr != nil { + if perrors.Cause(uErr) == gxetcd.ErrCompareFail { + return fmt.Errorf("update mapping %s: %w", path, report.ErrMappingCASConflict) + } + return uErr + } + return nil } // GetServiceAppMapping get the app names from the specified Dubbo service interface func (e *etcdMetadataReport) GetServiceAppMapping(key string, group string, listener mapping.MappingListener) (*gxset.HashSet, error) { path := e.rootDir + constant.PathSeparator + group + constant.PathSeparator + key + if listener != nil { + logger.Warnf("etcd metadata report does not support service mapping listener, "+ + "mapping changes of %s will not be notified", path) + } v, err := e.client.Get(path) if err != nil { return nil, err } - appNames := strings.Split(v, constant.CommaSeparator) - set := gxset.NewSet() - for _, app := range appNames { - set.Add(app) - } - return set, nil + return report.DecodeServiceAppNames(v), nil } +// RemoveServiceAppMappingListener is a no-op: etcd metadata report does not register a mapping +// listener (see GetServiceAppMapping), so there is nothing to remove. func (e *etcdMetadataReport) RemoveServiceAppMappingListener(key string, group string) error { return nil } diff --git a/metadata/report/mapping.go b/metadata/report/mapping.go new file mode 100644 index 0000000000..2f9c7952dc --- /dev/null +++ b/metadata/report/mapping.go @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package report + +import ( + "errors" + "strings" +) + +import ( + gxset "github.com/dubbogo/gost/container/set" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common/constant" +) + +// ErrMappingCASConflict is returned by MetadataReport.RegisterServiceAppMapping when an +// optimistic-concurrency (compare-and-swap) write loses a race against a concurrent writer. +// +// It is a *retriable* error: the caller should re-read the latest value, re-merge its +// application name, and try again. Each backend wraps its native conflict error +// (etcd ErrCompareFail, ZooKeeper ErrBadVersion / ErrNodeExists, Nacos CAS publish failure) +// with this sentinel using fmt.Errorf("...: %w", ...), so the upper layer can classify +// retriable vs. permanent failures with errors.Is and only burn its retry budget on the +// former. +var ErrMappingCASConflict = errors.New("service-app mapping CAS conflict") + +// MergeServiceAppMapping merges application into oldVal, the comma-separated set of +// application names stored under a single interface key in the metadata center. +// +// It compares whole elements rather than substrings. The previous implementations used +// strings.Contains(oldVal, application), which produced false positives — e.g. registering +// "order" was wrongly treated as already-present when the set contained "order-service", +// so "order" could never be written. It also never emits an empty element, fixing the +// leading-comma bug ("" + "," + app => ",app") triggered when oldVal is empty. +// +// It returns the merged value and whether oldVal changed. changed == false means application +// was already present and no write (hence no CAS round-trip) is required. +// +// New names are appended to the end rather than rebuilt from a set: this keeps existing +// ordering and makes the written bytes change minimally and deterministically, which is +// what optimistic concurrency relies on. Rebuilding from a Go map/set would reorder the +// value nondeterministically, producing spurious CAS conflicts and MD5 churn even when the +// logical content is unchanged. +func MergeServiceAppMapping(oldVal, application string) (string, bool) { + if oldVal == "" { + return application, true + } + for _, app := range strings.Split(oldVal, constant.CommaSeparator) { + if app == application { + return oldVal, false + } + } + return oldVal + constant.CommaSeparator + application, true +} + +// DecodeServiceAppNames parses the comma-separated application set stored under an interface +// key into a HashSet, skipping empty elements. A blank value yields an empty set rather than a +// set containing one empty string, which strings.Split("", ",") would otherwise produce. +func DecodeServiceAppNames(val string) *gxset.HashSet { + set := gxset.NewSet() + if val == "" { + return set + } + for _, app := range strings.Split(val, constant.CommaSeparator) { + if app != "" { + set.Add(app) + } + } + return set +} diff --git a/metadata/report/mapping_test.go b/metadata/report/mapping_test.go new file mode 100644 index 0000000000..cb40e1b731 --- /dev/null +++ b/metadata/report/mapping_test.go @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package report + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +func TestMergeServiceAppMapping(t *testing.T) { + tests := []struct { + name string + oldVal string + application string + wantVal string + wantChanged bool + }{ + {"empty old value", "", "appA", "appA", true}, + {"append new app", "appA", "appB", "appA,appB", true}, + {"already present", "appA,appB", "appA", "appA,appB", false}, + {"already present last", "appA,appB", "appB", "appA,appB", false}, + // regression: strings.Contains would treat "order" as already present in + // "order-service" and never write it. Whole-element matching must not. + {"substring is not membership", "order-service", "order", "order-service,order", true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, changed := MergeServiceAppMapping(tt.oldVal, tt.application) + assert.Equal(t, tt.wantVal, got) + assert.Equal(t, tt.wantChanged, changed) + }) + } +} + +func TestDecodeServiceAppNames(t *testing.T) { + // blank value yields an empty set, not a set containing one empty string. + assert.True(t, DecodeServiceAppNames("").Empty()) + + set := DecodeServiceAppNames("appA,appB") + assert.Equal(t, 2, set.Size()) + assert.True(t, set.Contains("appA")) + assert.True(t, set.Contains("appB")) + + // empty elements from stray separators are skipped. + set = DecodeServiceAppNames("appA,,appB,") + assert.Equal(t, 2, set.Size()) + assert.False(t, set.Contains("")) +} diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go index a1772d5cae..fa77626ae0 100644 --- a/metadata/report/nacos/report.go +++ b/metadata/report/nacos/report.go @@ -18,7 +18,9 @@ package nacos import ( + "crypto/md5" "encoding/json" + "fmt" "strings" ) @@ -141,12 +143,15 @@ func (n *nacosMetadataReport) addListener(key string, group string, notify mappi }) } +// isConfigNotExistErr reports whether err is Nacos's "config data not exist" signal, which it +// returns (rather than an empty value) for a key that has never been written. It must be treated +// as an empty old value so the first registration can create the mapping. +func isConfigNotExistErr(err error) bool { + return err != nil && strings.Contains(strings.ToLower(err.Error()), "config data not exist") +} + func callback(notify mapping.MappingListener, dataId, data string) { - appNames := strings.Split(data, constant.CommaSeparator) - set := gxset.NewSet() - for _, app := range appNames { - set.Add(app) - } + set := report.DecodeServiceAppNames(data) if err := notify.OnEvent(registry.NewServiceMappingChangedEvent(dataId, set)); err != nil { logger.Errorf("[Metadata][Nacos] serviceMapping callback err=%v", err) } @@ -161,26 +166,49 @@ func (n *nacosMetadataReport) removeServiceMappingListener(key string, group str // RegisterServiceAppMapping map the specified Dubbo service interface to current Dubbo app name func (n *nacosMetadataReport) RegisterServiceAppMapping(key string, group string, value string) error { - oldVal, _ := n.getConfig(vo.ConfigParam{ + oldVal, err := n.getConfig(vo.ConfigParam{ DataId: key, Group: group, }) - if oldVal != "" { - oldApps := strings.Split(oldVal, constant.CommaSeparator) - if len(oldApps) > 0 { - for _, app := range oldApps { - if app == value { - return nil - } - } - } - value = oldVal + constant.CommaSeparator + value + if err != nil && !isConfigNotExistErr(err) { + // A real read failure (network/auth/server): do not treat it as an empty value, or we + // would publish only our app and overwrite an existing set. "config data not exist" is + // Nacos's not-found signal, which is fine here and proceeds to the first write. + return err } - return n.storeMetadata(vo.ConfigParam{ + merged, changed := report.MergeServiceAppMapping(oldVal, value) + if !changed { + return nil + } + param := vo.ConfigParam{ DataId: key, Group: group, - Content: value, - }) + Content: merged, + } + if oldVal != "" { + // CasMd5 is an optimistic UPDATE: Nacos publishes only if the server content still + // matches what we read, detecting concurrent appends. It cannot guard the first INSERT + // (Nacos has no create-if-absent), so the initial concurrent registration of a + // brand-new interface can still race. This is a known Nacos-only limitation; the + // etcd and zookeeper reports do not have it. + // + // The MD5 here is not a security mechanism: it is the checksum the Nacos CAS wire + // protocol requires (PublishConfig forwards CasMd5 to the server, which compares it + // against the stored content's MD5). The algorithm is dictated by Nacos, not chosen + // by us, so the weak-hash warning does not apply. NOSONAR + param.CasMd5 = fmt.Sprintf("%x", md5.Sum([]byte(oldVal))) // NOSONAR: Nacos CAS protocol checksum, not security + } + if err := n.storeMetadata(param); err != nil { + if param.CasMd5 != "" { + // Nacos surfaces a CAS rejection and a transport error the same way, so they + // cannot be told apart here. Treat the failure as a retriable conflict rather + // than risk dropping a real concurrent update; the underlying error is preserved + // for diagnosis. + return fmt.Errorf("publish mapping %s (%v): %w", key, err, report.ErrMappingCASConflict) + } + return err + } + return nil } // GetServiceAppMapping get the app names from the specified Dubbo service interface @@ -201,12 +229,7 @@ func (n *nacosMetadataReport) GetServiceAppMapping(key string, group string, lis if v == "" { return nil, perrors.New("There is no service app mapping data.") } - appNames := strings.Split(v, constant.CommaSeparator) - set := gxset.NewSet() - for _, e := range appNames { - set.Add(e) - } - return set, nil + return report.DecodeServiceAppNames(v), nil } // RemoveServiceAppMappingListener remove the serviceMapping listener from metadata center diff --git a/metadata/report/zookeeper/listener.go b/metadata/report/zookeeper/listener.go index 97fb159874..e2f7dc5d29 100644 --- a/metadata/report/zookeeper/listener.go +++ b/metadata/report/zookeeper/listener.go @@ -23,13 +23,13 @@ import ( ) import ( - gxset "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/log/logger" ) import ( "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/metadata/mapping" + "dubbo.apache.org/dubbo-go/v3/metadata/report" "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting" "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper" @@ -116,14 +116,19 @@ func (l *CacheListener) RemoveListener(key string, listener mapping.MappingListe } } +// RemoveKeyListeners drops all listeners registered for key so its mapping change events stop +// being dispatched. The dispatcher goroutine is shared by the whole mapping group and is kept +// alive (other keys still need it); it is released when the report is closed. The key's +// underlying ZooKeeper watch is not unregistered here, as ZkEventListener exposes no per-path +// unlisten, so the server may keep sending now-ignored events for the key. +func (l *CacheListener) RemoveKeyListeners(key string) { + l.keyListeners.Delete(key) +} + // DataChange changes all listeners' event func (l *CacheListener) DataChange(event remoting.Event) bool { if listeners, ok := l.keyListeners.Load(event.Path); ok { - appNames := strings.Split(event.Content, constant.CommaSeparator) - set := gxset.NewSet() - for _, e := range appNames { - set.Add(e) - } + set := report.DecodeServiceAppNames(event.Content) err := listeners.(*ListenerSet).ForEach(func(listener mapping.MappingListener) error { return listener.OnEvent(registry.NewServiceMappingChangedEvent(l.pathToKey(event.Path), set)) }) diff --git a/metadata/report/zookeeper/listener_test.go b/metadata/report/zookeeper/listener_test.go index 8d28269ab7..23d3d7bcc6 100644 --- a/metadata/report/zookeeper/listener_test.go +++ b/metadata/report/zookeeper/listener_test.go @@ -26,6 +26,7 @@ import ( ) import ( + gxset "github.com/dubbogo/gost/container/set" "github.com/dubbogo/gost/gof/observer" "github.com/stretchr/testify/assert" @@ -36,6 +37,7 @@ import ( import ( "dubbo.apache.org/dubbo-go/v3/metadata/mapping" + "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting" ) @@ -43,6 +45,7 @@ type mockMappingListener struct { eventCount atomic.Int32 mu sync.Mutex onEventErr error + lastNames *gxset.HashSet } func newMockMappingListener() *mockMappingListener { return &mockMappingListener{} } @@ -51,6 +54,9 @@ func (m *mockMappingListener) OnEvent(e observer.Event) error { m.mu.Lock() defer m.mu.Unlock() m.eventCount.Add(1) + if sm, ok := e.(*registry.ServiceMappingChangeEvent); ok { + m.lastNames = sm.GetServiceNames() + } return m.onEventErr } @@ -164,3 +170,23 @@ func TestCacheListenerPathToKey(t *testing.T) { assert.Equal(t, "com.example.Service", cl.pathToKey("/dubbo/mapping/com.example.Service")) assert.Empty(t, cl.pathToKey("")) } + +// TestCacheListenerDataChangeFiltersEmptyAppNames ensures a change event built from a +// malformed/legacy comma-separated value (stray separators) does not surface empty app names +// to the listener. +func TestCacheListenerDataChangeFiltersEmptyAppNames(t *testing.T) { + cl := NewCacheListener("/dubbo", nil) + key := "/dubbo/mapping/com.example.Service" + + ml := newMockMappingListener() + listenerSet := NewListenerSet() + listenerSet.Add(ml) + cl.keyListeners.Store(key, listenerSet) + + assert.True(t, cl.DataChange(remoting.Event{Path: key, Action: remoting.EventTypeUpdate, Content: ",appA,,appB,"})) + require.NotNil(t, ml.lastNames) + assert.Equal(t, 2, ml.lastNames.Size()) + assert.True(t, ml.lastNames.Contains("appA")) + assert.True(t, ml.lastNames.Contains("appB")) + assert.False(t, ml.lastNames.Contains("")) +} diff --git a/metadata/report/zookeeper/report.go b/metadata/report/zookeeper/report.go index 5772678478..5e3b2d2e4e 100644 --- a/metadata/report/zookeeper/report.go +++ b/metadata/report/zookeeper/report.go @@ -19,6 +19,7 @@ package zookeeper import ( "encoding/json" + "fmt" "strings" ) @@ -94,17 +95,27 @@ func (m *zookeeperMetadataReport) RegisterServiceAppMapping(key string, group st path := m.rootDir + group + constant.PathSeparator + key v, state, err := m.client.GetContent(path) if err == zk.ErrNoNode { - return m.client.CreateWithValue(path, []byte(value)) + if cErr := m.client.CreateWithValue(path, []byte(value)); cErr != nil { + if perrors.Is(cErr, zk.ErrNodeExists) { + return fmt.Errorf("create mapping %s: %w", path, report.ErrMappingCASConflict) + } + return cErr + } + return nil } else if err != nil { return err } - oldValue := string(v) - if strings.Contains(oldValue, value) { + merged, changed := report.MergeServiceAppMapping(string(v), value) + if !changed { return nil } - value = oldValue + constant.CommaSeparator + value - _, err = m.client.SetContent(path, []byte(value), state.Version) - return err + if _, sErr := m.client.SetContent(path, []byte(merged), state.Version); sErr != nil { + if perrors.Is(sErr, zk.ErrBadVersion) { + return fmt.Errorf("update mapping %s: %w", path, report.ErrMappingCASConflict) + } + return sErr + } + return nil } // GetServiceAppMapping get the app names from the specified Dubbo service interface @@ -120,15 +131,12 @@ func (m *zookeeperMetadataReport) GetServiceAppMapping(key string, group string, if err != nil { return nil, err } - appNames := strings.Split(string(v), constant.CommaSeparator) - set := gxset.NewSet() - for _, e := range appNames { - set.Add(e) - } - return set, nil + return report.DecodeServiceAppNames(string(v)), nil } func (m *zookeeperMetadataReport) RemoveServiceAppMappingListener(key string, group string) error { + path := m.rootDir + group + constant.PathSeparator + key + m.cacheListener.RemoveKeyListeners(path) return nil } diff --git a/metadata/report/zookeeper/report_test.go b/metadata/report/zookeeper/report_test.go index 8bbb47f5a9..d17db0eefc 100644 --- a/metadata/report/zookeeper/report_test.go +++ b/metadata/report/zookeeper/report_test.go @@ -19,7 +19,6 @@ package zookeeper import ( "encoding/json" - "strings" "testing" ) @@ -32,6 +31,7 @@ import ( "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/common/constant" "dubbo.apache.org/dubbo-go/v3/metadata/info" + "dubbo.apache.org/dubbo-go/v3/metadata/report" ) func TestMetadataInfoSerialization(t *testing.T) { @@ -62,19 +62,19 @@ func TestMetadataInfoSerialization(t *testing.T) { func TestRegisterServiceAppMappingValueMerge(t *testing.T) { tests := []struct { oldValue, newValue, expected string + wantChanged bool }{ - {"app1", "app2", "app1,app2"}, - {"app1,app2", "app1", "app1,app2"}, - {"", "app1", ",app1"}, + {"app1", "app2", "app1,app2", true}, + {"app1,app2", "app1", "app1,app2", false}, + // empty old value must not produce a leading comma (was ",app1") + {"", "app1", "app1", true}, + // substring must not be mistaken for membership (was wrongly treated as present) + {"app1-extra", "app1", "app1-extra,app1", true}, } for _, tt := range tests { - var result string - if strings.Contains(tt.oldValue, tt.newValue) { - result = tt.oldValue - } else { - result = tt.oldValue + "," + tt.newValue - } + result, changed := report.MergeServiceAppMapping(tt.oldValue, tt.newValue) assert.Equal(t, tt.expected, result) + assert.Equal(t, tt.wantChanged, changed) } }