Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions engine/chaos/cases/case_fake_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func runFakeJobCase(ctx context.Context, cfg *config) error {
mvcc := 0
interval := 60 * time.Second
runTime := 10
for i := 0; i < runTime; i++ {
for i := range runTime {
value := fmt.Sprintf("update-value-index-%d", i)
mvcc++
start := time.Now()
Expand All @@ -119,15 +119,15 @@ func updateKeyAndCheck(
ctx context.Context, cli *e2e.ChaosCli, jobID string, workerCount int,
updateValue string, expectedMvcc int,
) error {
for i := 0; i < workerCount; i++ {
for i := range workerCount {
err := cli.UpdateFakeJobKey(ctx, i, updateValue)
if err != nil {
return err
}
}
// retry 6 minutes at most
finished := util.WaitSomething(60, time.Second*6, func() bool {
for jobIdx := 0; jobIdx < workerCount; jobIdx++ {
for jobIdx := range workerCount {
err := cli.CheckFakeJobKey(ctx, jobID, jobIdx, expectedMvcc, updateValue)
if err != nil {
log.Warn("check fail job failed", zap.Error(err))
Expand Down
1 change: 0 additions & 1 deletion engine/chaos/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var cases = []caseFn{runFakeJobCase, runDMJobCases}
func runCases(ctx context.Context, cfg *config) error {
errg, ctx := errgroup.WithContext(ctx)
for _, fn := range cases {
fn := fn
errg.Go(func() error {
return fn(ctx, cfg)
})
Expand Down
8 changes: 4 additions & 4 deletions engine/chaos/cases/dm/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewCase(ctx context.Context, addr string, name string, cfgPath string) (*Ca
for range c.sources {
generators := make(map[string]sqlgen.SQLGenerator)
mcps := make(map[string]*mcp.ModificationCandidatePool)
for i := 0; i < tableNum; i++ {
for i := range tableNum {
tableName := fmt.Sprintf("tb%d", i)
tableConfig := &sqlconfig.TableConfig{
DatabaseName: c.name,
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *Case) genFullData() error {
return err
}
sqls := make([]string, 0, rowNum)
for j := 0; j < rowNum; j++ {
for range rowNum {
sql, uk, err := generator.GenInsertRow()
if err != nil {
return err
Expand Down Expand Up @@ -244,7 +244,7 @@ func (c *Case) diffData(ctx context.Context) (bool, error) {
}

func (c *Case) diffDataLoop(ctx context.Context) error {
for i := 0; i < diffTimes; i++ {
for range diffTimes {
select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -329,7 +329,7 @@ func (c *Case) genIncrData(ctx context.Context) error {
tableName := c.tables[rand.Intn(tableNum)]

sqls := make([]string, 0, batch)
for i := 0; i < batch; i++ {
for range batch {
sql, err := c.randDML(source, tableName, deleteKeys)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion engine/chaos/cases/dm/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (dc *dbConn) ExecuteSQLs(queries ...string) (int, error) {
}

ret, _, err := dc.con.ApplyRetryStrategy(tcontext.NewContext(ctx, log.L()), params,
func(tctx *tcontext.Context) (interface{}, error) {
func(tctx *tcontext.Context) (any, error) {
ret, err2 := dc.con.ExecuteSQLWithIgnoreError(tctx, nil, "chaos-cases", ignoreExecSQLError, queries)
return ret, err2
})
Expand Down
2 changes: 1 addition & 1 deletion engine/executor/cvs/cvstask.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (c *connPool) getConn(addr string) (*grpc.ClientConn, error) {
defer c.Unlock()
arr, ok := c.pool[addr]
if !ok {
for i := 0; i < 5; i++ {
for range 5 {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return nil, err
Expand Down
24 changes: 9 additions & 15 deletions engine/executor/dm/unitholder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type unitHolder interface {
Pause(ctx context.Context) error
Resume(ctx context.Context) error
Stage() (metadata.TaskStage, *pb.ProcessResult)
Status(ctx context.Context) interface{}
Status(ctx context.Context) any
// CheckAndUpdateStatus checks if the last update of source status is outdated,
// if so, it will call Status.
// this should be an async func.
Expand Down Expand Up @@ -136,12 +136,10 @@ func (u *unitHolderImpl) Init(ctx context.Context) error {
u.fieldMu.Unlock()

resultCh := make(chan pb.ProcessResult, 1)
u.processWg.Add(1)
go func() {
defer u.processWg.Done()
u.processWg.Go(func() {
u.unit.Process(runCtx, resultCh)
u.fetchAndHandleResult(resultCh)
}()
})
return nil
}

Expand Down Expand Up @@ -183,12 +181,10 @@ func (u *unitHolderImpl) Resume(ctx context.Context) error {
u.fieldMu.Unlock()

resultCh := make(chan pb.ProcessResult, 1)
u.processWg.Add(1)
go func() {
defer u.processWg.Done()
u.processWg.Go(func() {
u.unit.Resume(runCtx, resultCh)
u.fetchAndHandleResult(resultCh)
}()
})
return nil
}

Expand Down Expand Up @@ -243,12 +239,12 @@ func (u *unitHolderImpl) Stage() (metadata.TaskStage, *pb.ProcessResult) {

// Status implement UnitHolder.Status. Each invocation will try to query upstream
// once and calculate the status.
func (u *unitHolderImpl) Status(ctx context.Context) interface{} {
func (u *unitHolderImpl) Status(ctx context.Context) any {
// nil sourceStatus is supported
return u.unit.Status(u.getSourceStatus())
}

func (u *unitHolderImpl) updateSourceStatus(ctx context.Context) interface{} {
func (u *unitHolderImpl) updateSourceStatus(ctx context.Context) any {
sourceStatus, err := binlog.GetSourceStatus(
tcontext.NewContext(ctx, u.logger),
u.upstreamDB,
Expand Down Expand Up @@ -279,13 +275,11 @@ func (u *unitHolderImpl) CheckAndUpdateStatus() {
defer u.fieldMu.Unlock()
if time.Since(u.sourceStatusCheckTime) > sourceStatusRefreshInterval {
u.sourceStatusCheckTime = time.Now()
u.bgWg.Add(1)
go func() {
defer u.bgWg.Done()
u.bgWg.Go(func() {
ctx, cancel := context.WithTimeout(context.Background(), sourceStatusCtxTimeOut)
u.updateSourceStatus(ctx)
cancel()
}()
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions engine/executor/dm/unitholder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (u *mockUnit) Update(ctx context.Context, cfg *config.SubTaskConfig) error
return nil
}

func (u *mockUnit) Status(sourceStatus *binlog.SourceStatus) interface{} {
func (u *mockUnit) Status(sourceStatus *binlog.SourceStatus) any {
u.Lock()
defer u.Unlock()
return u.Called().Get(0)
Expand Down Expand Up @@ -321,7 +321,7 @@ func (m *mockUnitHolder) Stage() (metadata.TaskStage, *pb.ProcessResult) {
}

// Status implement Holder.Status
func (m *mockUnitHolder) Status(ctx context.Context) interface{} {
func (m *mockUnitHolder) Status(ctx context.Context) any {
m.Lock()
defer m.Unlock()
args := m.Called()
Expand Down
6 changes: 2 additions & 4 deletions engine/executor/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,10 @@ func TestJobAPIServer(t *testing.T) {

wg := sync.WaitGroup{}
stoppedJobs := make(chan engineModel.JobID, 16)
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
err := jobAPISrv.listenStoppedJobs(context.Background(), stoppedJobs)
require.NoError(t, err)
}()
})
Comment on lines +79 to +82

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The standard library sync.WaitGroup does not have a Go method. Calling wg.Go(...) will cause a compilation error. Please revert this change to use the standard wg.Add(1) and go func() { defer wg.Done(); ... }() pattern.

Suggested change
wg.Go(func() {
err := jobAPISrv.listenStoppedJobs(context.Background(), stoppedJobs)
require.NoError(t, err)
}()
})
wg.Add(1)
go func() {
defer wg.Done()
err := jobAPISrv.listenStoppedJobs(context.Background(), stoppedJobs)
require.NoError(t, err)
}()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to make Gemini shut up about these (*WaitGroup).Go changes?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As Gemini Code Assist is going the way of the Dodo this isn't worth the effort.


stoppedJobs <- "job1"
require.Eventually(t, func() bool {
Expand Down
6 changes: 2 additions & 4 deletions engine/executor/worker/task_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ func newTaskCommitterWithClock(
requestTTL: requestTTL,
}

committer.wg.Add(1)
go func() {
defer committer.wg.Done()
committer.wg.Go(func() {
committer.runTTLChecker()
}()
})

return committer
}
Expand Down
2 changes: 1 addition & 1 deletion engine/executor/worker/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (r *TaskRunner) cancelAll() {
}
r.canceled = true

r.tasks.Range(func(key, value interface{}) bool {
r.tasks.Range(func(key, value any) bool {
id := key.(RunnableID)
t := value.(*taskEntry)
t.cancel()
Expand Down
22 changes: 8 additions & 14 deletions engine/executor/worker/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,14 @@ func TestTaskRunnerBasics(t *testing.T) {

tr := NewTaskRunner(workerNum+1, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
err := tr.Run(ctx)
require.Error(t, err)
require.Regexp(t, "context canceled", err.Error())
}()
})
Comment thread
dveeden marked this conversation as resolved.

var workers []*dummyWorker
for i := 0; i < workerNum; i++ {
for i := range workerNum {
worker := &dummyWorker{
id: fmt.Sprintf("worker-%d", i),
}
Expand Down Expand Up @@ -95,13 +93,11 @@ func TestTaskRunnerSubmitTime(t *testing.T) {
mockClock.Add(time.Hour)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
err := tr.Run(ctx)
require.Error(t, err)
require.Regexp(t, "context canceled", err.Error())
}()
})

require.Eventually(t, func() bool {
return worker.SubmitTime() == clock.ToMono(submitTime)
Expand All @@ -117,19 +113,17 @@ func TestTaskStopReceiver(t *testing.T) {

tr := NewTaskRunner(workerNum+1, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
err := tr.Run(ctx)
require.Error(t, err)
require.Regexp(t, "context canceled", err.Error())
}()
})

var (
workers []*dummyWorker
running sync.Map
)
for i := 0; i < workerNum; i++ {
for i := range workerNum {
worker := &dummyWorker{
id: fmt.Sprintf("worker-%d", i),
}
Expand Down
8 changes: 4 additions & 4 deletions engine/framework/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type BaseJobMaster interface {
CurrentEpoch() frameModel.Epoch

// SendMessage sends a message of specific topic to jobmanager in a blocking or nonblocking way
SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error
SendMessage(ctx context.Context, topic p2p.Topic, message any, nonblocking bool) error

// Exit should be called when jobmaster (in user logic) wants to exit.
// exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed
Expand Down Expand Up @@ -344,7 +344,7 @@ func (d *DefaultBaseJobMaster) GetEnabledBucketStorage() (bool, resModel.Resourc
}

// SendMessage delegates the SendMessage or inner worker
func (d *DefaultBaseJobMaster) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error {
func (d *DefaultBaseJobMaster) SendMessage(ctx context.Context, topic p2p.Topic, message any, nonblocking bool) error {
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

Expand Down Expand Up @@ -394,7 +394,7 @@ func (j *jobMasterImplAsWorkerImpl) Tick(ctx context.Context) error {
}

func (j *jobMasterImplAsWorkerImpl) OnMasterMessage(
ctx context.Context, topic p2p.Topic, message interface{},
ctx context.Context, topic p2p.Topic, message any,
) error {
switch msg := message.(type) {
case *frameModel.StatusChangeRequest:
Expand Down Expand Up @@ -451,7 +451,7 @@ func (j *jobMasterImplAsMasterImpl) OnWorkerOffline(worker WorkerHandle, reason
return j.inner.OnWorkerOffline(worker, reason)
}

func (j *jobMasterImplAsMasterImpl) OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message interface{}) error {
func (j *jobMasterImplAsMasterImpl) OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message any) error {
return j.inner.OnWorkerMessage(worker, topic, message)
}

Expand Down
2 changes: 1 addition & 1 deletion engine/framework/base_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (m *testJobMasterImpl) OnWorkerOffline(worker WorkerHandle, reason error) e
return args.Error(0)
}

func (m *testJobMasterImpl) OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message interface{}) error {
func (m *testJobMasterImpl) OnWorkerMessage(worker WorkerHandle, topic p2p.Topic, message any) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion engine/framework/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type (
WorkerType = frameModel.WorkerType

// WorkerConfig stores worker config in any type
WorkerConfig = interface{}
WorkerConfig = any

// WorkerHandle alias to master.WorkerHandle
WorkerHandle = master.WorkerHandle
Expand Down
20 changes: 6 additions & 14 deletions engine/framework/internal/eventloop/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,11 @@ func TestRunnerNormalPath(t *testing.T) {
task.On("Close", mock.Anything).Return(nil).Once()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
err := runner.Run(context.Background())
require.Error(t, err)
require.Regexp(t, "injected error", err)
}()
})
Comment thread
dveeden marked this conversation as resolved.

require.Eventually(t, func() bool {
return task.status.Load() == toyTaskRunning
Expand Down Expand Up @@ -153,14 +150,11 @@ func TestRunnerForcefulExit(t *testing.T) {
task.On("Close", mock.Anything).Return(nil).Once()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

wg.Go(func() {
err := runner.Run(context.Background())
require.Error(t, err)
require.Regexp(t, "ErrWorkerSuicide", err)
}()
})

require.Eventually(t, func() bool {
return task.status.Load() == toyTaskRunning
Expand Down Expand Up @@ -208,13 +202,11 @@ func TestRunnerStopByCancel(t *testing.T) {
task.On("Stop", mock.Anything).Return(nil).Once()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
wg.Go(func() {
err := runner.Run(context.Background())
require.Error(t, err)
require.Regexp(t, "worker is canceled", err)
}()
})

require.Eventually(t, func() bool {
return task.status.Load() == toyTaskRunning
Expand Down
2 changes: 1 addition & 1 deletion engine/framework/internal/master/mock_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type mockRunningHandle struct {
}

// SendMessage implements RunningHandle.SendMessage
func (rh mockRunningHandle) SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error {
func (rh mockRunningHandle) SendMessage(ctx context.Context, topic p2p.Topic, message any, nonblocking bool) error {
h := rh.handler
if h.IsTombstone {
return errors.ErrSendingMessageToTombstone.GenWithStackByCause(h.WorkerID)
Expand Down
Loading