Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions metadata/mapping/metadata/service_name_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package metadata

import (
"errors"
"math/rand"
"sync"
"time"
)

import (
Expand All @@ -33,11 +36,17 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metadata"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
)

const (
DefaultGroup = "mapping"
retryTimes = 10
const DefaultGroup = "mapping"

// retry policy for mapping registration. These are vars rather than consts so they can be
// tuned (and made near-instant in tests).
var (
retryTimes = 10
retryBaseInterval = 100 * time.Millisecond
retryMaxInterval = 2 * time.Second
)

func init() {
Expand Down Expand Up @@ -73,19 +82,42 @@ func (d *ServiceNameMapping) Map(url *common.URL) error {
return perrors.New("can not registering mapping to remote cause no metadata report instance found")
}
for _, metadataReport := range metadataReports {
var err error
for i := 0; i < retryTimes; i++ {
if err = metadataReport.RegisterServiceAppMapping(serviceInterface, DefaultGroup, appName); err == nil {
break
}
}
if err != nil {
if err := registerWithRetry(metadataReport, serviceInterface, DefaultGroup, appName); err != nil {
return err
}
}
return nil
}

// registerWithRetry registers the interface-to-app mapping, retrying only on CAS conflicts
// (report.ErrMappingCASConflict) with exponential backoff. Any other error is returned
// immediately, since retrying it would not help.
func registerWithRetry(r report.MetadataReport, serviceInterface, group, appName string) error {
var err error
for i := 0; i < retryTimes; i++ {
err = r.RegisterServiceAppMapping(serviceInterface, group, appName)
if err == nil {
return nil
}
if !errors.Is(err, report.ErrMappingCASConflict) {
return err
}
time.Sleep(backoff(i))
}
return err
}

// backoff returns the delay before retry attempt i: retryBaseInterval*2^i capped at
// retryMaxInterval, plus up to 50% random jitter to spread out writers contending on the
// same key.
func backoff(attempt int) time.Duration {
d := retryBaseInterval << attempt
if d <= 0 || d > retryMaxInterval {
d = retryMaxInterval
}
return d/2 + time.Duration(rand.Int63n(int64(d)/2+1))
}

// Get will return the application-level services. If not found, the empty set will be returned.
func (d *ServiceNameMapping) Get(url *common.URL, listener mapping.MappingListener) (*gxset.HashSet, error) {
serviceInterface := url.GetParam(constant.InterfaceKey, "")
Expand Down
223 changes: 223 additions & 0 deletions metadata/mapping/metadata/service_name_mapping_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metadata

import (
"fmt"
"sync"
"testing"
)
Comment on lines +20 to +24

import (
gxset "github.com/dubbogo/gost/container/set"

"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/metadata/info"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
)

// versionedStore is an in-memory key/value store with a per-key version, modeling the
// compare-and-swap primitive a real metadata center (etcd ModRevision, zk Stat.Version,
// nacos content MD5) provides.
type versionedStore struct {
mu sync.Mutex
data map[string]versionedEntry
}

type versionedEntry struct {
val string
ver int64
}

func newVersionedStore() *versionedStore {
return &versionedStore{data: make(map[string]versionedEntry)}
}

func (s *versionedStore) get(key string) (string, int64) {
s.mu.Lock()
defer s.mu.Unlock()
e := s.data[key]
return e.val, e.ver
}

// cas writes val only if the current version equals ver, returning whether it was applied.
func (s *versionedStore) cas(key, val string, ver int64) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.data[key].ver != ver {
return false
}
s.data[key] = versionedEntry{val: val, ver: ver + 1}
return true
}

// put writes unconditionally, modeling the old read-modify-write behavior without CAS.
func (s *versionedStore) put(key, val string) {
s.mu.Lock()
defer s.mu.Unlock()
s.data[key] = versionedEntry{val: val, ver: s.data[key].ver + 1}
}

// stubReport satisfies the non-mapping parts of report.MetadataReport.
type stubReport struct{}

func (stubReport) GetAppMetadata(string, string) (*info.MetadataInfo, error) { return nil, nil }
func (stubReport) PublishAppMetadata(string, string, *info.MetadataInfo) error { return nil }
func (stubReport) GetServiceAppMapping(string, string, mapping.MappingListener) (*gxset.HashSet, error) {
return nil, nil
}
func (stubReport) RemoveServiceAppMappingListener(string, string) error { return nil }

// casReport registers mappings with optimistic concurrency against a versionedStore, exactly
// as the etcd/zk/nacos reports now do, returning report.ErrMappingCASConflict on conflict.
type casReport struct {
stubReport
store *versionedStore
}

func (r *casReport) RegisterServiceAppMapping(key, group, value string) error {
full := group + "/" + key
old, ver := r.store.get(full)
merged, changed := report.MergeServiceAppMapping(old, value)
if !changed {
return nil
}
if !r.store.cas(full, merged, ver) {
return report.ErrMappingCASConflict
}
return nil
}

func (r *casReport) GetServiceAppMapping(key, group string, _ mapping.MappingListener) (*gxset.HashSet, error) {
val, _ := r.store.get(group + "/" + key)
return report.DecodeServiceAppNames(val), nil
}

// fastRetry sets a near-zero backoff and the given retry budget, returning a restore func.
func fastRetry(times int) func() {
ot, ob, om := retryTimes, retryBaseInterval, retryMaxInterval
retryTimes, retryBaseInterval, retryMaxInterval = times, 0, 0
return func() { retryTimes, retryBaseInterval, retryMaxInterval = ot, ob, om }
}

// TestNaiveReadModifyWriteLosesConcurrentUpdate reproduces the bug the issue is about: two
// providers that read the same value and write back unconditionally clobber each other.
func TestNaiveReadModifyWriteLosesConcurrentUpdate(t *testing.T) {
store := newVersionedStore()
const key = "mapping/Iface"

// Both providers read the initial empty value...
oldA, _ := store.get(key)
oldB, _ := store.get(key)
// ...each merges its own app and writes back without a version check.
mergedA, _ := report.MergeServiceAppMapping(oldA, "appA")
store.put(key, mergedA)
mergedB, _ := report.MergeServiceAppMapping(oldB, "appB")
store.put(key, mergedB)

val, _ := store.get(key)
got := report.DecodeServiceAppNames(val)
assert.False(t, got.Contains("appA"), "appA was silently lost by the second write")
assert.True(t, got.Contains("appB"))
}

// TestCASRejectsConcurrentUpdate shows the fix: the second writer's CAS fails, and after a
// re-read both apps survive.
func TestCASRejectsConcurrentUpdate(t *testing.T) {
store := newVersionedStore()
const key = "mapping/Iface"

oldA, verA := store.get(key)
oldB, verB := store.get(key) // both read version 0

mergedA, _ := report.MergeServiceAppMapping(oldA, "appA")
assert.True(t, store.cas(key, mergedA, verA)) // A wins, version -> 1

mergedB, _ := report.MergeServiceAppMapping(oldB, "appB")
assert.False(t, store.cas(key, mergedB, verB)) // B's stale version is rejected

// B re-reads and retries; nothing is lost.
oldB2, verB2 := store.get(key)
mergedB2, _ := report.MergeServiceAppMapping(oldB2, "appB")
assert.True(t, store.cas(key, mergedB2, verB2))

val, _ := store.get(key)
got := report.DecodeServiceAppNames(val)
assert.True(t, got.Contains("appA"))
assert.True(t, got.Contains("appB"))
}

// TestRegisterWithRetryConcurrentNoLostUpdate drives the real registerWithRetry loop with many
// writers racing on the same interface key, while readers concurrently read it. It asserts no
// app is lost and that a reader never observes the set shrink. Run with -race to also catch
// data races.
func TestRegisterWithRetryConcurrentNoLostUpdate(t *testing.T) {
defer fastRetry(10000)()
store := newVersionedStore()
r := &casReport{store: store}

const writers = 200
const readers = 20

// concurrent readers: every successful registration only appends, so a reader must never
// see the set get smaller or contain a malformed entry.
stop := make(chan struct{})
var readerWg sync.WaitGroup
for i := 0; i < readers; i++ {
readerWg.Add(1)
go func() {
defer readerWg.Done()
prev := 0
for {
select {
case <-stop:
return
default:
set, err := r.GetServiceAppMapping("Iface", DefaultGroup, nil)
assert.NoError(t, err)
assert.GreaterOrEqual(t, set.Size(), prev)
assert.False(t, set.Contains(""))
prev = set.Size()
}
Comment on lines +194 to +200
}
}()
}

var writerWg sync.WaitGroup
for i := 0; i < writers; i++ {
writerWg.Add(1)
go func(i int) {
defer writerWg.Done()
assert.NoError(t, registerWithRetry(r, "Iface", DefaultGroup, fmt.Sprintf("app-%d", i)))
}(i)
}
writerWg.Wait()
close(stop)
readerWg.Wait()

val, _ := store.get(DefaultGroup + "/Iface")
got := report.DecodeServiceAppNames(val)
assert.Equal(t, writers, got.Size())
for i := 0; i < writers; i++ {
assert.True(t, got.Contains(fmt.Sprintf("app-%d", i)), "app-%d was lost", i)
}
}
12 changes: 10 additions & 2 deletions metadata/mapping/metadata/service_name_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,19 @@ func TestServiceNameMappingMap(t *testing.T) {
err = ins.Map(serviceUrl)
require.NoError(t, err)
})
t.Run("test error", func(t *testing.T) {
mockReport.On("RegisterServiceAppMapping").Return(errors.New("mock error")).Times(retryTimes)
t.Run("non-conflict error returns immediately", func(t *testing.T) {
// a generic error is not retriable, so RegisterServiceAppMapping is called exactly once
mockReport.On("RegisterServiceAppMapping").Return(errors.New("mock error")).Once()
err = ins.Map(serviceUrl)
require.Error(t, err, "test mapping error")
})
t.Run("CAS conflict retries up to retryTimes", func(t *testing.T) {
const conflictRetries = 3
defer fastRetry(conflictRetries)()
mockReport.On("RegisterServiceAppMapping").Return(report.ErrMappingCASConflict).Times(conflictRetries)
err = ins.Map(serviceUrl)
require.Error(t, err, "conflict exhausts the retry budget")
})
mockReport.AssertExpectations(t)
}

Expand Down
36 changes: 25 additions & 11 deletions metadata/report/etcd/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package etcd

import (
"encoding/json"
"fmt"
"strings"
)

Expand Down Expand Up @@ -79,34 +80,47 @@ func (e *etcdMetadataReport) PublishAppMetadata(application, revision string, in
// RegisterServiceAppMapping map the specified Dubbo service interface to current Dubbo app name
func (e *etcdMetadataReport) RegisterServiceAppMapping(key string, group string, value string) error {
path := e.rootDir + constant.PathSeparator + group + constant.PathSeparator + key
oldVal, err := e.client.Get(path)
oldVal, rev, err := e.client.GetValAndRev(path)
if perrors.Cause(err) == gxetcd.ErrKVPairNotFound {
return e.client.Put(path, value)
if cErr := e.client.Create(path, value); cErr != nil {
if perrors.Cause(cErr) == gxetcd.ErrCompareFail {
return fmt.Errorf("create mapping %s: %w", path, report.ErrMappingCASConflict)
}
return cErr
}
return nil
} else if err != nil {
return err
}
if strings.Contains(oldVal, value) {
merged, changed := report.MergeServiceAppMapping(oldVal, value)
if !changed {
return nil
}
value = oldVal + constant.CommaSeparator + value
return e.client.Put(path, value)
if uErr := e.client.UpdateWithRev(path, merged, rev); uErr != nil {
if perrors.Cause(uErr) == gxetcd.ErrCompareFail {
return fmt.Errorf("update mapping %s: %w", path, report.ErrMappingCASConflict)
}
return uErr
}
return nil
}

// GetServiceAppMapping get the app names from the specified Dubbo service interface
func (e *etcdMetadataReport) GetServiceAppMapping(key string, group string, listener mapping.MappingListener) (*gxset.HashSet, error) {
path := e.rootDir + constant.PathSeparator + group + constant.PathSeparator + key
if listener != nil {
logger.Warnf("etcd metadata report does not support service mapping listener, "+
"mapping changes of %s will not be notified", path)
}
v, err := e.client.Get(path)
if err != nil {
return nil, err
}
appNames := strings.Split(v, constant.CommaSeparator)
set := gxset.NewSet()
for _, app := range appNames {
set.Add(app)
}
return set, nil
return report.DecodeServiceAppNames(v), nil
}

// RemoveServiceAppMappingListener is a no-op: etcd metadata report does not register a mapping
// listener (see GetServiceAppMapping), so there is nothing to remove.
func (e *etcdMetadataReport) RemoveServiceAppMappingListener(key string, group string) error {
return nil
}
Expand Down
Loading
Loading