Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dm-worker/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestRunMain(_ *testing.T) {
var (
args []string
exit = make(chan int)
waitCh = make(chan interface{}, 1)
waitCh = make(chan any, 1)
)
for _, arg := range os.Args {
switch {
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *dbConn) execSQLs(ctx context.Context, queries ...string) error {
}

_, _, err := c.baseConn.ApplyRetryStrategy(tcontext.NewContext(ctx, log.L()), params,
func(tctx *tcontext.Context) (interface{}, error) {
func(tctx *tcontext.Context) (any, error) {
ret, err2 := c.baseConn.ExecuteSQLWithIgnoreError(tctx, nil, "chaos-cases", ignoreExecSQLError, queries)
return ret, err2
})
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

// diffDataLoop checks whether target has the same data with source via `sync-diff-inspector` multiple times.
func diffDataLoop(ctx context.Context, count int, interval time.Duration, schema string, tables []string, targetDB *sql.DB, sourceDBs ...*sql.DB) (err error) {
for i := 0; i < count; i++ {
for i := range count {
select {
case <-ctx.Done():
return nil
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (g *CaseGenerator) genSQLs(ctx context.Context) {
for _, table := range g.tables {
rand.Shuffle(len(g.testCases), func(i, j int) { g.testCases[i], g.testCases[j] = g.testCases[j], g.testCases[i] })
casesNum := rand.Intn(len(g.testCases) + 1)
for i := 0; i < casesNum; i++ {
for i := range casesNum {
for _, sqls := range g.testCases[i] {
fullSqls := make(SQLs, len(sqls))
copy(fullSqls, sqls)
Expand Down
2 changes: 1 addition & 1 deletion dm/chaos/cases/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
// checkMembersReadyLoop checks whether all DM-master and DM-worker members have been ready.
// NOTE: in this chaos case, we ensure 3 DM-master and 3 DM-worker started.
func checkMembersReadyLoop(ctx context.Context, cli pb.MasterClient, masterCount, workerCount int) (err error) {
for i := 0; i < checkMemberTimes; i++ {
for range checkMemberTimes {
select {
case <-ctx.Done():
return nil
Expand Down
4 changes: 2 additions & 2 deletions dm/chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (t *task) genFullData() error {
)

// generate `CREATE TABLE` statements.
for i := 0; i < tableCount; i++ {
for range tableCount {
query, name, err := t.ss[0].CreateTableStmt()
if err != nil {
return err
Expand Down Expand Up @@ -258,7 +258,7 @@ func (t *task) genFullData() error {
for _, conn := range t.sourceConns {
conn2 := conn
eg.Go(func() error {
for i := 0; i < fullInsertCount; i++ {
for range fullInsertCount {
query, _, err2 := t.ss[0].InsertStmt(false)
if err2 != nil {
return err2
Expand Down
10 changes: 5 additions & 5 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (c *Checker) getTablePairInfo(ctx context.Context) (info *tablePairInfo, er
pool := checker.NewWorkerPoolWithContext[job, int64](ctx, func(result int64) {
info.totalDataSize.Add(result)
})
for i := 0; i < concurrency; i++ {
for range concurrency {
pool.Go(func(ctx context.Context, job job) (int64, error) {
return conn.FetchTableEstimatedBytes(
ctx,
Expand Down Expand Up @@ -644,7 +644,7 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
if result.Summary.Successful != result.Summary.Total {
rawResult, err = json.MarshalIndent(result, "\t", "\t")
if err != nil {
rawResult = []byte(fmt.Sprintf("marshal error %v", err))
rawResult = fmt.Appendf(nil, "marshal error %v", err)
}
}
c.result.Lock()
Expand Down Expand Up @@ -830,14 +830,14 @@ func (c *Checker) IsFreshTask() (bool, error) {
}

// Status implements Unit interface.
func (c *Checker) Status(_ *binlog.SourceStatus) interface{} {
func (c *Checker) Status(_ *binlog.SourceStatus) any {
c.result.RLock()
res := c.result.detail
c.result.RUnlock()

rawResult, err := json.Marshal(res)
if err != nil {
rawResult = []byte(fmt.Sprintf("marshal %+v error %v", res, err))
rawResult = fmt.Appendf(nil, "marshal %+v error %v", res, err)
}

return &pb.CheckStatus{
Expand All @@ -851,7 +851,7 @@ func (c *Checker) Status(_ *binlog.SourceStatus) interface{} {
}

// Error implements Unit interface.
func (c *Checker) Error() interface{} {
func (c *Checker) Error() any {
return &pb.CheckError{}
}

Expand Down
6 changes: 2 additions & 4 deletions dm/config/checking_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config
import (
"bytes"
"fmt"
"maps"

"github.com/pingcap/tiflow/dm/pkg/terror"
)
Expand Down Expand Up @@ -129,10 +130,7 @@ func SupportCheckingItems() string {

// FilterCheckingItems filters ignored items from all checking items.
func FilterCheckingItems(ignoredItems []string) map[string]string {
checkingItems := make(map[string]string)
for item, desc := range AllCheckingItems {
checkingItems[item] = desc
}
checkingItems := maps.Clone(AllCheckingItems)
delete(checkingItems, AllChecking)

for _, item := range ignoredItems {
Expand Down
6 changes: 2 additions & 4 deletions dm/config/checking_item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package config

import (
"maps"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -44,10 +45,7 @@ func TestCheckingItems(t *testing.T) {
require.Empty(t, FilterCheckingItems(ignoredCheckingItems))

// ignore shard checking items
checkingItems := make(map[string]string)
for item, desc := range AllCheckingItems {
checkingItems[item] = desc
}
checkingItems := maps.Clone(AllCheckingItems)
delete(checkingItems, AllChecking)

require.Equal(t, checkingItems, FilterCheckingItems(ignoredCheckingItems[:0]))
Expand Down
8 changes: 2 additions & 6 deletions dm/config/dbconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package dbconfig
import (
"bytes"
"encoding/json"
"maps"
"strings"

"github.com/BurntSushi/toml"
Expand Down Expand Up @@ -148,12 +149,7 @@ func (db *DBConfig) Clone() *DBConfig {
clone.MaxAllowedPacket = &packet
}

if db.Session != nil {
clone.Session = make(map[string]string, len(db.Session))
for k, v := range db.Session {
clone.Session[k] = v
}
}
clone.Session = maps.Clone(db.Session)

clone.Security = db.Security.Clone()

Expand Down
2 changes: 1 addition & 1 deletion dm/config/security_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (c *testTLSConfig) TestClone() {
}
// When add new fields, also update this value
// TODO: check it
c.Require().Equal(10, reflect.TypeOf(*s).NumField())
c.Require().Equal(10, reflect.TypeFor[security.Security]().NumField())
clone := s.Clone()
c.Require().Equal(s, clone)
clone.CertAllowedCN[0] = "g"
Expand Down
34 changes: 17 additions & 17 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type SourceConfig struct {
ServerID uint32 `yaml:"server-id" toml:"server-id" json:"server-id"`

// deprecated tracer, to keep compatibility with older version
Tracer map[string]interface{} `yaml:"tracer" toml:"tracer" json:"-"`
Tracer map[string]any `yaml:"tracer" toml:"tracer" json:"-"`

CaseSensitive bool `yaml:"case-sensitive" toml:"case-sensitive" json:"case-sensitive"`
Filters []*bf.BinlogEventRule `yaml:"filters" toml:"filters" json:"filters"`
Expand Down Expand Up @@ -366,7 +366,7 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *conn.BaseDB) erro
}

rand.Seed(time.Now().UnixNano())
for i := 0; i < 5; i++ {
for range 5 {
randomValue := uint32(rand.Intn(100000))
randomServerID := defaultBaseServerID + randomValue
if _, ok := serverIDs[randomServerID]; ok {
Expand Down Expand Up @@ -403,21 +403,21 @@ func (c *SourceConfig) YamlForDowngrade() (string, error) {
// This config is used for downgrade(config export) from a higher dmctl version.
// When we add any new config item into SourceConfig, we should update it also.
type SourceConfigForDowngrade struct {
Enable bool `yaml:"enable,omitempty"`
EnableGTID bool `yaml:"enable-gtid"`
RelayDir string `yaml:"relay-dir"`
Flavor string `yaml:"flavor"`
Charset string `yaml:"charset"`
EnableRelay bool `yaml:"enable-relay"`
RelayBinLogName string `yaml:"relay-binlog-name"`
RelayBinlogGTID string `yaml:"relay-binlog-gtid"`
UUIDSuffix int `yaml:"-"`
SourceID string `yaml:"source-id"`
From dbconfig.DBConfig `yaml:"from"`
Purge PurgeConfig `yaml:"purge"`
Checker CheckerConfig `yaml:"checker"`
ServerID uint32 `yaml:"server-id"`
Tracer map[string]interface{} `yaml:"tracer"`
Enable bool `yaml:"enable,omitempty"`
EnableGTID bool `yaml:"enable-gtid"`
RelayDir string `yaml:"relay-dir"`
Flavor string `yaml:"flavor"`
Charset string `yaml:"charset"`
EnableRelay bool `yaml:"enable-relay"`
RelayBinLogName string `yaml:"relay-binlog-name"`
RelayBinlogGTID string `yaml:"relay-binlog-gtid"`
UUIDSuffix int `yaml:"-"`
SourceID string `yaml:"source-id"`
From dbconfig.DBConfig `yaml:"from"`
Purge PurgeConfig `yaml:"purge"`
Checker CheckerConfig `yaml:"checker"`
ServerID uint32 `yaml:"server-id"`
Tracer map[string]any `yaml:"tracer"`
// any new config item, we mark it omitempty
CaseSensitive bool `yaml:"case-sensitive,omitempty"`
Filters []*bf.BinlogEventRule `yaml:"filters,omitempty"`
Expand Down
2 changes: 1 addition & 1 deletion dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestConfigFunctions(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint32(100), cfg1.ServerID)
cfg.Filters = []*bf.BinlogEventRule{}
cfg.Tracer = map[string]interface{}{}
cfg.Tracer = map[string]any{}

var cfg2 SourceConfig
require.NoError(t, cfg2.FromToml(originCfgStr))
Expand Down
24 changes: 8 additions & 16 deletions dm/config/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,14 +462,11 @@ func TestSubTaskConfigMarshalAtomic(t *testing.T) {
require.Equal(t, cfg.DumpIOTotalBytes.Load(), uint64(200))

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for range 10 {
wg.Go(func() {
Comment thread
dveeden marked this conversation as resolved.
data, err := json.Marshal(cfg)
require.NoError(t, err)
jsonMap := make(map[string]interface{})
jsonMap := make(map[string]any)
err = json.Unmarshal(data, &jsonMap)
require.NoError(t, err)

Expand All @@ -487,12 +484,9 @@ func TestSubTaskConfigMarshalAtomic(t *testing.T) {
_, hasDumpUUID := jsonMap["dump-uuid"]
require.False(t, hasUUID, "UUID should not be in JSON")
require.False(t, hasDumpUUID, "DumpUUID should not be in JSON")
}()

wg.Add(1)
go func() {
defer wg.Done()
})
Comment thread
dveeden marked this conversation as resolved.

wg.Go(func() {
Comment thread
dveeden marked this conversation as resolved.
newCfg, err := cfg.Clone()
require.NoError(t, err)

Expand All @@ -501,14 +495,12 @@ func TestSubTaskConfigMarshalAtomic(t *testing.T) {
require.GreaterOrEqual(t, newCfg.DumpIOTotalBytes.Load(), uint64(200))
require.Equal(t, newCfg.UUID, uuid)
require.Equal(t, newCfg.DumpUUID, dumpUUID)
}()
})
Comment thread
dveeden marked this conversation as resolved.

wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
Comment thread
dveeden marked this conversation as resolved.
cfg.IOTotalBytes.Add(1)
cfg.DumpIOTotalBytes.Add(1)
}()
})
Comment thread
dveeden marked this conversation as resolved.
}
wg.Wait()

Expand Down
8 changes: 4 additions & 4 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func DefaultMydumperConfig() MydumperConfig {
type rawMydumperConfig MydumperConfig

// UnmarshalYAML implements Unmarshaler.UnmarshalYAML.
func (m *MydumperConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (m *MydumperConfig) UnmarshalYAML(unmarshal func(any) error) error {
raw := rawMydumperConfig(DefaultMydumperConfig())
if err := unmarshal(&raw); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "unmarshal mydumper config")
Expand Down Expand Up @@ -324,7 +324,7 @@ func DefaultLoaderConfig() LoaderConfig {
type rawLoaderConfig LoaderConfig

// UnmarshalYAML implements Unmarshaler.UnmarshalYAML.
func (m *LoaderConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (m *LoaderConfig) UnmarshalYAML(unmarshal func(any) error) error {
raw := rawLoaderConfig(DefaultLoaderConfig())
if err := unmarshal(&raw); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "unmarshal loader config")
Expand Down Expand Up @@ -461,7 +461,7 @@ func DefaultSyncerConfig() SyncerConfig {
type rawSyncerConfig SyncerConfig

// UnmarshalYAML implements Unmarshaler.UnmarshalYAML.
func (m *SyncerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
func (m *SyncerConfig) UnmarshalYAML(unmarshal func(any) error) error {
raw := rawSyncerConfig(DefaultSyncerConfig())
if err := unmarshal(&raw); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "unmarshal syncer config")
Expand Down Expand Up @@ -1050,7 +1050,7 @@ func (c *TaskConfig) adjust() error {
// getGenerateName generates name by rule or gets name from nameMap
// if it's a new name, increase nameIdx
// otherwise return current nameIdx.
func getGenerateName(rule interface{}, nameIdx int, namePrefix string, nameMap map[string]string) (string, int) {
func getGenerateName(rule any, nameIdx int, namePrefix string, nameMap map[string]string) (string, int) {
// use json as key since no DeepEqual for rules now.
ruleByte, err := json.Marshal(rule)
if err != nil {
Expand Down
15 changes: 7 additions & 8 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func TestIsForeignKeyChecksEnabled(t *testing.T) {
}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
t.Parallel()
require.Equal(t, c.expected, IsForeignKeyChecksEnabled(c.session))
Expand Down Expand Up @@ -1114,7 +1113,7 @@ func TestTaskConfigForDowngrade(t *testing.T) {
}

// Clone clones src to dest.
func Clone(dest, src interface{}) {
func Clone(dest, src any) {
cloneValues(reflect.ValueOf(dest), reflect.ValueOf(src))
}

Expand All @@ -1123,17 +1122,17 @@ func Clone(dest, src interface{}) {
func cloneValues(dest, src reflect.Value) {
destType := dest.Type()
srcType := src.Type()
if destType.Kind() == reflect.Ptr {
if destType.Kind() == reflect.Pointer {
destType = destType.Elem()
}
if srcType.Kind() == reflect.Ptr {
if srcType.Kind() == reflect.Pointer {
srcType = srcType.Elem()
}

if destType.Kind() == reflect.Map {
destMap := reflect.MakeMap(destType)
for _, k := range src.MapKeys() {
if src.MapIndex(k).Type().Kind() == reflect.Ptr {
if src.MapIndex(k).Type().Kind() == reflect.Pointer {
newVal := reflect.New(destType.Elem().Elem())
cloneValues(newVal, src.MapIndex(k))
destMap.SetMapIndex(k, newVal)
Expand All @@ -1148,7 +1147,7 @@ func cloneValues(dest, src reflect.Value) {
if destType.Kind() == reflect.Slice {
slice := reflect.MakeSlice(destType, src.Len(), src.Cap())
for i := 0; i < src.Len(); i++ {
if slice.Index(i).Type().Kind() == reflect.Ptr {
if slice.Index(i).Type().Kind() == reflect.Pointer {
newVal := reflect.New(slice.Index(i).Type().Elem())
cloneValues(newVal, src.Index(i))
slice.Index(i).Set(newVal)
Expand All @@ -1170,10 +1169,10 @@ func cloneValues(dest, src reflect.Value) {
srcField := src.Elem().Field(i)
destFieldType := destField.Type()
srcFieldType := srcField.Type()
if destFieldType.Kind() == reflect.Ptr {
if destFieldType.Kind() == reflect.Pointer {
destFieldType = destFieldType.Elem()
}
if srcFieldType.Kind() == reflect.Ptr {
if srcFieldType.Kind() == reflect.Pointer {
srcFieldType = srcFieldType.Elem()
}
if destFieldType != srcFieldType {
Expand Down
4 changes: 2 additions & 2 deletions dm/ctl/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ func (c *Config) adjust() error {

// validate host:port format address.
func validateAddr(addr string) error {
endpoints := strings.Split(addr, ",")
for _, endpoint := range endpoints {
endpoints := strings.SplitSeq(addr, ",")
for endpoint := range endpoints {
if _, _, err := net.SplitHostPort(utils.UnwrapScheme(endpoint)); err != nil {
return errors.Trace(err)
}
Expand Down
Loading
Loading