Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dm-worker/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestRunMain(_ *testing.T) {
var (
args []string
exit = make(chan int)
waitCh = make(chan interface{}, 1)
waitCh = make(chan any, 1)
)
for _, arg := range os.Args {
switch {
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *dbConn) execSQLs(ctx context.Context, queries ...string) error {
}

_, _, err := c.baseConn.ApplyRetryStrategy(tcontext.NewContext(ctx, log.L()), params,
func(tctx *tcontext.Context) (interface{}, error) {
func(tctx *tcontext.Context) (any, error) {
ret, err2 := c.baseConn.ExecuteSQLWithIgnoreError(tctx, nil, "chaos-cases", ignoreExecSQLError, queries)
return ret, err2
})
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

// diffDataLoop checks whether target has the same data with source via `sync-diff-inspector` multiple times.
func diffDataLoop(ctx context.Context, count int, interval time.Duration, schema string, tables []string, targetDB *sql.DB, sourceDBs ...*sql.DB) (err error) {
for i := 0; i < count; i++ {
for i := range count {
select {
case <-ctx.Done():
return nil
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (g *CaseGenerator) genSQLs(ctx context.Context) {
for _, table := range g.tables {
rand.Shuffle(len(g.testCases), func(i, j int) { g.testCases[i], g.testCases[j] = g.testCases[j], g.testCases[i] })
casesNum := rand.Intn(len(g.testCases) + 1)
for i := 0; i < casesNum; i++ {
for i := range casesNum {
for _, sqls := range g.testCases[i] {
fullSqls := make(SQLs, len(sqls))
copy(fullSqls, sqls)
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
// checkMembersReadyLoop checks whether all DM-master and DM-worker members have been ready.
// NOTE: in this chaos case, we ensure 3 DM-master and 3 DM-worker started.
func checkMembersReadyLoop(ctx context.Context, cli pb.MasterClient, masterCount, workerCount int) (err error) {
for i := 0; i < checkMemberTimes; i++ {
for range checkMemberTimes {
select {
case <-ctx.Done():
return nil
Expand Down
4 changes: 2 additions & 2 deletions dm/chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (t *task) genFullData() error {
)

// generate `CREATE TABLE` statements.
for i := 0; i < tableCount; i++ {
for range tableCount {
query, name, err := t.ss[0].CreateTableStmt()
if err != nil {
return err
Expand Down Expand Up @@ -258,7 +258,7 @@ func (t *task) genFullData() error {
for _, conn := range t.sourceConns {
conn2 := conn
eg.Go(func() error {
for i := 0; i < fullInsertCount; i++ {
for range fullInsertCount {
query, _, err2 := t.ss[0].InsertStmt(false)
if err2 != nil {
return err2
Expand Down
10 changes: 5 additions & 5 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
pool := checker.NewWorkerPoolWithContext[job, int64](ctx, func(result int64) {
info.totalDataSize.Add(result)
})
for i := 0; i < concurrency; i++ {
for range concurrency {
pool.Go(func(ctx context.Context, job job) (int64, error) {
return conn.FetchTableEstimatedBytes(
ctx,
Expand Down Expand Up @@ -644,7 +644,7 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
if result.Summary.Successful != result.Summary.Total {
rawResult, err = json.MarshalIndent(result, "\t", "\t")
if err != nil {
rawResult = []byte(fmt.Sprintf("marshal error %v", err))
rawResult = fmt.Appendf(nil, "marshal error %v", err)
}
}
c.result.Lock()
Expand Down Expand Up @@ -830,14 +830,14 @@ func (c *Checker) IsFreshTask() (bool, error) {
}

// Status implements Unit interface.
func (c *Checker) Status(_ *binlog.SourceStatus) interface{} {
func (c *Checker) Status(_ *binlog.SourceStatus) any {
c.result.RLock()
res := c.result.detail
c.result.RUnlock()

rawResult, err := json.Marshal(res)
if err != nil {
rawResult = []byte(fmt.Sprintf("marshal %+v error %v", res, err))
rawResult = fmt.Appendf(nil, "marshal %+v error %v", res, err)
}

return &pb.CheckStatus{
Expand All @@ -851,7 +851,7 @@ func (c *Checker) Status(_ *binlog.SourceStatus) interface{} {
}

// Error implements Unit interface.
func (c *Checker) Error() interface{} {
func (c *Checker) Error() any {
return &pb.CheckError{}
}

Expand Down
5 changes: 2 additions & 3 deletions dm/config/checking_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config
import (
"bytes"
"fmt"
"maps"

"github.com/pingcap/tiflow/dm/pkg/terror"
)
Expand Down Expand Up @@ -130,9 +131,7 @@ func SupportCheckingItems() string {
// FilterCheckingItems filters ignored items from all checking items.
func FilterCheckingItems(ignoredItems []string) map[string]string {
checkingItems := make(map[string]string)
for item, desc := range AllCheckingItems {
checkingItems[item] = desc
}
maps.Copy(checkingItems, AllCheckingItems)
Comment thread
dveeden marked this conversation as resolved.
Outdated
delete(checkingItems, AllChecking)

for _, item := range ignoredItems {
Expand Down
5 changes: 2 additions & 3 deletions dm/config/checking_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package config

import (
"maps"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -45,9 +46,7 @@ func TestCheckingItems(t *testing.T) {

// ignore shard checking items
checkingItems := make(map[string]string)
for item, desc := range AllCheckingItems {
checkingItems[item] = desc
}
maps.Copy(checkingItems, AllCheckingItems)
Comment thread
dveeden marked this conversation as resolved.
Outdated
delete(checkingItems, AllChecking)

require.Equal(t, checkingItems, FilterCheckingItems(ignoredCheckingItems[:0]))
Expand Down
5 changes: 2 additions & 3 deletions dm/config/dbconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package dbconfig
import (
"bytes"
"encoding/json"
"maps"
"strings"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -150,9 +151,7 @@ func (db *DBConfig) Clone() *DBConfig {

if db.Session != nil {
clone.Session = make(map[string]string, len(db.Session))
for k, v := range db.Session {
clone.Session[k] = v
}
maps.Copy(clone.Session, db.Session)
Comment thread
dveeden marked this conversation as resolved.
Outdated
}

clone.Security = db.Security.Clone()
Expand Down
2 changes: 1 addition & 1 deletion dm/config/security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *testTLSConfig) TestClone() {
}
// When add new fields, also update this value
// TODO: check it
c.Require().Equal(10, reflect.TypeOf(*s).NumField())
c.Require().Equal(10, reflect.TypeFor[security.Security]().NumField())
clone := s.Clone()
c.Require().Equal(s, clone)
clone.CertAllowedCN[0] = "g"
Expand Down
34 changes: 17 additions & 17 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type SourceConfig struct {
ServerID uint32 `yaml:"server-id" toml:"server-id" json:"server-id"`

// deprecated tracer, to keep compatibility with older version
Tracer map[string]interface{} `yaml:"tracer" toml:"tracer" json:"-"`
Tracer map[string]any `yaml:"tracer" toml:"tracer" json:"-"`

CaseSensitive bool `yaml:"case-sensitive" toml:"case-sensitive" json:"case-sensitive"`
Filters []*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"`
Expand Down Expand Up @@ -366,7 +366,7 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *conn.BaseDB) erro
}

rand.Seed(time.Now().UnixNano())
for i := 0; i < 5; i++ {
for range 5 {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
if _, ok := serverIDs[randomServerID]; ok {
Expand Down Expand Up @@ -403,21 +403,21 @@ func (c *SourceConfig) YamlForDowngrade() (string, error) {
// This config is used for downgrade(config export) from a higher dmctl version.
// When we add any new config item into SourceConfig, we should update it also.
type SourceConfigForDowngrade struct {
Enable bool `yaml:"enable,omitempty"`
EnableGTID bool `yaml:"enable-gtid"`
RelayDir string `yaml:"relay-dir"`
Flavor string `yaml:"flavor"`
Charset string `yaml:"charset"`
EnableRelay bool `yaml:"enable-relay"`
RelayBinLogName string `yaml:"relay-binlog-name"`
RelayBinlogGTID string `yaml:"relay-binlog-gtid"`
UUIDSuffix int `yaml:"-"`
SourceID string `yaml:"source-id"`
From dbconfig.DBConfig `yaml:"from"`
Purge PurgeConfig `yaml:"purge"`
Checker CheckerConfig `yaml:"checker"`
ServerID uint32 `yaml:"server-id"`
Tracer map[string]interface{} `yaml:"tracer"`
Enable bool `yaml:"enable,omitempty"`
EnableGTID bool `yaml:"enable-gtid"`
RelayDir string `yaml:"relay-dir"`
Flavor string `yaml:"flavor"`
Charset string `yaml:"charset"`
EnableRelay bool `yaml:"enable-relay"`
RelayBinLogName string `yaml:"relay-binlog-name"`
RelayBinlogGTID string `yaml:"relay-binlog-gtid"`
UUIDSuffix int `yaml:"-"`
SourceID string `yaml:"source-id"`
From dbconfig.DBConfig `yaml:"from"`
Purge PurgeConfig `yaml:"purge"`
Checker CheckerConfig `yaml:"checker"`
ServerID uint32 `yaml:"server-id"`
Tracer map[string]any `yaml:"tracer"`
// any new config item, we mark it omitempty
CaseSensitive bool `yaml:"case-sensitive,omitempty"`
Filters []*bf.BinlogEventRule `yaml:"filters,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestConfigFunctions(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint32(100), cfg1.ServerID)
cfg.Filters = []*bf.BinlogEventRule{}
cfg.Tracer = map[string]interface{}{}
cfg.Tracer = map[string]any{}

var cfg2 SourceConfig
require.NoError(t, cfg2.FromToml(originCfgStr))
Expand Down
24 changes: 8 additions & 16 deletions dm/config/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,14 +462,11 @@ func TestSubTaskConfigMarshalAtomic(t *testing.T) {
require.Equal(t, cfg.DumpIOTotalBytes.Load(), uint64(200))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for range 10 {
wg.Go(func() {
Comment thread
dveeden marked this conversation as resolved.
data, err := json.Marshal(cfg)
require.NoError(t, err)
jsonMap := make(map[string]interface{})
jsonMap := make(map[string]any)
err = json.Unmarshal(data, &jsonMap)
require.NoError(t, err)

Expand All @@ -487,12 +484,9 @@ func TestSubTaskConfigMarshalAtomic(t *testing.T) {
_, hasDumpUUID := jsonMap["dump-uuid"]
require.False(t, hasUUID, "UUID should not be in JSON")
require.False(t, hasDumpUUID, "DumpUUID should not be in JSON")
}()

wg.Add(1)
go func() {
defer wg.Done()
})
Comment thread
dveeden marked this conversation as resolved.

wg.Go(func() {
Comment thread
dveeden marked this conversation as resolved.
newCfg, err := cfg.Clone()
require.NoError(t, err)

Expand All @@ -501,14 +495,12 @@ func TestSubTaskConfigMarshalAtomic(t *testing.T) {
require.GreaterOrEqual(t, newCfg.DumpIOTotalBytes.Load(), uint64(200))
require.Equal(t, newCfg.UUID, uuid)
require.Equal(t, newCfg.DumpUUID, dumpUUID)
}()
})
Comment thread
dveeden marked this conversation as resolved.

wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
Comment thread
dveeden marked this conversation as resolved.
cfg.IOTotalBytes.Add(1)
cfg.DumpIOTotalBytes.Add(1)
}()
})
Comment thread
dveeden marked this conversation as resolved.
}
wg.Wait()

Expand Down
8 changes: 4 additions & 4 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func DefaultMydumperConfig() MydumperConfig {
type rawMydumperConfig MydumperConfig

// UnmarshalYAML implements Unmarshaler.UnmarshalYAML.
func (m *MydumperConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (m *MydumperConfig) UnmarshalYAML(unmarshal func(any) error) error {
raw := rawMydumperConfig(DefaultMydumperConfig())
if err := unmarshal(&raw); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "unmarshal mydumper config")
Expand Down Expand Up @@ -324,7 +324,7 @@ func DefaultLoaderConfig() LoaderConfig {
type rawLoaderConfig LoaderConfig

// UnmarshalYAML implements Unmarshaler.UnmarshalYAML.
func (m *LoaderConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (m *LoaderConfig) UnmarshalYAML(unmarshal func(any) error) error {
raw := rawLoaderConfig(DefaultLoaderConfig())
if err := unmarshal(&raw); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "unmarshal loader config")
Expand Down Expand Up @@ -461,7 +461,7 @@ func DefaultSyncerConfig() SyncerConfig {
type rawSyncerConfig SyncerConfig

// UnmarshalYAML implements Unmarshaler.UnmarshalYAML.
func (m *SyncerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (m *SyncerConfig) UnmarshalYAML(unmarshal func(any) error) error {
raw := rawSyncerConfig(DefaultSyncerConfig())
if err := unmarshal(&raw); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "unmarshal syncer config")
Expand Down Expand Up @@ -1050,7 +1050,7 @@ func (c *TaskConfig) adjust() error {
// getGenerateName generates name by rule or gets name from nameMap
// if it's a new name, increase nameIdx
// otherwise return current nameIdx.
func getGenerateName(rule interface{}, nameIdx int, namePrefix string, nameMap map[string]string) (string, int) {
func getGenerateName(rule any, nameIdx int, namePrefix string, nameMap map[string]string) (string, int) {
// use json as key since no DeepEqual for rules now.
ruleByte, err := json.Marshal(rule)
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func TestIsForeignKeyChecksEnabled(t *testing.T) {
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()
require.Equal(t, c.expected, IsForeignKeyChecksEnabled(c.session))
Expand Down Expand Up @@ -1114,7 +1113,7 @@ func TestTaskConfigForDowngrade(t *testing.T) {
}

// Clone clones src to dest.
func Clone(dest, src interface{}) {
func Clone(dest, src any) {
cloneValues(reflect.ValueOf(dest), reflect.ValueOf(src))
}

Expand All @@ -1123,17 +1122,17 @@ func Clone(dest, src interface{}) {
func cloneValues(dest, src reflect.Value) {
destType := dest.Type()
srcType := src.Type()
if destType.Kind() == reflect.Ptr {
if destType.Kind() == reflect.Pointer {
destType = destType.Elem()
}
if srcType.Kind() == reflect.Ptr {
if srcType.Kind() == reflect.Pointer {
srcType = srcType.Elem()
}

if destType.Kind() == reflect.Map {
destMap := reflect.MakeMap(destType)
for _, k := range src.MapKeys() {
if src.MapIndex(k).Type().Kind() == reflect.Ptr {
if src.MapIndex(k).Type().Kind() == reflect.Pointer {
newVal := reflect.New(destType.Elem().Elem())
cloneValues(newVal, src.MapIndex(k))
destMap.SetMapIndex(k, newVal)
Expand All @@ -1148,7 +1147,7 @@ func cloneValues(dest, src reflect.Value) {
if destType.Kind() == reflect.Slice {
slice := reflect.MakeSlice(destType, src.Len(), src.Cap())
for i := 0; i < src.Len(); i++ {
if slice.Index(i).Type().Kind() == reflect.Ptr {
if slice.Index(i).Type().Kind() == reflect.Pointer {
newVal := reflect.New(slice.Index(i).Type().Elem())
cloneValues(newVal, src.Index(i))
slice.Index(i).Set(newVal)
Expand All @@ -1170,10 +1169,10 @@ func cloneValues(dest, src reflect.Value) {
srcField := src.Elem().Field(i)
destFieldType := destField.Type()
srcFieldType := srcField.Type()
if destFieldType.Kind() == reflect.Ptr {
if destFieldType.Kind() == reflect.Pointer {
destFieldType = destFieldType.Elem()
}
if srcFieldType.Kind() == reflect.Ptr {
if srcFieldType.Kind() == reflect.Pointer {
srcFieldType = srcFieldType.Elem()
}
if destFieldType != srcFieldType {
Expand Down
4 changes: 2 additions & 2 deletions dm/ctl/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ func (c *Config) adjust() error {

// validate host:port format address.
func validateAddr(addr string) error {
endpoints := strings.Split(addr, ",")
for _, endpoint := range endpoints {
endpoints := strings.SplitSeq(addr, ",")
for endpoint := range endpoints {
if _, _, err := net.SplitHostPort(utils.UnwrapScheme(endpoint)); err != nil {
return errors.Trace(err)
}
Expand Down
Loading
Loading