Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cdc/sink/tablesink/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestStateString(t *testing.T) {
}

for _, tc := range testCases {
tc := tc
t.Run(tc.want, func(t *testing.T) {
t.Parallel()
require.Equal(t, tc.want, tc.state.String())
Expand Down
10 changes: 5 additions & 5 deletions dm/pkg/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ func TestExponentialBackoff(t *testing.T) {
Factor: factor,
}

for i := 0; i < 10; i++ {
for i := range 10 {
expected := minT * time.Duration(math.Pow(factor, float64(i)))
require.Equal(t, expected, b.Duration())
}
b.Rollback()
require.Equal(t, 512*minT, b.Current())
b.Forward()
for i := 0; i < 10; i++ {
for range 10 {
require.Equal(t, maxT, b.Duration())
}
b.Reset()
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestFixedBackoff(t *testing.T) {
Max: maxT,
Factor: factor,
}
for i := 0; i < 10; i++ {
for range 10 {
require.Equal(t, maxT, b.Duration())
}
}
Expand Down Expand Up @@ -163,13 +163,13 @@ func TestForward(t *testing.T) {
Max: maxT,
Factor: factor,
}
for i := 0; i < n; i++ {
for range n {
b.Forward()
}
require.Equal(t, n, b.cwnd)
b.Reset()
require.Equal(t, 0, b.cwnd)
for i := 0; i < n; i++ {
for range n {
b.BoundaryForward()
}
require.Equal(t, 3, b.cwnd)
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {
}

// verifySingleGTID verifies gSet whether only containing a single valid GTID.
func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (any, error) {
if gtid.CheckGTIDSetEmpty(gSet) {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type DMLData struct {
Schema string
Table string
ColumnType []byte
Rows [][]interface{}
Rows [][]any

// if Query is not empty, we generate a Query event
Query string
Expand Down
16 changes: 8 additions & 8 deletions dm/pkg/binlog/event/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func TestGenDMLEvent(t *testing.T) {
require.Nil(t, result)

// single INSERT without batch
insertRows1 := make([][]interface{}, 0, 1)
insertRows1 = append(insertRows1, []interface{}{int32(11), "string column value"})
insertRows1 := make([][]any, 0, 1)
insertRows1 = append(insertRows1, []any{int32(11), "string column value"})
insertDMLData := []*DMLData{
{
TableID: 11,
Expand All @@ -69,8 +69,8 @@ func TestGenDMLEvent(t *testing.T) {
xid++

// multi INSERT with batch
insertRows2 := make([][]interface{}, 0, 2)
insertRows2 = append(insertRows2, []interface{}{int32(101), "string column value a"}, []interface{}{int32(102), "string column value b"})
insertRows2 := make([][]any, 0, 2)
insertRows2 = append(insertRows2, []any{int32(101), "string column value a"}, []any{int32(102), "string column value b"})
insertDMLData = append(insertDMLData, &DMLData{
TableID: 12,
Schema: "db2",
Expand All @@ -91,8 +91,8 @@ func TestGenDMLEvent(t *testing.T) {
xid++

// single UPDATE
updateRows := make([][]interface{}, 0, 2)
updateRows = append(updateRows, []interface{}{int32(21), "old string"}, []interface{}{int32(21), "new string"})
updateRows := make([][]any, 0, 2)
updateRows = append(updateRows, []any{int32(21), "old string"}, []any{int32(21), "new string"})
updateDMLData := []*DMLData{
{
TableID: 21,
Expand Down Expand Up @@ -121,8 +121,8 @@ func TestGenDMLEvent(t *testing.T) {
require.Nil(t, err)

// single DELETE
deleteRows := make([][]interface{}, 0, 1)
deleteRows = append(deleteRows, []interface{}{int32(31), "string a"})
deleteRows := make([][]any, 0, 1)
deleteRows = append(deleteRows, []any{int32(31), "string a"})
deleteDMLData := []*DMLData{
{
TableID: 31,
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func GenTableMapEvent(header *replication.EventHeader, latestPos uint32, tableID
//
// ref: https://dev.mysql.com/doc/internals/en/rows-event.html
// ref: http://blog.51cto.com/yanzongshuai/2090894
func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType replication.EventType, tableID uint64, rowsFlags uint16, rows [][]interface{}, columnType []byte, tableMapEv *replication.BinlogEvent) (*replication.BinlogEvent, error) {
func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType replication.EventType, tableID uint64, rowsFlags uint16, rows [][]any, columnType []byte, tableMapEv *replication.BinlogEvent) (*replication.BinlogEvent, error) {
switch eventType {
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2,
replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2,
Expand Down
20 changes: 10 additions & 10 deletions dm/pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func TestGenRowsEvent(t *testing.T) {
tableID uint64 = 108
eventType = replication.TABLE_MAP_EVENT
rowsFlag = RowFlagsEndOfStatement
rows [][]interface{}
rows [][]any
columnType []byte // nil
)

Expand All @@ -496,7 +496,7 @@ func TestGenRowsEvent(t *testing.T) {
require.Nil(t, rowsEv)

// valid eventType and rows, invalid columnType
row := []interface{}{int32(1)}
row := []any{int32(1)}
rows = append(rows, row)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
require.NotNil(t, err)
Expand All @@ -522,15 +522,15 @@ func TestGenRowsEvent(t *testing.T) {
require.Equal(t, rows, rowsEvBody.Rows)

// multi rows, with different length, invalid
rows = append(rows, []interface{}{int32(1), int32(2)})
rows = append(rows, []any{int32(1), int32(2)})
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
require.NotNil(t, err)
require.Nil(t, rowsEv)

// multi rows, multi columns, valid
rows = make([][]interface{}, 0, 2)
rows = append(rows, []interface{}{int32(1), int32(2)})
rows = append(rows, []interface{}{int32(3), int32(4)})
rows = make([][]any, 0, 2)
rows = append(rows, []any{int32(1), int32(2)})
rows = append(rows, []any{int32(3), int32(4)})
columnType = []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_LONG}
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
require.Nil(t, err)
Expand All @@ -556,8 +556,8 @@ func TestGenRowsEvent(t *testing.T) {
}

// more column types
rows = make([][]interface{}, 0, 1)
rows = append(rows, []interface{}{
rows = make([][]any, 0, 1)
rows = append(rows, []any{
int32(1), int8(2), int16(3), int32(4), int64(5),
float32(1.23), float64(4.56), "string with type STRING",
})
Expand All @@ -582,8 +582,8 @@ func TestGenRowsEvent(t *testing.T) {
require.Nil(t, rowsEv)

// NotSupported column type
rows = make([][]interface{}, 0, 1)
rows = append(rows, []interface{}{int32(1)})
rows = make([][]any, 0, 1)
rows = append(rows, []any{int32(1)})
unsupportedTypes := []byte{
gmysql.MYSQL_TYPE_VARCHAR, gmysql.MYSQL_TYPE_VAR_STRING,
gmysql.MYSQL_TYPE_NEWDECIMAL, gmysql.MYSQL_TYPE_BIT,
Expand Down
24 changes: 12 additions & 12 deletions dm/pkg/binlog/event/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func testGenerate(t *testing.T, flavor string, serverID uint32, latestGTID gmysq
tableID uint64 = 8
columnType = []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_STRING}
)
insertRows := make([][]interface{}, 0, 1)
insertRows = append(insertRows, []interface{}{int32(1), "string 1"})
insertRows := make([][]any, 0, 1)
insertRows = append(insertRows, []any{int32(1), "string 1"})
dmlData := []*DMLData{
{
TableID: tableID,
Expand All @@ -186,10 +186,10 @@ func testGenerate(t *testing.T, flavor string, serverID uint32, latestGTID gmysq

// INSERT INTO `db`.`tbl` VALUES (11, "string 11"), (12, "string 12")
// INSERT INTO `db`.`tbl` VALUES (13, "string 13"),
insertRows1 := make([][]interface{}, 0, 2)
insertRows1 = append(insertRows1, []interface{}{int32(11), "string 11"}, []interface{}{int32(12), "string 12"})
insertRows2 := make([][]interface{}, 0, 1)
insertRows2 = append(insertRows2, []interface{}{int32(13), "string 13"})
insertRows1 := make([][]any, 0, 2)
insertRows1 = append(insertRows1, []any{int32(11), "string 11"}, []any{int32(12), "string 12"})
insertRows2 := make([][]any, 0, 1)
insertRows2 = append(insertRows2, []any{int32(13), "string 13"})
dmlData = []*DMLData{
{
TableID: tableID,
Expand All @@ -215,10 +215,10 @@ func testGenerate(t *testing.T, flavor string, serverID uint32, latestGTID gmysq

// UPDATE `db`.`tbl` SET c2="another string 11" WHERE c1=11
// UPDATE `db`.`tbl` SET c1=120, c2="another string 120" WHERE C1=12
updateRows1 := make([][]interface{}, 0, 2)
updateRows1 = append(updateRows1, []interface{}{int32(11), "string 11"}, []interface{}{int32(11), "another string 11"})
updateRows2 := make([][]interface{}, 0, 2)
updateRows2 = append(updateRows2, []interface{}{int32(12), "string 12"}, []interface{}{int32(120), "another string 120"})
updateRows1 := make([][]any, 0, 2)
updateRows1 = append(updateRows1, []any{int32(11), "string 11"}, []any{int32(11), "another string 11"})
updateRows2 := make([][]any, 0, 2)
updateRows2 = append(updateRows2, []any{int32(12), "string 12"}, []any{int32(120), "another string 120"})
dmlData = []*DMLData{
{
TableID: tableID,
Expand All @@ -244,8 +244,8 @@ func testGenerate(t *testing.T, flavor string, serverID uint32, latestGTID gmysq
allEventTypes = append(allEventTypes, gtidEventType(t, flavor), replication.QUERY_EVENT, replication.TABLE_MAP_EVENT, eventType, replication.TABLE_MAP_EVENT, eventType, replication.XID_EVENT)

// DELETE FROM `db`.`tbl` WHERE c1=13
deleteRows := make([][]interface{}, 0, 1)
deleteRows = append(deleteRows, []interface{}{int32(13), "string 13"})
deleteRows := make([][]any, 0, 1)
deleteRows = append(deleteRows, []any{int32(13), "string 13"})
dmlData = []*DMLData{
{
TableID: tableID,
Expand Down
28 changes: 14 additions & 14 deletions dm/pkg/binlog/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func nullBytes(n int) []byte {
// fullBytes returns a n-length full bytes slice (all bits are set).
func fullBytes(n int) []byte {
buf := new(bytes.Buffer)
for i := 0; i < n; i++ {
for range n {
buf.WriteByte(0xff)
}
return buf.Bytes()
Expand Down Expand Up @@ -197,7 +197,7 @@ func combineHeaderPayload(buf *bytes.Buffer, header, postHeader, payload []byte)
// ref: https://github.com/go-mysql-org/go-mysql/blob/88e9cd7f6643b246b4dcc0e3206e9a169dd0ac96/replication/row_event.go#L368
// NOTE: we do not generate meaningful `meta` yet.
// nolint:unparam
func encodeColumnValue(v interface{}, tp byte, meta uint16) ([]byte, error) {
func encodeColumnValue(v any, tp byte, meta uint16) ([]byte, error) {
var (
buf = new(bytes.Buffer)
err error
Expand All @@ -206,33 +206,33 @@ func encodeColumnValue(v interface{}, tp byte, meta uint16) ([]byte, error) {
case gmysql.MYSQL_TYPE_NULL:
return nil, nil
case gmysql.MYSQL_TYPE_LONG:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int32(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int32]())
case gmysql.MYSQL_TYPE_TINY:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int8(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int8]())
case gmysql.MYSQL_TYPE_SHORT:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int16(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int16]())
case gmysql.MYSQL_TYPE_INT24:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int32(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int32]())
if err == nil {
buf.Truncate(3)
}
case gmysql.MYSQL_TYPE_LONGLONG:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int64(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int64]())
case gmysql.MYSQL_TYPE_FLOAT:
value, ok := v.(float32)
if !ok {
err = terror.ErrBinlogColumnTypeMisMatch.Generate(v, reflect.TypeOf(v), reflect.TypeOf(float32(0)))
err = terror.ErrBinlogColumnTypeMisMatch.Generate(v, reflect.TypeOf(v), reflect.TypeFor[float32]())
} else {
bits := math.Float32bits(value)
err = writeIntegerColumnValue(buf, bits, reflect.TypeOf(uint32(0)))
err = writeIntegerColumnValue(buf, bits, reflect.TypeFor[uint32]())
}
case gmysql.MYSQL_TYPE_DOUBLE:
value, ok := v.(float64)
if !ok {
err = terror.ErrBinlogColumnTypeMisMatch.Generate(v, reflect.TypeOf(v), reflect.TypeOf(float64(0)))
err = terror.ErrBinlogColumnTypeMisMatch.Generate(v, reflect.TypeOf(v), reflect.TypeFor[float64]())
} else {
bits := math.Float64bits(value)
err = writeIntegerColumnValue(buf, bits, reflect.TypeOf(uint64(0)))
err = writeIntegerColumnValue(buf, bits, reflect.TypeFor[uint64]())
}
case gmysql.MYSQL_TYPE_STRING:
err = writeStringColumnValue(buf, v)
Expand All @@ -252,18 +252,18 @@ func encodeColumnValue(v interface{}, tp byte, meta uint16) ([]byte, error) {
}

// writeIntegerColumnValue writes integer value to bytes buffer.
func writeIntegerColumnValue(buf *bytes.Buffer, value interface{}, valueType reflect.Type) error {
func writeIntegerColumnValue(buf *bytes.Buffer, value any, valueType reflect.Type) error {
if reflect.TypeOf(value) != valueType {
return terror.ErrBinlogColumnTypeMisMatch.Generate(value, reflect.TypeOf(value), valueType)
}
return terror.ErrBinlogWriteBinaryData.Delegate(binary.Write(buf, binary.LittleEndian, value))
}

// writeStringColumnValue writes string value to bytes buffer.
func writeStringColumnValue(buf *bytes.Buffer, value interface{}) error {
func writeStringColumnValue(buf *bytes.Buffer, value any) error {
str, ok := value.(string)
if !ok {
return terror.ErrBinlogColumnTypeMisMatch.Generate(value, reflect.TypeOf(value), reflect.TypeOf(""))
return terror.ErrBinlogColumnTypeMisMatch.Generate(value, reflect.TypeOf(value), reflect.TypeFor[string]())
}
var (
err error
Expand Down
5 changes: 3 additions & 2 deletions dm/pkg/func-rollback/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package rollback

import (
"slices"
"sync"

"github.com/pingcap/tiflow/dm/pkg/log"
Expand Down Expand Up @@ -53,8 +54,8 @@ func (h *FuncRollbackHolder) Add(fn FuncRollback) {
func (h *FuncRollbackHolder) RollbackReverseOrder() {
h.mu.Lock()
defer h.mu.Unlock()
for i := len(h.fns) - 1; i >= 0; i-- {
fn := h.fns[i]
for _, fn := range slices.Backward(h.fns) {

log.L().Info("rolling back", zap.String("functon", fn.Name), zap.String("onwer", h.owner))
fn.Fn()
}
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/func-rollback/rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestRollback(t *testing.T) {
expected = append(expected, i) // [4, 3, 2, 1, 0]
}

for i := 0; i < total; i++ {
for i := range total {
h.Add(FuncRollback{Name: fmt.Sprintf("test-%d", i), Fn: rf})
}
h.RollbackReverseOrder()
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/helper/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import "reflect"

// IsNil tests whether the passed in value is nil.
// ref https://github.com/golang/go/blob/87113f7eadf6d8b12279709f05c0359b54b194ea/src/reflect/value.go#L1049.
func IsNil(vi interface{}) (result bool) {
func IsNil(vi any) (result bool) {
if vi == nil {
result = true
} else {
switch v := reflect.ValueOf(vi); v.Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.UnsafePointer, reflect.Slice:
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Pointer, reflect.UnsafePointer, reflect.Slice:
return v.IsNil()
}
}
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/terror/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// DBErrorAdaptArgs is an adapter to change raw database error to *Error object.
// If err is already an *Error object, return it directly.
func DBErrorAdaptArgs(err error, defaultErr *Error, args ...interface{}) error {
func DBErrorAdaptArgs(err error, defaultErr *Error, args ...any) error {
if err == nil {
return nil
}
Expand All @@ -42,6 +42,6 @@ func DBErrorAdaptArgs(err error, defaultErr *Error, args ...interface{}) error {

// DBErrorAdapt is an adapter to change raw database error to *Error object.
// If err is already an *Error object, return it directly.
func DBErrorAdapt(err error, scope ErrScope, defaultErr *Error, args ...interface{}) error {
func DBErrorAdapt(err error, scope ErrScope, defaultErr *Error, args ...any) error {
return WithScope(DBErrorAdaptArgs(err, defaultErr, args...), scope)
}
Loading
Loading