diff --git a/common/constant/key.go b/common/constant/key.go index d680cf9862..c2612f2778 100644 --- a/common/constant/key.go +++ b/common/constant/key.go @@ -417,6 +417,11 @@ const ( MetadataServiceURLParamsPropertyName = MetadataServicePrefix + "url-params" MetadataServiceURLsPropertyName = MetadataServicePrefix + "urls" ServiceDiscoveryKey = "service_discovery" // indicate which service discovery instance will be used + + // metadata GC configuration keys + MetadataGCEnabledKey = "metadata.gc.enabled" + MetadataGCWindowKey = "metadata.gc.window" // GC window in days, aligned with daily renew cycle + MetadataRenewOnStartupKey = "metadata.renew-on-startup" // whether to run renewAppMetadata once on startup ) // Generic Filter diff --git a/metadata/info/metadata_info.go b/metadata/info/metadata_info.go index 1dd4ffc108..5ae7bd5d4f 100644 --- a/metadata/info/metadata_info.go +++ b/metadata/info/metadata_info.go @@ -63,6 +63,7 @@ type MetadataInfo struct { Services map[string]*ServiceInfo `json:"services,omitempty" hessian:"services"` exportedServiceURLs map[string][]*common.URL `hessian:"-"` // server exported service urls subscribedServiceURLs map[string][]*common.URL `hessian:"-"` // client subscribed service urls + LastUpdatedTime int64 `json:"lastUpdatedTime,omitempty" hessian:"-"` } func NewAppMetadataInfo(app string) *MetadataInfo { @@ -174,6 +175,22 @@ func (info *MetadataInfo) ReplaceExportedServices(urls []*common.URL) { } } +// Snapshot creates a deep copy of the MetadataInfo for safe concurrent access. +// The caller can modify the snapshot without affecting the original. +func (info *MetadataInfo) Snapshot() MetadataInfo { + services := make(map[string]*ServiceInfo, len(info.Services)) + for k, v := range info.Services { + si := *v + services[k] = &si + } + return MetadataInfo{ + App: info.App, + Revision: info.Revision, + Tag: info.Tag, + Services: services, + } +} + func (info *MetadataInfo) findExportedServiceURL(matchKey string) *common.URL { for _, urls := range info.exportedServiceURLs { for _, serviceURL := range urls { diff --git a/metadata/mapping/metadata/service_name_mapping_concurrency_test.go b/metadata/mapping/metadata/service_name_mapping_concurrency_test.go index 578bc828b6..7d729c3766 100644 --- a/metadata/mapping/metadata/service_name_mapping_concurrency_test.go +++ b/metadata/mapping/metadata/service_name_mapping_concurrency_test.go @@ -30,6 +30,7 @@ import ( ) import ( + "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/metadata/info" "dubbo.apache.org/dubbo-go/v3/metadata/mapping" "dubbo.apache.org/dubbo-go/v3/metadata/report" @@ -86,6 +87,11 @@ func (stubReport) GetServiceAppMapping(string, string, mapping.MappingListener) return nil, nil } func (stubReport) RemoveServiceAppMappingListener(string, string) error { return nil } +func (stubReport) UnPublishAppMetadata(string, string) error { return nil } +func (stubReport) ListAppRevisions(string) ([]report.AppRevision, error) { + return nil, nil +} +func (stubReport) URL() *common.URL { return nil } // casReport registers mappings with optimistic concurrency against a versionedStore, exactly // as the etcd/zk/nacos reports now do, returning report.ErrMappingCASConflict on conflict. diff --git a/metadata/mapping/metadata/service_name_mapping_test.go b/metadata/mapping/metadata/service_name_mapping_test.go index c279ff4c07..f461f9f5e9 100644 --- a/metadata/mapping/metadata/service_name_mapping_test.go +++ b/metadata/mapping/metadata/service_name_mapping_test.go @@ -189,3 +189,18 @@ func (m *mockMetadataReport) RemoveServiceAppMappingListener(string, string) err args := m.Called() return args.Error(0) } + +func (m *mockMetadataReport) UnPublishAppMetadata(string, string) error { + args := m.Called() + return args.Error(0) +} + +func (m *mockMetadataReport) ListAppRevisions(string) ([]report.AppRevision, error) { + args := m.Called() + return args.Get(0).([]report.AppRevision), args.Error(1) +} + +func (m *mockMetadataReport) URL() *common.URL { + u, _ := common.NewURL("mock://127.0.0.1:8848") + return u +} diff --git a/metadata/report/etcd/report.go b/metadata/report/etcd/report.go index 182b18898d..2cb344d09f 100644 --- a/metadata/report/etcd/report.go +++ b/metadata/report/etcd/report.go @@ -19,6 +19,7 @@ package etcd import ( "encoding/json" + "errors" "fmt" "strings" ) @@ -29,6 +30,8 @@ import ( "github.com/dubbogo/gost/log/logger" perrors "github.com/pkg/errors" + + clientv3 "go.etcd.io/etcd/client/v3" ) import ( @@ -40,6 +43,26 @@ import ( "dubbo.apache.org/dubbo-go/v3/metadata/report" ) +// etcdClient abstracts the etcd Client operations used by etcdMetadataReport. +type etcdClient interface { + Get(key string) (string, error) + Put(key, value string) error + Delete(key string) error + GetChildren(key string) ([]string, []string, error) + GetValAndRev(key string) (string, int64, error) + Create(key, value string) error + UpdateWithRev(key, value string, rev int64, opts ...clientv3.OpOption) error +} + +// etcdClientWrapper wraps *gxetcd.Client to implement etcdClient. +type etcdClientWrapper struct { + *gxetcd.Client +} + +func (w etcdClientWrapper) Put(key, value string) error { + return w.Client.Put(key, value) +} + const DEFAULT_ROOT = "dubbo" func init() { @@ -50,13 +73,19 @@ func init() { // etcdMetadataReport is the implementation of MetadataReport based etcd type etcdMetadataReport struct { - client *gxetcd.Client + client etcdClient rootDir string + url *common.URL +} + +// URL returns the URL used to create this metadata report. +func (e *etcdMetadataReport) URL() *common.URL { + return e.url } // GetAppMetadata get metadata info from etcd func (e *etcdMetadataReport) GetAppMetadata(application, revision string) (*info.MetadataInfo, error) { - key := e.rootDir + application + constant.PathSeparator + revision + key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision data, err := e.client.Get(key) if err != nil { return nil, err @@ -68,7 +97,7 @@ func (e *etcdMetadataReport) GetAppMetadata(application, revision string) (*info // PublishAppMetadata publish metadata info to etcd func (e *etcdMetadataReport) PublishAppMetadata(application, revision string, info *info.MetadataInfo) error { - key := e.rootDir + application + constant.PathSeparator + revision + key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision value, err := json.Marshal(info) if err == nil { err = e.client.Put(key, string(value)) @@ -125,6 +154,35 @@ func (e *etcdMetadataReport) RemoveServiceAppMappingListener(key string, group s return nil } +// UnPublishAppMetadata removes metadata for a specific revision from etcd. +// This operation is idempotent. +func (e *etcdMetadataReport) UnPublishAppMetadata(application, revision string) error { + key := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + revision + return e.client.Delete(key) +} + +func (e *etcdMetadataReport) ListAppRevisions(application string) ([]report.AppRevision, error) { + prefix := e.rootDir + constant.PathSeparator + application + constant.PathSeparator + keys, values, err := e.client.GetChildren(prefix) + if err != nil { + if errors.Is(perrors.Cause(err), gxetcd.ErrKVPairNotFound) { + return nil, nil + } + return nil, err + } + + result := make([]report.AppRevision, 0, len(keys)) + for i, key := range keys { + // Extract revision from key suffix (key is full path, revision is last segment) + revision := key[strings.LastIndex(key, constant.PathSeparator)+1:] + result = append(result, report.AppRevision{ + Revision: revision, + ModifyTime: report.ParseMetadataLastUpdatedTime([]byte(values[i])), + }) + } + return result, nil +} + type etcdMetadataReportFactory struct{} // CreateMetadataReport get the MetadataReport instance of etcd @@ -138,5 +196,5 @@ func (e *etcdMetadataReportFactory) CreateMetadataReport(url *common.URL) report } group := url.GetParam(constant.MetadataReportGroupKey, DEFAULT_ROOT) group = constant.PathSeparator + strings.TrimPrefix(group, constant.PathSeparator) - return &etcdMetadataReport{client: client, rootDir: group} + return &etcdMetadataReport{client: etcdClientWrapper{client}, rootDir: group, url: url} } diff --git a/metadata/report/etcd/report_test.go b/metadata/report/etcd/report_test.go index 6ec86ebb6a..01ab15f95e 100644 --- a/metadata/report/etcd/report_test.go +++ b/metadata/report/etcd/report_test.go @@ -17,349 +17,307 @@ package etcd -/* import ( "encoding/json" - "reflect" - "strconv" + "strings" "testing" ) import ( - "github.com/agiledragon/gomonkey" - gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" - "go.etcd.io/etcd/client/v3" + perrors "github.com/pkg/errors" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + clientv3 "go.etcd.io/etcd/client/v3" ) import ( - "dubbo.apache.org/dubbo-go/v3/common" - "dubbo.apache.org/dubbo-go/v3/common/constant" - "dubbo.apache.org/dubbo-go/v3/metadata/identifier" + "dubbo.apache.org/dubbo-go/v3/metadata/info" ) -func newSubscribeMetadataIdentifier() *identifier.SubscriberMetadataIdentifier { - return &identifier.SubscriberMetadataIdentifier{ - Revision: "subscribe", - BaseApplicationMetadataIdentifier: identifier.BaseApplicationMetadataIdentifier{ - Application: "provider", - }, - } +// --- Mock etcdClient --- +// mockEtcdClient implements etcdClient for testing. +type mockEtcdClient struct { + data map[string]string // key -> value + rev map[string]int64 // key -> revision (monotonic counter) + seq int64 // global revision counter } -func newServiceMetadataIdentifier() *identifier.ServiceMetadataIdentifier { - return &identifier.ServiceMetadataIdentifier{ - Protocol: "nacos", - Revision: "a", - BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ - ServiceInterface: "com.test.MyTest", - Version: "1.0.0", - Group: "test_group", - Side: "service", - }, +func newMockEtcdClient() *mockEtcdClient { + return &mockEtcdClient{ + data: make(map[string]string), + rev: make(map[string]int64), } } -func newMetadataIdentifier(side string) *identifier.MetadataIdentifier { - return &identifier.MetadataIdentifier{ - Application: "test", - BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{ - ServiceInterface: "com.test.MyTest", - Version: "1.0.0", - Group: "test_group", - Side: side, - }, +func (m *mockEtcdClient) Get(key string) (string, error) { + v, ok := m.data[key] + if !ok { + return "", gxetcd.ErrKVPairNotFound } + return v, nil } -type fields struct { - client *gxetcd.Client - root string -} -type args struct { - subscriberMetadataIdentifier *identifier.SubscriberMetadataIdentifier - info *common.MetadataInfo - providerIdentifier *identifier.MetadataIdentifier - serviceDefinitions string - consumerMetadataIdentifier *identifier.MetadataIdentifier - serviceParameterString string - serviceMetadataIdentifier *identifier.ServiceMetadataIdentifier - url *common.URL - urls string +func (m *mockEtcdClient) Put(key, value string) error { + m.data[key] = value + m.seq++ + m.rev[key] = m.seq + return nil } -func newEtcdMetadataReport(f fields) *etcdMetadataReport { - return &etcdMetadataReport{ - client: f.client, - root: f.root, +func (m *mockEtcdClient) Delete(key string) error { + if _, ok := m.data[key]; !ok { + return nil // etcd delete is idempotent } + delete(m.data, key) + return nil } -func Test_etcdMetadataReport_PublishAppMetadata(t *testing.T) { - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "Put", func(_ *gxetcd.Client, k, v string, opts ...clientv3.OpOption) error { - return nil - }) - defer patches.Reset() - - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - root: "/dubbo", - }, - args: args{ - subscriberMetadataIdentifier: newSubscribeMetadataIdentifier(), - info: &common.MetadataInfo{}, - }, - wantErr: false, - }, +func (m *mockEtcdClient) GetChildren(key string) ([]string, []string, error) { + var keys, values []string + for k, v := range m.data { + if strings.HasPrefix(k, key) { + keys = append(keys, k) + values = append(values, v) + } } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := newEtcdMetadataReport(tt.fields) - if err := e.PublishAppMetadata(tt.args.subscriberMetadataIdentifier, tt.args.info); (err != nil) != tt.wantErr { - t.Errorf("PublishAppMetadata() error = %v, wantErr %v", err, tt.wantErr) - } - }) + if len(keys) == 0 { + return nil, nil, gxetcd.ErrKVPairNotFound } + return keys, values, nil } -func Test_etcdMetadataReport_StoreProviderMetadata(t *testing.T) { - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "Put", func(_ *gxetcd.Client, k, v string, opts ...clientv3.OpOption) error { - return nil - }) - defer patches.Reset() - - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - root: "/dubbo", - }, - args: args{ - providerIdentifier: newMetadataIdentifier("provuder"), - serviceDefinitions: "provider", - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := newEtcdMetadataReport(tt.fields) - if err := e.StoreProviderMetadata(tt.args.providerIdentifier, tt.args.serviceDefinitions); (err != nil) != tt.wantErr { - t.Errorf("StoreProviderMetadata() error = %v, wantErr %v", err, tt.wantErr) - } - }) +func (m *mockEtcdClient) GetValAndRev(key string) (string, int64, error) { + v, ok := m.data[key] + if !ok { + return "", 0, gxetcd.ErrKVPairNotFound } + return v, m.rev[key], nil } -func Test_etcdMetadataReport_StoreConsumerMetadata(t *testing.T) { - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "Put", func(_ *gxetcd.Client, k, v string, opts ...clientv3.OpOption) error { - return nil - }) - defer patches.Reset() - - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - root: "/dubbo", - }, - args: args{ - consumerMetadataIdentifier: newMetadataIdentifier("conusmer"), - serviceParameterString: "conusmer", - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := newEtcdMetadataReport(tt.fields) - if err := e.StoreConsumerMetadata(tt.args.consumerMetadataIdentifier, tt.args.serviceParameterString); (err != nil) != tt.wantErr { - t.Errorf("StoreConsumerMetadata() error = %v, wantErr %v", err, tt.wantErr) - } - }) +func (m *mockEtcdClient) Create(key, value string) error { + if _, ok := m.data[key]; ok { + return gxetcd.ErrCompareFail } + m.data[key] = value + m.seq++ + m.rev[key] = m.seq + return nil } -func Test_etcdMetadataReport_SaveServiceMetadata(t *testing.T) { - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "Put", func(_ *gxetcd.Client, k, v string, opts ...clientv3.OpOption) error { - return nil - }) - defer patches.Reset() - serviceURL, _ := common.NewURL("registry://localhost:8848", common.WithParamsValue(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))) - - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - root: "/dubbo", - }, - args: args{ - serviceMetadataIdentifier: newServiceMetadataIdentifier(), - url: serviceURL, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := newEtcdMetadataReport(tt.fields) - if err := e.SaveServiceMetadata(tt.args.serviceMetadataIdentifier, tt.args.url); (err != nil) != tt.wantErr { - t.Errorf("SaveServiceMetadata() error = %v, wantErr %v", err, tt.wantErr) - } - }) +func (m *mockEtcdClient) UpdateWithRev(key, value string, rev int64, _ ...clientv3.OpOption) error { + cur := m.rev[key] // zero value (0) if missing + if cur != rev { + return gxetcd.ErrCompareFail } + m.data[key] = value + m.seq++ + m.rev[key] = m.seq + return nil } -func Test_etcdMetadataReport_SaveSubscribedData(t *testing.T) { - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "Put", func(_ *gxetcd.Client, k, v string, opts ...clientv3.OpOption) error { - return nil - }) - defer patches.Reset() - - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - root: "/dubbo", - }, - args: args{ - subscriberMetadataIdentifier: newSubscribeMetadataIdentifier(), - urls: "dubbogo", - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := newEtcdMetadataReport(tt.fields) - if err := e.SaveSubscribedData(tt.args.subscriberMetadataIdentifier, tt.args.urls); (err != nil) != tt.wantErr { - t.Errorf("SaveSubscribedData() error = %v, wantErr %v", err, tt.wantErr) - } - }) +// --- Helper --- + +func newTestReport() (*etcdMetadataReport, *mockEtcdClient) { + mc := newMockEtcdClient() + r := &etcdMetadataReport{ + client: mc, + rootDir: "/dubbo", } + return r, mc } -func Test_etcdMetadataReport_RemoveServiceMetadata(t *testing.T) { - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "Delete", func(_ *gxetcd.Client, k string) error { - return nil - }) - defer patches.Reset() - - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - root: DEFAULT_ROOT, - }, - args: args{ - serviceMetadataIdentifier: newServiceMetadataIdentifier(), - }, - wantErr: false, +// --- Tests --- + +func TestPublishAndGetAppMetadata(t *testing.T) { + r, _ := newTestReport() + + meta := &info.MetadataInfo{ + App: "my-app", + Revision: "r1", + Services: map[string]*info.ServiceInfo{ + "com.example.Foo": {Name: "com.example.Foo", Protocol: "dubbo"}, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := newEtcdMetadataReport(tt.fields) - if err := e.RemoveServiceMetadata(tt.args.serviceMetadataIdentifier); (err != nil) != tt.wantErr { - t.Errorf("RemoveServiceMetadata() error = %v, wantErr %v", err, tt.wantErr) - } - }) + + err := r.PublishAppMetadata("my-app", "r1", meta) + require.NoError(t, err) + + got, err := r.GetAppMetadata("my-app", "r1") + require.NoError(t, err) + assert.Equal(t, "my-app", got.App) + assert.Equal(t, "r1", got.Revision) + assert.Contains(t, got.Services, "com.example.Foo") + + // Get non-existent returns error + _, err = r.GetAppMetadata("my-app", "nonexistent") + require.Error(t, err) + assert.True(t, perrors.Is(err, gxetcd.ErrKVPairNotFound)) +} + +func TestPublishAppMetadata_Update(t *testing.T) { + r, _ := newTestReport() + + meta := &info.MetadataInfo{App: "my-app", Revision: "r1"} + err := r.PublishAppMetadata("my-app", "r1", meta) + require.NoError(t, err) + + meta.Revision = "r2" + err = r.PublishAppMetadata("my-app", "r1", meta) + require.NoError(t, err) + + got, err := r.GetAppMetadata("my-app", "r1") + require.NoError(t, err) + assert.Equal(t, "r2", got.Revision) +} + +func TestUnPublishAppMetadata(t *testing.T) { + r, _ := newTestReport() + + meta := &info.MetadataInfo{App: "my-app", Revision: "r1"} + err := r.PublishAppMetadata("my-app", "r1", meta) + require.NoError(t, err) + + err = r.UnPublishAppMetadata("my-app", "r1") + require.NoError(t, err) + + _, err = r.GetAppMetadata("my-app", "r1") + require.Error(t, err) +} + +func TestUnPublishAppMetadata_Idempotent(t *testing.T) { + r, _ := newTestReport() + + // Deleting non-existent key should not error (etcd is idempotent) + err := r.UnPublishAppMetadata("my-app", "nonexistent") + require.NoError(t, err) +} + +func TestListAppRevisions(t *testing.T) { + r, _ := newTestReport() + + // No revisions for unknown app + revisions, err := r.ListAppRevisions("unknown-app") + require.NoError(t, err) + assert.Empty(t, revisions) + + // Publish multiple revisions with explicit lastUpdatedTime values + require.NoError(t, r.PublishAppMetadata("my-app", "r1", &info.MetadataInfo{App: "my-app", Revision: "r1", LastUpdatedTime: 1000})) + require.NoError(t, r.PublishAppMetadata("my-app", "r2", &info.MetadataInfo{App: "my-app", Revision: "r2", LastUpdatedTime: 3000})) + require.NoError(t, r.PublishAppMetadata("my-app", "r3", &info.MetadataInfo{App: "my-app", Revision: "r3", LastUpdatedTime: 2000})) + + revisions, err = r.ListAppRevisions("my-app") + require.NoError(t, err) + require.Len(t, revisions, 3) + + names := make(map[string]int64) + for _, rev := range revisions { + names[rev.Revision] = rev.ModifyTime } + assert.Equal(t, int64(1000), names["r1"]) + assert.Equal(t, int64(3000), names["r2"]) + assert.Equal(t, int64(2000), names["r3"]) } -func Test_etcdMetadataReport_GetAppMetadata(t *testing.T) { - info := &common.MetadataInfo{} - target, _ := json.Marshal(info) - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "Get", func(_ *gxetcd.Client, k string) (string, error) { - return string(target), nil - }) - defer patches.Reset() - - tests := []struct { - name string - fields fields - args args - want *common.MetadataInfo - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - root: DEFAULT_ROOT, - }, - args: args{ - subscriberMetadataIdentifier: newSubscribeMetadataIdentifier(), +func TestListAppRevisions_ReturnsAppRevisionType(t *testing.T) { + r, mc := newTestReport() + + mc.data["/dubbo/my-app/rev-abc"] = `{"app":"my-app","revision":"rev-abc","lastUpdatedTime":5000}` + + revisions, err := r.ListAppRevisions("my-app") + require.NoError(t, err) + require.Len(t, revisions, 1) + assert.Equal(t, "rev-abc", revisions[0].Revision) + assert.Equal(t, int64(5000), revisions[0].ModifyTime) +} + +func TestListAppRevisions_ZeroLastUpdatedTime(t *testing.T) { + // Old data without lastUpdatedTime returns ModifyTime=0; callers guard with > 0. + r, mc := newTestReport() + + mc.data["/dubbo/my-app/old-rev"] = `{"app":"my-app","revision":"old-rev"}` + + revisions, err := r.ListAppRevisions("my-app") + require.NoError(t, err) + require.Len(t, revisions, 1) + assert.Equal(t, "old-rev", revisions[0].Revision) + assert.Equal(t, int64(0), revisions[0].ModifyTime) +} + +func TestRegisterServiceAppMapping_NewKey(t *testing.T) { + r, mc := newTestReport() + + err := r.RegisterServiceAppMapping("com.example.Foo", "mapping", "app1") + require.NoError(t, err) + assert.Equal(t, "app1", mc.data["/dubbo/mapping/com.example.Foo"]) +} + +func TestRegisterServiceAppMapping_Append(t *testing.T) { + r, mc := newTestReport() + + mc.data["/dubbo/mapping/com.example.Foo"] = "app1" + + err := r.RegisterServiceAppMapping("com.example.Foo", "mapping", "app2") + require.NoError(t, err) + assert.Equal(t, "app1,app2", mc.data["/dubbo/mapping/com.example.Foo"]) +} + +func TestRegisterServiceAppMapping_Duplicate(t *testing.T) { + r, mc := newTestReport() + + mc.data["/dubbo/mapping/com.example.Foo"] = "app1,app2" + + err := r.RegisterServiceAppMapping("com.example.Foo", "mapping", "app1") + require.NoError(t, err) + assert.Equal(t, "app1,app2", mc.data["/dubbo/mapping/com.example.Foo"]) +} + +func TestGetServiceAppMapping(t *testing.T) { + r, mc := newTestReport() + + mc.data["/dubbo/mapping/com.example.Foo"] = "app1,app2" + + set, err := r.GetServiceAppMapping("com.example.Foo", "mapping", nil) + require.NoError(t, err) + assert.True(t, set.Contains("app1")) + assert.True(t, set.Contains("app2")) +} + +func TestGetServiceAppMapping_NotFound(t *testing.T) { + r, _ := newTestReport() + + _, err := r.GetServiceAppMapping("com.example.Foo", "mapping", nil) + require.Error(t, err) +} + +func TestRemoveServiceAppMappingListener(t *testing.T) { + r, _ := newTestReport() + err := r.RemoveServiceAppMappingListener("key", "group") + require.NoError(t, err) +} + +// --- Pure logic test --- + +func TestMetadataInfoSerialization(t *testing.T) { + original := &info.MetadataInfo{ + App: "test-app", + Revision: "1.0.0", + Services: map[string]*info.ServiceInfo{ + "com.example.TestService": { + Name: "com.example.TestService", Protocol: "dubbo", }, - want: &common.MetadataInfo{}, - wantErr: false, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := newEtcdMetadataReport(tt.fields) - got, err := e.GetAppMetadata(tt.args.subscriberMetadataIdentifier) - if (err != nil) != tt.wantErr { - t.Errorf("GetAppMetadata() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("GetAppMetadata() got = %v, want %v", got, tt.want) - } - }) - } + + data, err := json.Marshal(original) + require.NoError(t, err) + + var restored info.MetadataInfo + err = json.Unmarshal(data, &restored) + require.NoError(t, err) + assert.Equal(t, original.App, restored.App) + assert.Equal(t, original.Revision, restored.Revision) } -*/ diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go index fa77626ae0..e84590a61e 100644 --- a/metadata/report/nacos/report.go +++ b/metadata/report/nacos/report.go @@ -57,6 +57,12 @@ func init() { type nacosMetadataReport struct { client *nacosClient.NacosConfigClient group string + url *common.URL +} + +// URL returns the URL used to create this metadata report. +func (n *nacosMetadataReport) URL() *common.URL { + return n.url } // GetAppMetadata get metadata info from nacos @@ -237,6 +243,70 @@ func (n *nacosMetadataReport) RemoveServiceAppMappingListener(key string, group return n.removeServiceMappingListener(key, group) } +// UnPublishAppMetadata removes metadata for a specific revision from nacos. +// This operation is idempotent — deleting a non-existent config returns false but no error. +func (n *nacosMetadataReport) UnPublishAppMetadata(application, revision string) error { + // Delete primary config (compatible with java impl) + _, err := n.client.Client().DeleteConfig(vo.ConfigParam{ + DataId: application, + Group: revision, + }) + if err != nil { + return perrors.WithMessage(err, "Could not delete the metadata") + } + // Delete legacy config (compatible with dubbo-go 3.1.x). + if _, err = n.client.Client().DeleteConfig(vo.ConfigParam{ + DataId: application + constant.KeySeparator + revision, + Group: n.group, + }); err != nil { + logger.Warnf("[Metadata][Nacos] could not delete legacy metadata for app=%s rev=%s: %v", + application, revision, err) + } + return nil +} + +// ListAppRevisions lists all stored revisions for an application from nacos. +func (n *nacosMetadataReport) ListAppRevisions(application string) ([]report.AppRevision, error) { + pageNo, pageSize := 1, 500 + var result []report.AppRevision + for { + configs, err := n.client.Client().SearchConfig(vo.SearchConfigParam{ + Search: "accurate", + DataId: application, + Group: "", + PageNo: pageNo, + PageSize: pageSize, + }) + if err != nil { + return nil, perrors.WithMessage(err, "Could not search configs for ListAppRevisions") + } + if configs == nil || len(configs.PageItems) == 0 { + break + } + for _, item := range configs.PageItems { + if item.Group == "" || item.Group == n.group { + // Skip legacy format entries — they use group=n.group and dataId=app:revision + // We only want the Java-compatible format where group=revision and dataId=application + continue + } + rev := item.Group + var modifyTime int64 + if item.Content != "" { + modifyTime = report.ParseMetadataLastUpdatedTime([]byte(item.Content)) + } + result = append(result, report.AppRevision{ + Revision: rev, + ModifyTime: modifyTime, + }) + } + if pageNo*pageSize >= int(configs.TotalCount) { + break + } + pageNo++ + } + return result, nil +} + type nacosMetadataReportFactory struct{} // CreateMetadataReport creates the nacos-based metadata report implementation. @@ -252,5 +322,5 @@ func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) repor logger.Errorf("[Metadata][Nacos] could not create nacos metadata report, url=%s", url.String()) return nil } - return &nacosMetadataReport{client: client, group: group} + return &nacosMetadataReport{client: client, group: group, url: url} } diff --git a/metadata/report/nacos/report_test.go b/metadata/report/nacos/report_test.go index 108e330bb5..bc35ab3c4c 100644 --- a/metadata/report/nacos/report_test.go +++ b/metadata/report/nacos/report_test.go @@ -19,6 +19,7 @@ package nacos import ( "encoding/json" + "fmt" "reflect" "testing" ) @@ -30,6 +31,9 @@ import ( "github.com/nacos-group/nacos-sdk-go/v2/model" "github.com/nacos-group/nacos-sdk-go/v2/vo" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) import ( @@ -290,3 +294,113 @@ func Test_nacosMetadataReport_RegisterServiceAppMapping(t *testing.T) { }) } } + +func Test_nacosMetadataReport_UnPublishAppMetadata(t *testing.T) { + ctrl := gomock.NewController(t) + mnc := NewMockIConfigClient(ctrl) + // Expect DeleteConfig for both primary and legacy config + mnc.EXPECT().DeleteConfig(gomock.Any()).Times(2).Return(true, nil) + nc := &nacosClient.NacosConfigClient{} + nc.SetClient(mnc) + + n := &nacosMetadataReport{client: nc, group: "dubbo"} + err := n.UnPublishAppMetadata("test-app", "abc123") + require.NoError(t, err) +} + +func Test_nacosMetadataReport_UnPublishAppMetadata_Error(t *testing.T) { + ctrl := gomock.NewController(t) + mnc := NewMockIConfigClient(ctrl) + mnc.EXPECT().DeleteConfig(gomock.Any()).Return(false, fmt.Errorf("delete error")) + nc := &nacosClient.NacosConfigClient{} + nc.SetClient(mnc) + + n := &nacosMetadataReport{client: nc, group: "dubbo"} + err := n.UnPublishAppMetadata("test-app", "abc123") + require.Error(t, err) +} + +func Test_nacosMetadataReport_ListAppRevisions(t *testing.T) { + ctrl := gomock.NewController(t) + mnc := NewMockIConfigClient(ctrl) + mnc.EXPECT().SearchConfig(gomock.Any()).Return(&model.ConfigPage{ + TotalCount: 2, + PageItems: []model.ConfigItem{ + {DataId: "test-app", Group: "rev1", Content: `{"app":"test-app","revision":"rev1","lastUpdatedTime":100}`}, + {DataId: "test-app", Group: "rev2", Content: `{"app":"test-app","revision":"rev2","lastUpdatedTime":200}`}, + }, + }, nil) + nc := &nacosClient.NacosConfigClient{} + nc.SetClient(mnc) + + n := &nacosMetadataReport{client: nc, group: "dubbo"} + revisions, err := n.ListAppRevisions("test-app") + require.NoError(t, err) + assert.Len(t, revisions, 2) + + names := make(map[string]int64) + for _, rev := range revisions { + names[rev.Revision] = rev.ModifyTime + } + assert.Equal(t, int64(100), names["rev1"]) + assert.Equal(t, int64(200), names["rev2"]) +} + +func Test_nacosMetadataReport_ListAppRevisions_Empty(t *testing.T) { + ctrl := gomock.NewController(t) + mnc := NewMockIConfigClient(ctrl) + mnc.EXPECT().SearchConfig(gomock.Any()).Return(&model.ConfigPage{ + TotalCount: 0, + PageItems: nil, + }, nil) + nc := &nacosClient.NacosConfigClient{} + nc.SetClient(mnc) + + n := &nacosMetadataReport{client: nc, group: "dubbo"} + revisions, err := n.ListAppRevisions("test-app") + require.NoError(t, err) + assert.Nil(t, revisions) +} + +func Test_nacosMetadataReport_ListAppRevisions_FilterLegacyEntries(t *testing.T) { + ctrl := gomock.NewController(t) + mnc := NewMockIConfigClient(ctrl) + mnc.EXPECT().SearchConfig(gomock.Any()).Return(&model.ConfigPage{ + TotalCount: 3, + PageItems: []model.ConfigItem{ + {DataId: "test-app", Group: "rev1", Content: `{"app":"test-app","lastUpdatedTime":500}`}, // Java-compatible: keep + {DataId: "test-app:rev2", Group: "dubbo"}, // Legacy: skip (group == n.group) + {DataId: "test-app", Group: ""}, // Empty group: skip + }, + }, nil) + nc := &nacosClient.NacosConfigClient{} + nc.SetClient(mnc) + + n := &nacosMetadataReport{client: nc, group: "dubbo"} + revisions, err := n.ListAppRevisions("test-app") + require.NoError(t, err) + assert.Len(t, revisions, 1) + assert.Equal(t, "rev1", revisions[0].Revision) + assert.Equal(t, int64(500), revisions[0].ModifyTime) +} + +func Test_nacosMetadataReport_ListAppRevisions_ZeroLastUpdatedTime(t *testing.T) { + // Old data without lastUpdatedTime returns ModifyTime=0; callers guard with > 0. + ctrl := gomock.NewController(t) + mnc := NewMockIConfigClient(ctrl) + mnc.EXPECT().SearchConfig(gomock.Any()).Return(&model.ConfigPage{ + TotalCount: 1, + PageItems: []model.ConfigItem{ + {DataId: "test-app", Group: "old-rev", Content: `{"app":"test-app","revision":"old-rev"}`}, + }, + }, nil) + nc := &nacosClient.NacosConfigClient{} + nc.SetClient(mnc) + + n := &nacosMetadataReport{client: nc, group: "dubbo"} + revisions, err := n.ListAppRevisions("test-app") + require.NoError(t, err) + assert.Len(t, revisions, 1) + assert.Equal(t, "old-rev", revisions[0].Revision) + assert.Equal(t, int64(0), revisions[0].ModifyTime) +} diff --git a/metadata/report/report.go b/metadata/report/report.go index edabce0a60..2334cc10cf 100644 --- a/metadata/report/report.go +++ b/metadata/report/report.go @@ -17,15 +17,35 @@ package report +import ( + "encoding/json" +) + import ( gxset "github.com/dubbogo/gost/container/set" ) import ( + "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/metadata/info" "dubbo.apache.org/dubbo-go/v3/metadata/mapping" ) +// AppRevision represents a revision entry with its last modification time. +type AppRevision struct { + Revision string + ModifyTime int64 // unix timestamp in milliseconds +} + +// ParseMetadataLastUpdatedTime extracts lastUpdatedTime from a metadata JSON blob. +func ParseMetadataLastUpdatedTime(data []byte) int64 { + var meta info.MetadataInfo + if err := json.Unmarshal(data, &meta); err != nil { + return 0 + } + return meta.LastUpdatedTime +} + // MetadataReport is an interface of remote metadata report. type MetadataReport interface { // GetAppMetadata get metadata info from report @@ -42,4 +62,15 @@ type MetadataReport interface { // RemoveServiceAppMappingListener remove the serviceMapping listener by key and group RemoveServiceAppMappingListener(interfaceName, group string) error + + // UnPublishAppMetadata removes metadata for a specific revision from the metadata center. + // This operation is idempotent — deleting a non-existent revision should not return an error. + UnPublishAppMetadata(application, revision string) error + + // ListAppRevisions lists all stored revisions for an application. + // Each AppRevision contains the revision string and the last modification time (unix ms). + ListAppRevisions(application string) ([]AppRevision, error) + + // URL returns the URL used to create this metadata report instance. + URL() *common.URL } diff --git a/metadata/report/zookeeper/report.go b/metadata/report/zookeeper/report.go index 5e3b2d2e4e..8e594e3651 100644 --- a/metadata/report/zookeeper/report.go +++ b/metadata/report/zookeeper/report.go @@ -28,7 +28,6 @@ import ( gxset "github.com/dubbogo/gost/container/set" gxzookeeper "github.com/dubbogo/gost/database/kv/zk" - "github.com/dubbogo/gost/log/logger" perrors "github.com/pkg/errors" ) @@ -44,6 +43,35 @@ import ( "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper" ) +// zkClient abstracts the ZookeeperClient operations used by zookeeperMetadataReport. +type zkClient interface { + GetContent(path string) ([]byte, *zk.Stat, error) + SetContent(path string, data []byte, version int32) (*zk.Stat, error) + CreateWithValue(path string, data []byte) error + Delete(path string) error + Exists(path string) (bool, *zk.Stat, error) + Children(path string) ([]string, *zk.Stat, error) + Get(path string) ([]byte, *zk.Stat, error) +} + +// zkClientWrapper wraps *gxzookeeper.ZookeeperClient to implement zkClient. +// Methods that exist on Conn (Exists, Children, Get) are delegated. +type zkClientWrapper struct { + *gxzookeeper.ZookeeperClient +} + +func (w zkClientWrapper) Exists(path string) (bool, *zk.Stat, error) { + return w.Conn.Exists(path) +} + +func (w zkClientWrapper) Children(path string) ([]string, *zk.Stat, error) { + return w.Conn.Children(path) +} + +func (w zkClientWrapper) Get(path string) ([]byte, *zk.Stat, error) { + return w.Conn.Get(path) +} + func init() { mf := &zookeeperMetadataReportFactory{} extension.SetMetadataReportFactory("zookeeper", func() report.MetadataReportFactory { @@ -54,10 +82,16 @@ func init() { // zookeeperMetadataReport is the implementation of // MetadataReport based on zookeeper. type zookeeperMetadataReport struct { - client *gxzookeeper.ZookeeperClient + client zkClient rootDir string listener *zookeeper.ZkEventListener cacheListener *CacheListener + url *common.URL +} + +// URL returns the URL used to create this metadata report. +func (m *zookeeperMetadataReport) URL() *common.URL { + return m.url } // GetAppMetadata get metadata info from zookeeper @@ -84,17 +118,52 @@ func (m *zookeeperMetadataReport) PublishAppMetadata(application, revision strin } err = m.client.CreateWithValue(k, data) if perrors.Is(err, zk.ErrNodeExists) { - logger.Debug("[Metadata][Zookeeper] try to create the node data failed. In most cases, it's not a problem. ") + _, err = m.client.SetContent(k, data, -1) + } + return err +} + +// UnPublishAppMetadata removes metadata for a specific revision from zookeeper. +// This operation is idempotent. +func (m *zookeeperMetadataReport) UnPublishAppMetadata(application, revision string) error { + k := m.rootDir + application + constant.PathSeparator + revision + err := m.client.Delete(k) + if perrors.Is(err, zk.ErrNoNode) { return nil } return err } +// ListAppRevisions lists all stored revisions for an application from zookeeper. +func (m *zookeeperMetadataReport) ListAppRevisions(application string) ([]report.AppRevision, error) { + parent := m.rootDir + application + children, _, err := m.client.Children(parent) + if err != nil { + if perrors.Is(err, zk.ErrNoNode) { + return nil, nil + } + return nil, err + } + result := make([]report.AppRevision, 0, len(children)) + for _, rev := range children { + path := parent + constant.PathSeparator + rev + data, _, err := m.client.Get(path) + if err != nil { + continue // skip if node disappeared between listing and reading + } + result = append(result, report.AppRevision{ + Revision: rev, + ModifyTime: report.ParseMetadataLastUpdatedTime(data), + }) + } + return result, nil +} + // RegisterServiceAppMapping map the specified Dubbo service interface to current Dubbo app name func (m *zookeeperMetadataReport) RegisterServiceAppMapping(key string, group string, value string) error { path := m.rootDir + group + constant.PathSeparator + key v, state, err := m.client.GetContent(path) - if err == zk.ErrNoNode { + if perrors.Is(err, zk.ErrNoNode) { if cErr := m.client.CreateWithValue(path, []byte(value)); cErr != nil { if perrors.Is(cErr, zk.ErrNodeExists) { return fmt.Errorf("create mapping %s: %w", path, report.ErrMappingCASConflict) @@ -163,9 +232,10 @@ func (mf *zookeeperMetadataReportFactory) CreateMetadataReport(url *common.URL) } reporter := &zookeeperMetadataReport{ - client: client, + client: zkClientWrapper{client}, rootDir: rootDir, listener: zookeeper.NewZkEventListener(client), + url: url, } reporter.cacheListener = NewCacheListener(rootDir, reporter.listener) diff --git a/metadata/report/zookeeper/report_test.go b/metadata/report/zookeeper/report_test.go index d17db0eefc..1dba65d38b 100644 --- a/metadata/report/zookeeper/report_test.go +++ b/metadata/report/zookeeper/report_test.go @@ -19,10 +19,13 @@ package zookeeper import ( "encoding/json" + "strings" "testing" ) import ( + "github.com/dubbogo/go-zookeeper/zk" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -34,6 +37,270 @@ import ( "dubbo.apache.org/dubbo-go/v3/metadata/report" ) +// --- Mock zkClient --- +// mockZkClient implements zkClient for testing. +type mockZkClient struct { + data map[string][]byte // path -> value + stats map[string]*zk.Stat // path -> stat + errors map[string]error // path -> error for specific operations (optional override) +} + +func newMockZkClient() *mockZkClient { + return &mockZkClient{ + data: make(map[string][]byte), + stats: make(map[string]*zk.Stat), + errors: make(map[string]error), + } +} + +func (m *mockZkClient) GetContent(path string) ([]byte, *zk.Stat, error) { + if err, ok := m.errors["GetContent:"+path]; ok { + return nil, nil, err + } + v, ok := m.data[path] + if !ok { + return nil, nil, zk.ErrNoNode + } + return v, m.stats[path], nil +} + +func (m *mockZkClient) SetContent(path string, data []byte, version int32) (*zk.Stat, error) { + if err, ok := m.errors["SetContent:"+path]; ok { + return nil, err + } + m.data[path] = data + stat := &zk.Stat{Version: version + 1, Mtime: int64(version + 1)} + m.stats[path] = stat + return stat, nil +} + +func (m *mockZkClient) CreateWithValue(path string, data []byte) error { + if err, ok := m.errors["CreateWithValue:"+path]; ok { + return err + } + if _, exists := m.data[path]; exists { + return zk.ErrNodeExists + } + m.data[path] = data + stat := &zk.Stat{Version: 0, Mtime: 0} + m.stats[path] = stat + return nil +} + +func (m *mockZkClient) Delete(path string) error { + if err, ok := m.errors["Delete:"+path]; ok { + return err + } + if _, ok := m.data[path]; !ok { + return zk.ErrNoNode + } + delete(m.data, path) + delete(m.stats, path) + return nil +} + +func (m *mockZkClient) Exists(path string) (bool, *zk.Stat, error) { + _, ok := m.data[path] + if !ok { + return false, nil, nil + } + return true, m.stats[path], nil +} + +func (m *mockZkClient) Children(path string) ([]string, *zk.Stat, error) { + if err, ok := m.errors["Children:"+path]; ok { + return nil, nil, err + } + prefix := path + "/" + seen := make(map[string]bool) + for k := range m.data { + if strings.HasPrefix(k, prefix) { + child := k[len(prefix):] + // only direct children + if idx := strings.Index(child, "/"); idx >= 0 { + child = child[:idx] + } + seen[child] = true + } + } + if len(seen) == 0 { + return nil, nil, zk.ErrNoNode + } + result := make([]string, 0, len(seen)) + for k := range seen { + result = append(result, k) + } + return result, &zk.Stat{}, nil +} + +func (m *mockZkClient) Get(path string) ([]byte, *zk.Stat, error) { + v, ok := m.data[path] + if !ok { + return nil, nil, zk.ErrNoNode + } + return v, m.stats[path], nil +} + +// --- Helper --- + +func newTestReportWithMock() (*zookeeperMetadataReport, *mockZkClient) { + mc := newMockZkClient() + r := &zookeeperMetadataReport{ + client: mc, + rootDir: "/dubbo/", + } + return r, mc +} + +// --- Tests --- + +func TestPublishAndGetAppMetadata(t *testing.T) { + r, mc := newTestReportWithMock() + + meta := &info.MetadataInfo{ + App: "my-app", + Revision: "r1", + Services: map[string]*info.ServiceInfo{ + "com.example.Foo": {Name: "com.example.Foo", Protocol: "dubbo"}, + }, + } + + err := r.PublishAppMetadata("my-app", "r1", meta) + require.NoError(t, err) + + // Verify data was stored + data, err := json.Marshal(meta) + require.NoError(t, err) + assert.Equal(t, data, mc.data["/dubbo/my-app/r1"]) + + // Get it back + got, err := r.GetAppMetadata("my-app", "r1") + require.NoError(t, err) + assert.Equal(t, "my-app", got.App) + assert.Equal(t, "r1", got.Revision) + assert.Contains(t, got.Services, "com.example.Foo") + + // Get non-existent returns error + _, err = r.GetAppMetadata("my-app", "nonexistent") + require.Error(t, err) +} + +func TestPublishAppMetadata_Update(t *testing.T) { + r, _ := newTestReportWithMock() + + meta := &info.MetadataInfo{App: "my-app", Revision: "r1"} + err := r.PublishAppMetadata("my-app", "r1", meta) + require.NoError(t, err) + + // Update existing node + meta.Revision = "r2" + err = r.PublishAppMetadata("my-app", "r1", meta) + require.NoError(t, err) + + got, err := r.GetAppMetadata("my-app", "r1") + require.NoError(t, err) + assert.Equal(t, "r2", got.Revision) +} + +func TestUnPublishAppMetadata(t *testing.T) { + r, mc := newTestReportWithMock() + + meta := &info.MetadataInfo{App: "my-app", Revision: "r1"} + err := r.PublishAppMetadata("my-app", "r1", meta) + require.NoError(t, err) + + err = r.UnPublishAppMetadata("my-app", "r1") + require.NoError(t, err) + + // Data should be gone + _, ok := mc.data["/dubbo/my-app/r1"] + assert.False(t, ok) + + // Get should fail + _, err = r.GetAppMetadata("my-app", "r1") + require.Error(t, err) +} + +func TestUnPublishAppMetadata_NonExistent(t *testing.T) { + r, _ := newTestReportWithMock() + + // Deleting a non-existent node should not error (idempotent per interface contract) + err := r.UnPublishAppMetadata("my-app", "nonexistent") + require.NoError(t, err) +} + +func TestListAppRevisions(t *testing.T) { + r, mc := newTestReportWithMock() + + // No revisions for unknown app + revisions, err := r.ListAppRevisions("unknown-app") + require.NoError(t, err) + assert.Empty(t, revisions) + + // Manually set up revisions with different lastUpdatedTime in content + mc.data["/dubbo/my-app/r1"] = []byte(`{"app":"my-app","revision":"r1","lastUpdatedTime":1000}`) + mc.data["/dubbo/my-app/r2"] = []byte(`{"app":"my-app","revision":"r2","lastUpdatedTime":3000}`) + mc.data["/dubbo/my-app/r3"] = []byte(`{"app":"my-app","revision":"r3","lastUpdatedTime":2000}`) + + revisions, err = r.ListAppRevisions("my-app") + require.NoError(t, err) + require.Len(t, revisions, 3) + + names := make(map[string]int64) + for _, rev := range revisions { + names[rev.Revision] = rev.ModifyTime + } + assert.Equal(t, int64(1000), names["r1"]) + assert.Equal(t, int64(3000), names["r2"]) + assert.Equal(t, int64(2000), names["r3"]) +} + +func TestRegisterServiceAppMapping_NewKey(t *testing.T) { + r, mc := newTestReportWithMock() + + err := r.RegisterServiceAppMapping("com.example.Foo", "mapping", "app1") + require.NoError(t, err) + assert.Equal(t, []byte("app1"), mc.data["/dubbo/mapping/com.example.Foo"]) +} + +func TestRegisterServiceAppMapping_Append(t *testing.T) { + r, mc := newTestReportWithMock() + + mc.data["/dubbo/mapping/com.example.Foo"] = []byte("app1") + mc.stats["/dubbo/mapping/com.example.Foo"] = &zk.Stat{Version: 0} + + err := r.RegisterServiceAppMapping("com.example.Foo", "mapping", "app2") + require.NoError(t, err) + assert.Equal(t, []byte("app1,app2"), mc.data["/dubbo/mapping/com.example.Foo"]) +} + +func TestRegisterServiceAppMapping_Duplicate(t *testing.T) { + r, mc := newTestReportWithMock() + + mc.data["/dubbo/mapping/com.example.Foo"] = []byte("app1,app2") + mc.stats["/dubbo/mapping/com.example.Foo"] = &zk.Stat{Version: 0} + + err := r.RegisterServiceAppMapping("com.example.Foo", "mapping", "app1") + require.NoError(t, err) + // Should not change — app1 is already in the value + assert.Equal(t, []byte("app1,app2"), mc.data["/dubbo/mapping/com.example.Foo"]) +} + +func TestGetServiceAppMapping(t *testing.T) { + r, mc := newTestReportWithMock() + r.cacheListener = NewCacheListener("/dubbo/", nil) + + mc.data["/dubbo/mapping/com.example.Foo"] = []byte("app1,app2") + mc.stats["/dubbo/mapping/com.example.Foo"] = &zk.Stat{} + + set, err := r.GetServiceAppMapping("com.example.Foo", "mapping", nil) + require.NoError(t, err) + assert.True(t, set.Contains("app1")) + assert.True(t, set.Contains("app2")) +} + +// --- Existing pure-logic tests --- + func TestMetadataInfoSerialization(t *testing.T) { original := &info.MetadataInfo{ App: "test-app", @@ -107,20 +374,36 @@ func TestCreateMetadataReportURLParsing(t *testing.T) { } func TestRemoveServiceAppMappingListener(t *testing.T) { - report := &zookeeperMetadataReport{ + r := &zookeeperMetadataReport{ rootDir: "/dubbo/", cacheListener: NewCacheListener("/dubbo/", nil), } - err := report.RemoveServiceAppMappingListener("test.service", "mapping") + err := r.RemoveServiceAppMappingListener("test.service", "mapping") require.NoError(t, err) } -func TestCacheListenerIntegrationWithReport(t *testing.T) { - cacheListener := NewCacheListener("/dubbo/", nil) - report := &zookeeperMetadataReport{ - rootDir: "/dubbo/", - cacheListener: cacheListener, - } - assert.NotNil(t, report.cacheListener) - assert.Equal(t, "/dubbo/", report.rootDir) +func TestListAppRevisions_ReturnsAppRevisionType(t *testing.T) { + // Verify the AppRevision struct is populated correctly from lastUpdatedTime in content + mc := newMockZkClient() + mc.data["/dubbo/my-app/rev-abc"] = []byte(`{"app":"my-app","revision":"rev-abc","lastUpdatedTime":5000}`) + + r := &zookeeperMetadataReport{client: mc, rootDir: "/dubbo/"} + revisions, err := r.ListAppRevisions("my-app") + require.NoError(t, err) + require.Len(t, revisions, 1) + assert.Equal(t, "rev-abc", revisions[0].Revision) + assert.Equal(t, int64(5000), revisions[0].ModifyTime) +} + +func TestListAppRevisions_ZeroLastUpdatedTime(t *testing.T) { + // Old data without lastUpdatedTime returns ModifyTime=0; callers guard with > 0. + mc := newMockZkClient() + mc.data["/dubbo/my-app/old-rev"] = []byte(`{"app":"my-app","revision":"old-rev"}`) + + r := &zookeeperMetadataReport{client: mc, rootDir: "/dubbo/"} + revisions, err := r.ListAppRevisions("my-app") + require.NoError(t, err) + require.Len(t, revisions, 1) + assert.Equal(t, "old-rev", revisions[0].Revision) + assert.Equal(t, int64(0), revisions[0].ModifyTime) } diff --git a/metadata/report_instance.go b/metadata/report_instance.go index f1157ca18a..8f84aab90b 100644 --- a/metadata/report_instance.go +++ b/metadata/report_instance.go @@ -90,6 +90,11 @@ type DelegateMetadataReport struct { instance report.MetadataReport } +// URL returns the URL of the underlying metadata report instance. +func (d *DelegateMetadataReport) URL() *common.URL { + return d.instance.URL() +} + // PublishAppMetadata delegate publish metadata info func (d *DelegateMetadataReport) PublishAppMetadata(application, revision string, meta *info.MetadataInfo) error { event := metadataMetrics.NewMetadataMetricTimeEvent(metadataMetrics.MetadataPush) @@ -121,3 +126,13 @@ func (d *DelegateMetadataReport) RegisterServiceAppMapping(interfaceName, group func (d *DelegateMetadataReport) RemoveServiceAppMappingListener(interfaceName, group string) error { return d.instance.RemoveServiceAppMappingListener(interfaceName, group) } + +// UnPublishAppMetadata delegate unpublish metadata info +func (d *DelegateMetadataReport) UnPublishAppMetadata(application, revision string) error { + return d.instance.UnPublishAppMetadata(application, revision) +} + +// ListAppRevisions delegate list app revisions +func (d *DelegateMetadataReport) ListAppRevisions(application string) ([]report.AppRevision, error) { + return d.instance.ListAppRevisions(application) +} diff --git a/metadata/report_instance_test.go b/metadata/report_instance_test.go index 88a631a548..8311f70519 100644 --- a/metadata/report_instance_test.go +++ b/metadata/report_instance_test.go @@ -169,6 +169,43 @@ func TestDelegateMetadataReportRemoveServiceAppMappingListener(t *testing.T) { }) } +func TestDelegateMetadataReportUnPublishAppMetadata(t *testing.T) { + mockReport := new(mockMetadataReport) + defer mockReport.AssertExpectations(t) + delegate := &DelegateMetadataReport{instance: mockReport} + t.Run("normal", func(t *testing.T) { + mockReport.On("UnPublishAppMetadata").Return(nil).Once() + err := delegate.UnPublishAppMetadata("application", "revision") + require.NoError(t, err) + }) + t.Run("error", func(t *testing.T) { + mockReport.On("UnPublishAppMetadata").Return(errors.New("mock error")).Once() + err := delegate.UnPublishAppMetadata("application", "revision") + require.Error(t, err) + }) +} + +func TestDelegateMetadataReportListAppRevisions(t *testing.T) { + mockReport := new(mockMetadataReport) + defer mockReport.AssertExpectations(t) + delegate := &DelegateMetadataReport{instance: mockReport} + t.Run("normal", func(t *testing.T) { + expected := []report.AppRevision{ + {Revision: "rev1", ModifyTime: 3000}, + {Revision: "rev2", ModifyTime: 1000}, + } + mockReport.On("ListAppRevisions").Return(expected, nil).Once() + got, err := delegate.ListAppRevisions("application") + require.NoError(t, err) + assert.Equal(t, expected, got) + }) + t.Run("error", func(t *testing.T) { + mockReport.On("ListAppRevisions").Return([]report.AppRevision(nil), errors.New("mock error")).Once() + _, err := delegate.ListAppRevisions("application") + require.Error(t, err) + }) +} + func TestGetMetadataReport(t *testing.T) { instances = make(map[string]report.MetadataReport) assert.Nil(t, GetMetadataReport()) @@ -251,6 +288,21 @@ func (m *mockMetadataReport) RemoveServiceAppMappingListener(string, string) err return args.Error(0) } +func (m *mockMetadataReport) UnPublishAppMetadata(string, string) error { + args := m.Called() + return args.Error(0) +} + +func (m *mockMetadataReport) ListAppRevisions(string) ([]report.AppRevision, error) { + args := m.Called() + return args.Get(0).([]report.AppRevision), args.Error(1) +} + +func (m *mockMetadataReport) URL() *common.URL { + u, _ := common.NewURL("mock://127.0.0.1:8848") + return u +} + type listener struct { } diff --git a/registry/servicediscovery/service_discovery_registry.go b/registry/servicediscovery/service_discovery_registry.go index 6d07dd318e..cf6f8a990b 100644 --- a/registry/servicediscovery/service_discovery_registry.go +++ b/registry/servicediscovery/service_discovery_registry.go @@ -19,6 +19,7 @@ package servicediscovery import ( "errors" + "math/rand/v2" "strconv" "strings" "sync" @@ -66,6 +67,7 @@ type serviceDiscoveryRegistry struct { metadataReport report.MetadataReport serviceListeners map[string]registry.ServiceInstancesChangedListener serviceMappingListeners map[string]mapping.MappingListener + renewAppMetadataTimer *time.Timer } func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { @@ -85,6 +87,18 @@ func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) { }, nil } +// startMetadataTimers starts the renewAppMetadata timer if metadata type is remote. +// GC runs after each renew cycle inside doRenewAppMetadata. +func (s *serviceDiscoveryRegistry) startMetadataTimers() { + if metadata.GetMetadataType() != constant.RemoteMetadataStorageType { + return + } + if s.metadataReport == nil { + return + } + s.startRenewAppMetadataTimer() +} + func (s *serviceDiscoveryRegistry) RegisterService() error { metaInfo := metadata.GetMetadataInfo(s.url.GetParam(constant.RegistryIdKey, "")) if metaInfo == nil { @@ -112,6 +126,13 @@ func (s *serviceDiscoveryRegistry) RegisterService() error { s.instanceURLs[instance] = url s.lock.Unlock() } + + s.lock.Lock() + if s.renewAppMetadataTimer == nil { + s.startMetadataTimers() + } + s.lock.Unlock() + return nil } @@ -298,12 +319,162 @@ func (s *serviceDiscoveryRegistry) IsAvailable() bool { } func (s *serviceDiscoveryRegistry) Destroy() { + s.stopMetadataTimers() err := s.serviceDiscovery.Destroy() if err != nil { logger.Errorf("[Registry][ServiceDiscovery] destroy serviceDiscovery catch error, err=%s", err.Error()) } } +func (s *serviceDiscoveryRegistry) stopMetadataTimers() { + s.lock.Lock() + defer s.lock.Unlock() + if s.renewAppMetadataTimer != nil { + s.renewAppMetadataTimer.Stop() + s.renewAppMetadataTimer = nil + } +} + +// ========== renewAppMetadata: daily app-level metadata re-publish ========== + +// metadataReportURL returns the URL from the metadata report instance. +func (s *serviceDiscoveryRegistry) metadataReportURL() *common.URL { + if s.metadataReport == nil { + return nil + } + return s.metadataReport.URL() +} + +func (s *serviceDiscoveryRegistry) startRenewAppMetadataTimer() { + reportURL := s.metadataReportURL() + if reportURL == nil || !reportURL.GetParamBool(constant.CycleReportKey, true) { + return + } + + // Run immediately on start + if reportURL.GetParamBool(constant.MetadataRenewOnStartupKey, true) { + go s.doRenewAppMetadata() + } + + delay := s.calculateRenewAppMetadataDelay() + s.renewAppMetadataTimer = time.AfterFunc(delay, func() { + s.doRenewAppMetadata() + // Reschedule for next day + s.lock.Lock() + if s.renewAppMetadataTimer != nil { + s.renewAppMetadataTimer.Reset(24 * time.Hour) + } + s.lock.Unlock() + }) +} + +func (s *serviceDiscoveryRegistry) doRenewAppMetadata() { + registryID := s.url.GetParam(constant.RegistryIdKey, "") + metaInfo := metadata.GetMetadataInfo(registryID) + if metaInfo == nil || metaInfo.Revision == "0" { + return + } + + // Copy snapshot to avoid data race + snapshot := metaInfo.Snapshot() + snapshot.LastUpdatedTime = time.Now().UnixMilli() + if err := s.metadataReport.PublishAppMetadata(snapshot.App, snapshot.Revision, &snapshot); err != nil { + logger.Errorf("[Metadata][renewAppMetadata] failed to re-publish metadata for app=%s revision=%s: %v", snapshot.App, snapshot.Revision, err) + } else { + logger.Infof("[Metadata][renewAppMetadata] refreshed metadata for app=%s revision=%s", snapshot.App, snapshot.Revision) + } + + // Run garbage collection if enabled, after each renew cycle + reportURL := s.metadataReportURL() + if reportURL != nil && reportURL.GetParamBool(constant.MetadataGCEnabledKey, true) { + s.doGarbageCollect() + } +} + +func (s *serviceDiscoveryRegistry) calculateRenewAppMetadataDelay() time.Duration { + now := time.Now() + // Next day 2:00 AM + nextDay2AM := time.Date(now.Year(), now.Month(), now.Day()+1, 2, 0, 0, 0, now.Location()) + // Add random offset 0~4 hours to avoid thundering herd + randomOffset := time.Duration(rand.Int64N(int64(4 * time.Hour))) + return time.Until(nextDay2AM) + randomOffset +} + +// ========== GC: stale revision cleanup ========== + +func (s *serviceDiscoveryRegistry) doGarbageCollect() { + registryID := s.url.GetParam(constant.RegistryIdKey, "") + metaInfo := metadata.GetMetadataInfo(registryID) + if metaInfo == nil { + return + } + app := metaInfo.App + if app == "" { + return + } + + // Step 1: List all revisions for this app + revisions, err := s.metadataReport.ListAppRevisions(app) + if err != nil { + logger.Warnf("[Metadata][GC] failed to list app revisions: %v", err) + return + } + if len(revisions) == 0 { + return + } + + // Step 2: Filter stale candidates (exceed GC window in days) + reportURL := s.metadataReportURL() + if reportURL == nil { + return + } + gcWindowDays := reportURL.GetParamByIntValue(constant.MetadataGCWindowKey, 5) + if gcWindowDays <= 0 || gcWindowDays > 365 { + gcWindowDays = 5 + } + cutoff := time.Now().AddDate(0, 0, -gcWindowDays).UnixMilli() + candidates := make(map[string]bool) + for _, rev := range revisions { + // Skip special revisions + if rev.Revision == "0" || rev.Revision == "N/A" || rev.Revision == "" || rev.Revision == metaInfo.Revision { + continue + } + // ModifyTime == 0 means metadata produced by versions that did not set lastUpdatedTime. + // Since we can't reliably determine staleness for those entries, skip GC for them. + if rev.ModifyTime > 0 && rev.ModifyTime < cutoff { + candidates[rev.Revision] = true + } + } + if len(candidates) == 0 { + return + } + + // Step 3: Get alive instances and their revisions + instances := s.serviceDiscovery.GetInstances(app) + aliveRevisions := make(map[string]bool) + for _, inst := range instances { + metadata := inst.GetMetadata() + if metadata == nil { + continue + } + rev := metadata[constant.ExportedServicesRevisionPropertyName] + if rev != "" { + aliveRevisions[rev] = true + } + } + + // Step 4: Clean up stale revisions not referenced by any alive instance + for rev := range candidates { + if aliveRevisions[rev] { + continue // still referenced, skip + } + logger.Infof("[Metadata][GC] cleaning up stale revision: app=%s revision=%s", app, rev) + if err := s.metadataReport.UnPublishAppMetadata(app, rev); err != nil { + logger.Warnf("[Metadata][GC] failed to unpublish revision %s: %v", rev, err) + } + } +} + func (s *serviceDiscoveryRegistry) Register(url *common.URL) error { if !shouldRegister(url) { return nil diff --git a/registry/servicediscovery/service_discovery_registry_test.go b/registry/servicediscovery/service_discovery_registry_test.go index 46f2957057..4f9ea9f5b7 100644 --- a/registry/servicediscovery/service_discovery_registry_test.go +++ b/registry/servicediscovery/service_discovery_registry_test.go @@ -40,7 +40,9 @@ import ( "dubbo.apache.org/dubbo-go/v3/common/extension" "dubbo.apache.org/dubbo-go/v3/global" "dubbo.apache.org/dubbo-go/v3/metadata" + "dubbo.apache.org/dubbo-go/v3/metadata/info" "dubbo.apache.org/dubbo-go/v3/metadata/mapping" + "dubbo.apache.org/dubbo-go/v3/metadata/report" "dubbo.apache.org/dubbo-go/v3/protocol" "dubbo.apache.org/dubbo-go/v3/protocol/base" "dubbo.apache.org/dubbo-go/v3/proxy" @@ -505,6 +507,345 @@ func (m *mockProxyFactory) GetAsyncProxy(invoker base.Invoker, callBack any, url func (m *mockProxyFactory) GetInvoker(url *common.URL) protocol.Invoker { return &mockInvoker{} } +// ========== Metadata Lifecycle Tests ========== + +// mockMetadataReportForGC is a lightweight mock for testing GC logic +type mockMetadataReportForGC struct { + revisions []report.AppRevision + deleted []string // tracks deleted revisions + published int // tracks publish calls + reportURL *common.URL +} + +func (m *mockMetadataReportForGC) GetAppMetadata(string, string) (*info.MetadataInfo, error) { + return nil, nil +} +func (m *mockMetadataReportForGC) PublishAppMetadata(string, string, *info.MetadataInfo) error { + m.published++ + return nil +} +func (m *mockMetadataReportForGC) RegisterServiceAppMapping(string, string, string) error { + return nil +} +func (m *mockMetadataReportForGC) GetServiceAppMapping(string, string, mapping.MappingListener) (*gxset.HashSet, error) { + return gxset.NewSet(), nil +} +func (m *mockMetadataReportForGC) RemoveServiceAppMappingListener(string, string) error { + return nil +} +func (m *mockMetadataReportForGC) UnPublishAppMetadata(application, revision string) error { + m.deleted = append(m.deleted, revision) + return nil +} +func (m *mockMetadataReportForGC) ListAppRevisions(application string) ([]report.AppRevision, error) { + return m.revisions, nil +} +func (m *mockMetadataReportForGC) URL() *common.URL { + if m.reportURL != nil { + return m.reportURL + } + u, _ := common.NewURL("mock://127.0.0.1:8848") + return u +} + +// mockServiceDiscoveryWithInstances returns configurable instances for GC tests +type mockServiceDiscoveryWithInstances struct { + *mockServiceDiscovery + instances []registry.ServiceInstance +} + +func (m *mockServiceDiscoveryWithInstances) GetInstances(name string) []registry.ServiceInstance { + return m.instances +} + +func TestServiceDiscoveryRegistry_DoGarbageCollect_CleansStaleRevisions(t *testing.T) { + mockReport := &mockMetadataReportForGC{ + revisions: []report.AppRevision{ + {Revision: "stale-rev1", ModifyTime: time.Now().Add(-96 * time.Hour).UnixMilli()}, // 4 days old + {Revision: "stale-rev2", ModifyTime: time.Now().Add(-48 * time.Hour).UnixMilli()}, // 2 days old, but < 72h window + {Revision: "fresh-rev", ModifyTime: time.Now().UnixMilli()}, // current + }, + reportURL: common.NewURLWithOptions( + common.WithParamsValue(constant.MetadataGCWindowKey, "3"), + ), + } + + sd := &mockServiceDiscoveryWithInstances{ + mockServiceDiscovery: &mockServiceDiscovery{}, + instances: []registry.ServiceInstance{ + ®istry.DefaultServiceInstance{ + Metadata: map[string]string{ + constant.ExportedServicesRevisionPropertyName: "fresh-rev", + }, + }, + }, + } + + regID := fmt.Sprintf("gc-reg-%d", time.Now().UnixNano()) + url := common.NewURLWithOptions( + common.WithParamsValue(constant.RegistryIdKey, regID), + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + + reg := &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: sd, + metadataReport: mockReport, + } + + // Set up metadata info so doGarbageCollect can find the app name + serviceURL, _ := common.NewURL("dubbo://127.0.0.1:20880/org.test.GCStale", + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + metadata.AddService(regID, serviceURL) + + reg.doGarbageCollect() + + // Only stale-rev1 should be deleted (stale-rev2 is within 72h window, fresh-rev is alive) + assert.Equal(t, []string{"stale-rev1"}, mockReport.deleted) +} + +func TestServiceDiscoveryRegistry_DoGarbageCollect_SkipsWhenNoStaleRevisions(t *testing.T) { + mockReport := &mockMetadataReportForGC{ + revisions: []report.AppRevision{ + {Revision: "fresh-rev", ModifyTime: time.Now().UnixMilli()}, + }, + reportURL: common.NewURLWithOptions( + common.WithParamsValue(constant.MetadataGCWindowKey, "3"), + ), + } + + sd := &mockServiceDiscoveryWithInstances{ + mockServiceDiscovery: &mockServiceDiscovery{}, + instances: nil, + } + + regID := fmt.Sprintf("gc-nostale-reg-%d", time.Now().UnixNano()) + url := common.NewURLWithOptions( + common.WithParamsValue(constant.RegistryIdKey, regID), + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + + reg := &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: sd, + metadataReport: mockReport, + } + + serviceURL, _ := common.NewURL("dubbo://127.0.0.1:20880/org.test.GCNoStale", + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + metadata.AddService(regID, serviceURL) + + reg.doGarbageCollect() + + assert.Empty(t, mockReport.deleted) +} + +func TestServiceDiscoveryRegistry_DoGarbageCollect_SkipsReferencedStaleRevision(t *testing.T) { + staleTime := time.Now().Add(-96 * time.Hour).UnixMilli() + mockReport := &mockMetadataReportForGC{ + revisions: []report.AppRevision{ + {Revision: "stale-but-alive", ModifyTime: staleTime}, + }, + reportURL: common.NewURLWithOptions( + common.WithParamsValue(constant.MetadataGCWindowKey, "3"), + ), + } + + sd := &mockServiceDiscoveryWithInstances{ + mockServiceDiscovery: &mockServiceDiscovery{}, + instances: []registry.ServiceInstance{ + ®istry.DefaultServiceInstance{ + Metadata: map[string]string{ + constant.ExportedServicesRevisionPropertyName: "stale-but-alive", + }, + }, + }, + } + + regID := fmt.Sprintf("gc-ref-reg-%d", time.Now().UnixNano()) + url := common.NewURLWithOptions( + common.WithParamsValue(constant.RegistryIdKey, regID), + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + + reg := &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: sd, + metadataReport: mockReport, + } + + serviceURL, _ := common.NewURL("dubbo://127.0.0.1:20880/org.test.GCRefStale", + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + metadata.AddService(regID, serviceURL) + + reg.doGarbageCollect() + + // stale-but-alive is referenced by a live instance, should NOT be deleted + assert.Empty(t, mockReport.deleted) +} + +func TestServiceDiscoveryRegistry_DoRenewAppMetadata(t *testing.T) { + mockReport := &mockMetadataReportForGC{} + + regID := fmt.Sprintf("renew-reg-%d", time.Now().UnixNano()) + url := common.NewURLWithOptions( + common.WithParamsValue(constant.RegistryIdKey, regID), + ) + + reg := &serviceDiscoveryRegistry{ + url: url, + metadataReport: mockReport, + } + + // Set up metadata info via AddService (which populates registryMetadataInfo) + serviceURL, _ := common.NewURL("dubbo://127.0.0.1:20880/org.test.RenewAppMetadata", + common.WithParamsValue(constant.ApplicationKey, "test-app"), + common.WithParamsValue(constant.SideKey, constant.SideProvider), + ) + metadata.AddService(regID, serviceURL) + metaInfo := metadata.GetMetadataInfo(regID) + require.NotNil(t, metaInfo) + metaInfo.Revision = "abc123" + + reg.doRenewAppMetadata() + assert.Equal(t, 1, mockReport.published) +} + +func TestServiceDiscoveryRegistry_Destroy_StopsTimers(t *testing.T) { + url := common.NewURLWithOptions() + sd := &mockServiceDiscovery{} + + reg := &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: sd, + } + + // Simulate running timer + reg.renewAppMetadataTimer = time.AfterFunc(1*time.Hour, func() {}) + + reg.Destroy() + + assert.Nil(t, reg.renewAppMetadataTimer) +} + +func TestServiceDiscoveryRegistry_CalculateRenewAppMetadataDelay(t *testing.T) { + url := common.NewURLWithOptions() + reg := &serviceDiscoveryRegistry{url: url} + + delay := reg.calculateRenewAppMetadataDelay() + + // Delay should be between ~14h and ~28h (next day 2AM + 0~4h random offset) + // Minimum: if now is 23:59, next 2AM is ~2h + 0 = 2h + // Maximum: if now is 00:01, next 2AM is ~26h + 4h = 30h + assert.GreaterOrEqual(t, delay, 1*time.Hour, "delay should be at least 1 hour") + assert.LessOrEqual(t, delay, 32*time.Hour, "delay should be at most 32 hours") +} + +// TestServiceDiscoveryRegistry_DoGarbageCollect_SkipsSpecialRevisions verifies that +// special revisions ("0", "N/A", "") and entries with ModifyTime==0 are never GC'd, +// while genuinely stale revisions are cleaned up. +func TestServiceDiscoveryRegistry_DoGarbageCollect_SkipsSpecialRevisions(t *testing.T) { + staleTime := time.Now().Add(-240 * time.Hour).UnixMilli() // 10 days old + mockReport := &mockMetadataReportForGC{ + revisions: []report.AppRevision{ + {Revision: "0", ModifyTime: staleTime}, // special — skip + {Revision: "N/A", ModifyTime: staleTime}, // special — skip + {Revision: "", ModifyTime: staleTime}, // special — skip + {Revision: "no-timestamp", ModifyTime: 0}, // ModifyTime==0 — skip + {Revision: "genuinely-stale", ModifyTime: staleTime}, // stale, no alive ref — delete + }, + reportURL: common.NewURLWithOptions( + common.WithParamsValue(constant.MetadataGCWindowKey, "5"), + ), + } + + sd := &mockServiceDiscoveryWithInstances{ + mockServiceDiscovery: &mockServiceDiscovery{}, + instances: nil, // no alive instances + } + + regID := fmt.Sprintf("gc-special-reg-%d", time.Now().UnixNano()) + url := common.NewURLWithOptions( + common.WithParamsValue(constant.RegistryIdKey, regID), + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + + reg := &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: sd, + metadataReport: mockReport, + } + + serviceURL, _ := common.NewURL("dubbo://127.0.0.1:20880/org.test.GCSpecial", + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + metadata.AddService(regID, serviceURL) + + reg.doGarbageCollect() + + // Only "genuinely-stale" should be deleted; all special/zero-timestamp revisions skipped + assert.Equal(t, []string{"genuinely-stale"}, mockReport.deleted) +} + +// TestServiceDiscoveryRegistry_DoGarbageCollect_MixedAliveAndStale verifies the core +// GC branch: stale revisions with no alive reference are cleaned, while stale revisions +// still referenced by alive instances are preserved. +func TestServiceDiscoveryRegistry_DoGarbageCollect_MixedAliveAndStale(t *testing.T) { + staleTime := time.Now().Add(-240 * time.Hour).UnixMilli() // 10 days old, well beyond 5-day window + + mockReport := &mockMetadataReportForGC{ + revisions: []report.AppRevision{ + {Revision: "stale-unreferenced", ModifyTime: staleTime}, // stale, no ref — delete + {Revision: "stale-but-referenced", ModifyTime: staleTime}, // stale, but alive ref — keep + {Revision: "fresh-rev", ModifyTime: time.Now().UnixMilli()}, // fresh — keep + }, + reportURL: common.NewURLWithOptions( + common.WithParamsValue(constant.MetadataGCWindowKey, "5"), + ), + } + + sd := &mockServiceDiscoveryWithInstances{ + mockServiceDiscovery: &mockServiceDiscovery{}, + instances: []registry.ServiceInstance{ + ®istry.DefaultServiceInstance{ + Metadata: map[string]string{ + constant.ExportedServicesRevisionPropertyName: "stale-but-referenced", + }, + }, + ®istry.DefaultServiceInstance{ + Metadata: map[string]string{ + constant.ExportedServicesRevisionPropertyName: "fresh-rev", + }, + }, + }, + } + + regID := fmt.Sprintf("gc-mixed-reg-%d", time.Now().UnixNano()) + url := common.NewURLWithOptions( + common.WithParamsValue(constant.RegistryIdKey, regID), + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + + reg := &serviceDiscoveryRegistry{ + url: url, + serviceDiscovery: sd, + metadataReport: mockReport, + } + + serviceURL, _ := common.NewURL("dubbo://127.0.0.1:20880/org.test.GCMixed", + common.WithParamsValue(constant.ApplicationKey, "test-app"), + ) + metadata.AddService(regID, serviceURL) + + reg.doGarbageCollect() + + // Only "stale-unreferenced" should be deleted + assert.Equal(t, []string{"stale-unreferenced"}, mockReport.deleted) +} + type mockInvoker struct{} func (m *mockInvoker) GetURL() *common.URL { return nil }