Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ const (
MetadataServiceURLParamsPropertyName = MetadataServicePrefix + "url-params"
MetadataServiceURLsPropertyName = MetadataServicePrefix + "urls"
ServiceDiscoveryKey = "service_discovery" // indicate which service discovery instance will be used

// metadata GC configuration keys
MetadataGCEnabledKey = "metadata.gc.enabled"
MetadataGCWindowKey = "metadata.gc.window" // GC window in days, aligned with daily renew cycle
MetadataRenewOnStartupKey = "metadata.renew-on-startup" // whether to run renewAppMetadata once on startup
)

// Generic Filter
Expand Down
17 changes: 17 additions & 0 deletions metadata/info/metadata_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type MetadataInfo struct {
Services map[string]*ServiceInfo `json:"services,omitempty" hessian:"services"`
exportedServiceURLs map[string][]*common.URL `hessian:"-"` // server exported service urls
subscribedServiceURLs map[string][]*common.URL `hessian:"-"` // client subscribed service urls
LastUpdatedTime int64 `json:"lastUpdatedTime,omitempty" hessian:"-"`
}

func NewAppMetadataInfo(app string) *MetadataInfo {
Expand Down Expand Up @@ -174,6 +175,22 @@ func (info *MetadataInfo) ReplaceExportedServices(urls []*common.URL) {
}
}

// Snapshot creates a deep copy of the MetadataInfo for safe concurrent access.
// The caller can modify the snapshot without affecting the original.
func (info *MetadataInfo) Snapshot() MetadataInfo {
services := make(map[string]*ServiceInfo, len(info.Services))
for k, v := range info.Services {
si := *v
services[k] = &si
}
return MetadataInfo{
App: info.App,
Revision: info.Revision,
Tag: info.Tag,
Services: services,
}
}

func (info *MetadataInfo) findExportedServiceURL(matchKey string) *common.URL {
for _, urls := range info.exportedServiceURLs {
for _, serviceURL := range urls {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/metadata/info"
"dubbo.apache.org/dubbo-go/v3/metadata/mapping"
"dubbo.apache.org/dubbo-go/v3/metadata/report"
Expand Down Expand Up @@ -86,6 +87,11 @@ func (stubReport) GetServiceAppMapping(string, string, mapping.MappingListener)
return nil, nil
}
func (stubReport) RemoveServiceAppMappingListener(string, string) error { return nil }
func (stubReport) UnPublishAppMetadata(string, string) error { return nil }
func (stubReport) ListAppRevisions(string) ([]report.AppRevision, error) {
return nil, nil
}
func (stubReport) URL() *common.URL { return nil }

// casReport registers mappings with optimistic concurrency against a versionedStore, exactly
// as the etcd/zk/nacos reports now do, returning report.ErrMappingCASConflict on conflict.
Expand Down
15 changes: 15 additions & 0 deletions metadata/mapping/metadata/service_name_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,18 @@
args := m.Called()
return args.Error(0)
}

func (m *mockMetadataReport) UnPublishAppMetadata(string, string) error {

Check warning on line 193 in metadata/mapping/metadata/service_name_mapping_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Update this function so that its implementation is not identical to "RemoveServiceAppMappingListener" on line 188.

See more on https://sonarcloud.io/project/issues?id=apache_dubbo-go&issues=AZ6WxS3U8Ft_Yw-VoyI7&open=AZ6WxS3U8Ft_Yw-VoyI7&pullRequest=3371
args := m.Called()
return args.Error(0)
}

func (m *mockMetadataReport) ListAppRevisions(string) ([]report.AppRevision, error) {
args := m.Called()
return args.Get(0).([]report.AppRevision), args.Error(1)
}

func (m *mockMetadataReport) URL() *common.URL {
u, _ := common.NewURL("mock://127.0.0.1:8848")
return u
}
66 changes: 62 additions & 4 deletions metadata/report/etcd/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package etcd

import (
"encoding/json"
"errors"
"fmt"
"strings"
)
Expand All @@ -29,6 +30,8 @@ import (
"github.com/dubbogo/gost/log/logger"

perrors "github.com/pkg/errors"

clientv3 "go.etcd.io/etcd/client/v3"
)

import (
Expand All @@ -40,6 +43,26 @@ import (
"dubbo.apache.org/dubbo-go/v3/metadata/report"
)

// etcdClient abstracts the etcd Client operations used by etcdMetadataReport.
type etcdClient interface {
Get(key string) (string, error)
Put(key, value string) error
Delete(key string) error
GetChildren(key string) ([]string, []string, error)
GetValAndRev(key string) (string, int64, error)
Create(key, value string) error
UpdateWithRev(key, value string, rev int64, opts ...clientv3.OpOption) error
}

// etcdClientWrapper wraps *gxetcd.Client to implement etcdClient.
type etcdClientWrapper struct {
*gxetcd.Client
}

func (w etcdClientWrapper) Put(key, value string) error {
return w.Client.Put(key, value)
}

const DEFAULT_ROOT = "dubbo"

func init() {
Expand All @@ -50,13 +73,19 @@ func init() {

// etcdMetadataReport is the implementation of MetadataReport based etcd
type etcdMetadataReport struct {
client *gxetcd.Client
client etcdClient
rootDir string
url *common.URL
}

// URL returns the URL used to create this metadata report.
func (e *etcdMetadataReport) URL() *common.URL {
return e.url
}

// GetAppMetadata get metadata info from etcd
func (e *etcdMetadataReport) GetAppMetadata(application, revision string) (*info.MetadataInfo, error) {
key := e.rootDir + application + constant.PathSeparator + revision
key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision
data, err := e.client.Get(key)
if err != nil {
return nil, err
Expand All @@ -68,7 +97,7 @@ func (e *etcdMetadataReport) GetAppMetadata(application, revision string) (*info

// PublishAppMetadata publish metadata info to etcd
func (e *etcdMetadataReport) PublishAppMetadata(application, revision string, info *info.MetadataInfo) error {
key := e.rootDir + application + constant.PathSeparator + revision
key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision
value, err := json.Marshal(info)
if err == nil {
err = e.client.Put(key, string(value))
Expand Down Expand Up @@ -125,6 +154,35 @@ func (e *etcdMetadataReport) RemoveServiceAppMappingListener(key string, group s
return nil
}

// UnPublishAppMetadata removes metadata for a specific revision from etcd.
// This operation is idempotent.
func (e *etcdMetadataReport) UnPublishAppMetadata(application, revision string) error {
key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision
return e.client.Delete(key)
}

func (e *etcdMetadataReport) ListAppRevisions(application string) ([]report.AppRevision, error) {
prefix := e.rootDir + constant.PathSeparator + application + constant.PathSeparator
keys, values, err := e.client.GetChildren(prefix)
if err != nil {
if errors.Is(perrors.Cause(err), gxetcd.ErrKVPairNotFound) {
return nil, nil
}
return nil, err
}

result := make([]report.AppRevision, 0, len(keys))
for i, key := range keys {
// Extract revision from key suffix (key is full path, revision is last segment)
revision := key[strings.LastIndex(key, constant.PathSeparator)+1:]
result = append(result, report.AppRevision{
Revision: revision,
ModifyTime: report.ParseMetadataLastUpdatedTime([]byte(values[i])),
})
}
return result, nil
}
Comment thread
eye-gu marked this conversation as resolved.

type etcdMetadataReportFactory struct{}

// CreateMetadataReport get the MetadataReport instance of etcd
Expand All @@ -138,5 +196,5 @@ func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report
}
group := url.GetParam(constant.MetadataReportGroupKey, DEFAULT_ROOT)
group = constant.PathSeparator + strings.TrimPrefix(group, constant.PathSeparator)
return &etcdMetadataReport{client: client, rootDir: group}
return &etcdMetadataReport{client: etcdClientWrapper{client}, rootDir: group, url: url}
}
Loading
Loading