Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cdc/sink/tablesink/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestStateString(t *testing.T) {
}

for _, tc := range testCases {
tc := tc
t.Run(tc.want, func(t *testing.T) {
t.Parallel()
require.Equal(t, tc.want, tc.state.String())
Expand Down
10 changes: 5 additions & 5 deletions dm/pkg/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ func TestExponentialBackoff(t *testing.T) {
Factor: factor,
}

for i := 0; i < 10; i++ {
for i := range 10 {
expected := minT * time.Duration(math.Pow(factor, float64(i)))
require.Equal(t, expected, b.Duration())
}
b.Rollback()
require.Equal(t, 512*minT, b.Current())
b.Forward()
for i := 0; i < 10; i++ {
for range 10 {
require.Equal(t, maxT, b.Duration())
}
b.Reset()
Expand Down Expand Up @@ -123,7 +123,7 @@ func TestFixedBackoff(t *testing.T) {
Max: maxT,
Factor: factor,
}
for i := 0; i < 10; i++ {
for range 10 {
require.Equal(t, maxT, b.Duration())
}
}
Expand Down Expand Up @@ -163,13 +163,13 @@ func TestForward(t *testing.T) {
Max: maxT,
Factor: factor,
}
for i := 0; i < n; i++ {
for range n {
b.Forward()
}
require.Equal(t, n, b.cwnd)
b.Reset()
require.Equal(t, 0, b.cwnd)
for i := 0; i < n; i++ {
for range n {
b.BoundaryForward()
}
require.Equal(t, 3, b.cwnd)
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {
}

// verifySingleGTID verifies gSet whether only containing a single valid GTID.
func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (any, error) {
if gtid.CheckGTIDSetEmpty(gSet) {
return nil, terror.ErrBinlogEmptyGTID.Generate()
}
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type DMLData struct {
Schema string
Table string
ColumnType []byte
Rows [][]interface{}
Rows [][]any

// if Query is not empty, we generate a Query event
Query string
Expand Down
16 changes: 8 additions & 8 deletions dm/pkg/binlog/event/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func TestGenDMLEvent(t *testing.T) {
require.Nil(t, result)

// single INSERT without batch
insertRows1 := make([][]interface{}, 0, 1)
insertRows1 = append(insertRows1, []interface{}{int32(11), "string column value"})
insertRows1 := make([][]any, 0, 1)
insertRows1 = append(insertRows1, []any{int32(11), "string column value"})
insertDMLData := []*DMLData{
{
TableID: 11,
Expand All @@ -69,8 +69,8 @@ func TestGenDMLEvent(t *testing.T) {
xid++

// multi INSERT with batch
insertRows2 := make([][]interface{}, 0, 2)
insertRows2 = append(insertRows2, []interface{}{int32(101), "string column value a"}, []interface{}{int32(102), "string column value b"})
insertRows2 := make([][]any, 0, 2)
insertRows2 = append(insertRows2, []any{int32(101), "string column value a"}, []any{int32(102), "string column value b"})
insertDMLData = append(insertDMLData, &DMLData{
TableID: 12,
Schema: "db2",
Expand All @@ -91,8 +91,8 @@ func TestGenDMLEvent(t *testing.T) {
xid++

// single UPDATE
updateRows := make([][]interface{}, 0, 2)
updateRows = append(updateRows, []interface{}{int32(21), "old string"}, []interface{}{int32(21), "new string"})
updateRows := make([][]any, 0, 2)
updateRows = append(updateRows, []any{int32(21), "old string"}, []any{int32(21), "new string"})
updateDMLData := []*DMLData{
{
TableID: 21,
Expand Down Expand Up @@ -121,8 +121,8 @@ func TestGenDMLEvent(t *testing.T) {
require.Nil(t, err)

// single DELETE
deleteRows := make([][]interface{}, 0, 1)
deleteRows = append(deleteRows, []interface{}{int32(31), "string a"})
deleteRows := make([][]any, 0, 1)
deleteRows = append(deleteRows, []any{int32(31), "string a"})
deleteDMLData := []*DMLData{
{
TableID: 31,
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func GenTableMapEvent(header *replication.EventHeader, latestPos uint32, tableID
//
// ref: https://dev.mysql.com/doc/internals/en/rows-event.html
// ref: http://blog.51cto.com/yanzongshuai/2090894
func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType replication.EventType, tableID uint64, rowsFlags uint16, rows [][]interface{}, columnType []byte, tableMapEv *replication.BinlogEvent) (*replication.BinlogEvent, error) {
func GenRowsEvent(header *replication.EventHeader, latestPos uint32, eventType replication.EventType, tableID uint64, rowsFlags uint16, rows [][]any, columnType []byte, tableMapEv *replication.BinlogEvent) (*replication.BinlogEvent, error) {
switch eventType {
case replication.WRITE_ROWS_EVENTv0, replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2,
replication.UPDATE_ROWS_EVENTv0, replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2,
Expand Down
20 changes: 10 additions & 10 deletions dm/pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func TestGenRowsEvent(t *testing.T) {
tableID uint64 = 108
eventType = replication.TABLE_MAP_EVENT
rowsFlag = RowFlagsEndOfStatement
rows [][]interface{}
rows [][]any
columnType []byte // nil
)

Expand All @@ -496,7 +496,7 @@ func TestGenRowsEvent(t *testing.T) {
require.Nil(t, rowsEv)

// valid eventType and rows, invalid columnType
row := []interface{}{int32(1)}
row := []any{int32(1)}
rows = append(rows, row)
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
require.NotNil(t, err)
Expand All @@ -522,15 +522,15 @@ func TestGenRowsEvent(t *testing.T) {
require.Equal(t, rows, rowsEvBody.Rows)

// multi rows, with different length, invalid
rows = append(rows, []interface{}{int32(1), int32(2)})
rows = append(rows, []any{int32(1), int32(2)})
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
require.NotNil(t, err)
require.Nil(t, rowsEv)

// multi rows, multi columns, valid
rows = make([][]interface{}, 0, 2)
rows = append(rows, []interface{}{int32(1), int32(2)})
rows = append(rows, []interface{}{int32(3), int32(4)})
rows = make([][]any, 0, 2)
rows = append(rows, []any{int32(1), int32(2)})
rows = append(rows, []any{int32(3), int32(4)})
columnType = []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_LONG}
rowsEv, err = GenRowsEvent(header, latestPos, eventType, tableID, rowsFlag, rows, columnType, nil)
require.Nil(t, err)
Expand All @@ -556,8 +556,8 @@ func TestGenRowsEvent(t *testing.T) {
}

// more column types
rows = make([][]interface{}, 0, 1)
rows = append(rows, []interface{}{
rows = make([][]any, 0, 1)
rows = append(rows, []any{
int32(1), int8(2), int16(3), int32(4), int64(5),
float32(1.23), float64(4.56), "string with type STRING",
})
Expand All @@ -582,8 +582,8 @@ func TestGenRowsEvent(t *testing.T) {
require.Nil(t, rowsEv)

// NotSupported column type
rows = make([][]interface{}, 0, 1)
rows = append(rows, []interface{}{int32(1)})
rows = make([][]any, 0, 1)
rows = append(rows, []any{int32(1)})
unsupportedTypes := []byte{
gmysql.MYSQL_TYPE_VARCHAR, gmysql.MYSQL_TYPE_VAR_STRING,
gmysql.MYSQL_TYPE_NEWDECIMAL, gmysql.MYSQL_TYPE_BIT,
Expand Down
24 changes: 12 additions & 12 deletions dm/pkg/binlog/event/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ func testGenerate(t *testing.T, flavor string, serverID uint32, latestGTID gmysq
tableID uint64 = 8
columnType = []byte{gmysql.MYSQL_TYPE_LONG, gmysql.MYSQL_TYPE_STRING}
)
insertRows := make([][]interface{}, 0, 1)
insertRows = append(insertRows, []interface{}{int32(1), "string 1"})
insertRows := make([][]any, 0, 1)
insertRows = append(insertRows, []any{int32(1), "string 1"})
dmlData := []*DMLData{
{
TableID: tableID,
Expand All @@ -186,10 +186,10 @@ func testGenerate(t *testing.T, flavor string, serverID uint32, latestGTID gmysq

// INSERT INTO `db`.`tbl` VALUES (11, "string 11"), (12, "string 12")
// INSERT INTO `db`.`tbl` VALUES (13, "string 13"),
insertRows1 := make([][]interface{}, 0, 2)
insertRows1 = append(insertRows1, []interface{}{int32(11), "string 11"}, []interface{}{int32(12), "string 12"})
insertRows2 := make([][]interface{}, 0, 1)
insertRows2 = append(insertRows2, []interface{}{int32(13), "string 13"})
insertRows1 := make([][]any, 0, 2)
insertRows1 = append(insertRows1, []any{int32(11), "string 11"}, []any{int32(12), "string 12"})
insertRows2 := make([][]any, 0, 1)
insertRows2 = append(insertRows2, []any{int32(13), "string 13"})
dmlData = []*DMLData{
{
TableID: tableID,
Expand All @@ -215,10 +215,10 @@ func testGenerate(t *testing.T, flavor string, serverID uint32, latestGTID gmysq

// UPDATE `db`.`tbl` SET c2="another string 11" WHERE c1=11
// UPDATE `db`.`tbl` SET c1=120, c2="another string 120" WHERE C1=12
updateRows1 := make([][]interface{}, 0, 2)
updateRows1 = append(updateRows1, []interface{}{int32(11), "string 11"}, []interface{}{int32(11), "another string 11"})
updateRows2 := make([][]interface{}, 0, 2)
updateRows2 = append(updateRows2, []interface{}{int32(12), "string 12"}, []interface{}{int32(120), "another string 120"})
updateRows1 := make([][]any, 0, 2)
updateRows1 = append(updateRows1, []any{int32(11), "string 11"}, []any{int32(11), "another string 11"})
updateRows2 := make([][]any, 0, 2)
updateRows2 = append(updateRows2, []any{int32(12), "string 12"}, []any{int32(120), "another string 120"})
dmlData = []*DMLData{
{
TableID: tableID,
Expand All @@ -244,8 +244,8 @@ func testGenerate(t *testing.T, flavor string, serverID uint32, latestGTID gmysq
allEventTypes = append(allEventTypes, gtidEventType(t, flavor), replication.QUERY_EVENT, replication.TABLE_MAP_EVENT, eventType, replication.TABLE_MAP_EVENT, eventType, replication.XID_EVENT)

// DELETE FROM `db`.`tbl` WHERE c1=13
deleteRows := make([][]interface{}, 0, 1)
deleteRows = append(deleteRows, []interface{}{int32(13), "string 13"})
deleteRows := make([][]any, 0, 1)
deleteRows = append(deleteRows, []any{int32(13), "string 13"})
dmlData = []*DMLData{
{
TableID: tableID,
Expand Down
28 changes: 14 additions & 14 deletions dm/pkg/binlog/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func nullBytes(n int) []byte {
// fullBytes returns a n-length full bytes slice (all bits are set).
func fullBytes(n int) []byte {
buf := new(bytes.Buffer)
for i := 0; i < n; i++ {
for range n {
buf.WriteByte(0xff)
}
return buf.Bytes()
Expand Down Expand Up @@ -197,7 +197,7 @@ func combineHeaderPayload(buf *bytes.Buffer, header, postHeader, payload []byte)
// ref: https://github.com/go-mysql-org/go-mysql/blob/88e9cd7f6643b246b4dcc0e3206e9a169dd0ac96/replication/row_event.go#L368
// NOTE: we do not generate meaningful `meta` yet.
// nolint:unparam
func encodeColumnValue(v interface{}, tp byte, meta uint16) ([]byte, error) {
func encodeColumnValue(v any, tp byte, meta uint16) ([]byte, error) {
var (
buf = new(bytes.Buffer)
err error
Expand All @@ -206,33 +206,33 @@ func encodeColumnValue(v interface{}, tp byte, meta uint16) ([]byte, error) {
case gmysql.MYSQL_TYPE_NULL:
return nil, nil
case gmysql.MYSQL_TYPE_LONG:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int32(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int32]())
case gmysql.MYSQL_TYPE_TINY:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int8(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int8]())
case gmysql.MYSQL_TYPE_SHORT:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int16(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int16]())
case gmysql.MYSQL_TYPE_INT24:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int32(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int32]())
if err == nil {
buf.Truncate(3)
}
case gmysql.MYSQL_TYPE_LONGLONG:
err = writeIntegerColumnValue(buf, v, reflect.TypeOf(int64(0)))
err = writeIntegerColumnValue(buf, v, reflect.TypeFor[int64]())
case gmysql.MYSQL_TYPE_FLOAT:
value, ok := v.(float32)
if !ok {
err = terror.ErrBinlogColumnTypeMisMatch.Generate(v, reflect.TypeOf(v), reflect.TypeOf(float32(0)))
err = terror.ErrBinlogColumnTypeMisMatch.Generate(v, reflect.TypeOf(v), reflect.TypeFor[float32]())
} else {
bits := math.Float32bits(value)
err = writeIntegerColumnValue(buf, bits, reflect.TypeOf(uint32(0)))
err = writeIntegerColumnValue(buf, bits, reflect.TypeFor[uint32]())
}
case gmysql.MYSQL_TYPE_DOUBLE:
value, ok := v.(float64)
if !ok {
err = terror.ErrBinlogColumnTypeMisMatch.Generate(v, reflect.TypeOf(v), reflect.TypeOf(float64(0)))
err = terror.ErrBinlogColumnTypeMisMatch.Generate(v, reflect.TypeOf(v), reflect.TypeFor[float64]())
} else {
bits := math.Float64bits(value)
err = writeIntegerColumnValue(buf, bits, reflect.TypeOf(uint64(0)))
err = writeIntegerColumnValue(buf, bits, reflect.TypeFor[uint64]())
}
case gmysql.MYSQL_TYPE_STRING:
err = writeStringColumnValue(buf, v)
Expand All @@ -252,18 +252,18 @@ func encodeColumnValue(v interface{}, tp byte, meta uint16) ([]byte, error) {
}

// writeIntegerColumnValue writes integer value to bytes buffer.
func writeIntegerColumnValue(buf *bytes.Buffer, value interface{}, valueType reflect.Type) error {
func writeIntegerColumnValue(buf *bytes.Buffer, value any, valueType reflect.Type) error {
if reflect.TypeOf(value) != valueType {
return terror.ErrBinlogColumnTypeMisMatch.Generate(value, reflect.TypeOf(value), valueType)
}
return terror.ErrBinlogWriteBinaryData.Delegate(binary.Write(buf, binary.LittleEndian, value))
}

// writeStringColumnValue writes string value to bytes buffer.
func writeStringColumnValue(buf *bytes.Buffer, value interface{}) error {
func writeStringColumnValue(buf *bytes.Buffer, value any) error {
str, ok := value.(string)
if !ok {
return terror.ErrBinlogColumnTypeMisMatch.Generate(value, reflect.TypeOf(value), reflect.TypeOf(""))
return terror.ErrBinlogColumnTypeMisMatch.Generate(value, reflect.TypeOf(value), reflect.TypeFor[string]())
}
var (
err error
Expand Down
5 changes: 3 additions & 2 deletions dm/pkg/func-rollback/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package rollback

import (
"slices"
"sync"

"github.com/pingcap/tiflow/dm/pkg/log"
Expand Down Expand Up @@ -53,8 +54,8 @@ func (h *FuncRollbackHolder) Add(fn FuncRollback) {
func (h *FuncRollbackHolder) RollbackReverseOrder() {
h.mu.Lock()
defer h.mu.Unlock()
for i := len(h.fns) - 1; i >= 0; i-- {
fn := h.fns[i]
for _, fn := range slices.Backward(h.fns) {

log.L().Info("rolling back", zap.String("functon", fn.Name), zap.String("onwer", h.owner))
fn.Fn()
}
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/func-rollback/rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestRollback(t *testing.T) {
expected = append(expected, i) // [4, 3, 2, 1, 0]
}

for i := 0; i < total; i++ {
for i := range total {
h.Add(FuncRollback{Name: fmt.Sprintf("test-%d", i), Fn: rf})
}
h.RollbackReverseOrder()
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/helper/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import "reflect"

// IsNil tests whether the passed in value is nil.
// ref https://github.com/golang/go/blob/87113f7eadf6d8b12279709f05c0359b54b194ea/src/reflect/value.go#L1049.
func IsNil(vi interface{}) (result bool) {
func IsNil(vi any) (result bool) {
if vi == nil {
result = true
} else {
switch v := reflect.ValueOf(vi); v.Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.UnsafePointer, reflect.Slice:
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Pointer, reflect.UnsafePointer, reflect.Slice:
return v.IsNil()
}
}
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/terror/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// DBErrorAdaptArgs is an adapter to change raw database error to *Error object.
// If err is already an *Error object, return it directly.
func DBErrorAdaptArgs(err error, defaultErr *Error, args ...interface{}) error {
func DBErrorAdaptArgs(err error, defaultErr *Error, args ...any) error {
if err == nil {
return nil
}
Expand All @@ -42,6 +42,6 @@ func DBErrorAdaptArgs(err error, defaultErr *Error, args ...interface{}) error {

// DBErrorAdapt is an adapter to change raw database error to *Error object.
// If err is already an *Error object, return it directly.
func DBErrorAdapt(err error, scope ErrScope, defaultErr *Error, args ...interface{}) error {
func DBErrorAdapt(err error, scope ErrScope, defaultErr *Error, args ...any) error {
return WithScope(DBErrorAdaptArgs(err, defaultErr, args...), scope)
}
Loading
Loading