From 1f762975b57151afaf231f4d452233b1a05c0e4b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 5 Jun 2026 13:35:02 +0800 Subject: [PATCH 1/5] fix ddl sink close leak, and remove verbose log --- cdc/kv/shared_client.go | 3 --- cdc/owner/ddl_sink.go | 7 ++++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 59d6cea212..7b7d8e5544 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -713,9 +713,6 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error { return ctx.Err() case <-ticker.C: } - log.Info("event feed starts to check locked regions", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID)) currTime := s.pdClock.CurrentTime() s.totalSpans.RLock() diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index b5347aae9c..1c36aaf8ca 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -174,7 +174,12 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f zap.Bool("retryable", isRetryable), zap.Error(err)) - s.sink = nil + if s.sink != nil { + s.sink.Close() + s.sink = nil + log.Info("close the ddl sink, rebuild it when trying again", + zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID)) + } if isRetryable { s.reportWarning(err) } else { From 35fe83511356aa76a70509450d4498add34557eb Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 2 Jul 2026 22:32:49 +0800 Subject: [PATCH 2/5] bump github action version --- .github/workflows/check_and_build.yaml | 28 +++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/.github/workflows/check_and_build.yaml b/.github/workflows/check_and_build.yaml index ff474d740c..547bd14480 100644 --- a/.github/workflows/check_and_build.yaml +++ b/.github/workflows/check_and_build.yaml @@ -6,20 +6,20 @@ on: - master - "release-[0-9].[0-9]*" paths-ignore: - - '**/*.md' - - '**/OWNERS' - - 'OWNERS' - - 'OWNERS_ALIASES' + - "**/*.md" + - "**/OWNERS" + - "OWNERS" + - "OWNERS_ALIASES" pull_request: branches: - master - "release-[0-9].[0-9]*" paths-ignore: - - '**/*.md' - - '**/OWNERS' - - 'OWNERS' - - 'OWNERS_ALIASES' + - "**/*.md" + - "**/OWNERS" + - "OWNERS" + - "OWNERS_ALIASES" # See: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency. concurrency: @@ -32,7 +32,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code into the Go module directory - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Build-cdc run: docker build -f ./deployments/ticdc/docker/Dockerfile . @@ -48,16 +48,16 @@ jobs: runs-on: macos-latest steps: - name: Check out code into the Go module directory - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Setup Go environment uses: actions/setup-go@v3 with: - go-version: '1.21' + go-version: "1.21" - name: Cache Tools id: cache-tools - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: tools/bin key: macos-latest-ticdc-tools-${{ hashFiles('tools/check/go.sum') }} @@ -74,12 +74,12 @@ jobs: arch: [ARM64] steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Setup Go environment uses: actions/setup-go@v3 with: - go-version: '1.21' + go-version: "1.21" - name: Build run: make build From 68c67d28bbe642ae207b3d3d0116ea2c93abf182 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 2 Jul 2026 23:14:52 +0800 Subject: [PATCH 3/5] bump github action version --- .github/workflows/check_and_build.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/check_and_build.yaml b/.github/workflows/check_and_build.yaml index 547bd14480..5a59e61372 100644 --- a/.github/workflows/check_and_build.yaml +++ b/.github/workflows/check_and_build.yaml @@ -32,7 +32,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code into the Go module directory - uses: actions/checkout@v4 + uses: actions/checkout@v2 - name: Build-cdc run: docker build -f ./deployments/ticdc/docker/Dockerfile . @@ -48,7 +48,7 @@ jobs: runs-on: macos-latest steps: - name: Check out code into the Go module directory - uses: actions/checkout@v4 + uses: actions/checkout@v2 - name: Setup Go environment uses: actions/setup-go@v3 @@ -74,7 +74,7 @@ jobs: arch: [ARM64] steps: - name: Check out code - uses: actions/checkout@v4 + uses: actions/checkout@v2 - name: Setup Go environment uses: actions/setup-go@v3 From c547916af329df350e6fe217aff05183312f1cf5 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 2 Jul 2026 23:40:12 +0800 Subject: [PATCH 4/5] fix potential data race on the ddl sink --- cdc/owner/ddl_sink.go | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 1c36aaf8ca..be9b867eb0 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -78,7 +78,8 @@ type ddlSinkImpl struct { ddlCh chan *model.DDLEvent - sink ddlsink.Sink + sinkMu sync.Mutex + sink ddlsink.Sink // `sinkInitHandler` can be helpful in unit testing. sinkInitHandler ddlSinkInitHandler @@ -147,7 +148,10 @@ func (s *ddlSinkImpl) makeSyncPointStoreReady(ctx context.Context) error { return nil } -func (s *ddlSinkImpl) makeSinkReady(ctx context.Context) error { +func (s *ddlSinkImpl) withReadySink(ctx context.Context, action func(ddlsink.Sink) error) error { + s.sinkMu.Lock() + defer s.sinkMu.Unlock() + if s.sink == nil { if err := s.sinkInitHandler(ctx, s); err != nil { log.Warn("ddl sink initialize failed", @@ -157,7 +161,7 @@ func (s *ddlSinkImpl) makeSinkReady(ctx context.Context) error { return errors.New("ddlSink not ready") } } - return nil + return action(s.sink) } // retry the given action with 5s interval. Before every retry, s.sink will be re-initialized. @@ -174,9 +178,14 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f zap.Bool("retryable", isRetryable), zap.Error(err)) - if s.sink != nil { + s.sinkMu.Lock() + closed := s.sink != nil + if closed { s.sink.Close() s.sink = nil + } + s.sinkMu.Unlock() + if closed { log.Info("close the ddl sink, rebuild it when trying again", zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID)) } @@ -228,9 +237,9 @@ func (s *ddlSinkImpl) writeCheckpointTs(ctx context.Context, lastCheckpointTs *m tables = append(tables, s.mu.currentTables...) s.mu.Unlock() - if err = s.makeSinkReady(ctx); err == nil { - err = s.sink.WriteCheckpointTs(ctx, checkpointTs, tables) - } + err = s.withReadySink(ctx, func(sink ddlsink.Sink) error { + return sink.WriteCheckpointTs(ctx, checkpointTs, tables) + }) if err == nil { *lastCheckpointTs = checkpointTs } @@ -247,12 +256,13 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er zap.Any("DDL", ddl)) doWrite := func() (err error) { - if err = s.makeSinkReady(ctx); err == nil { - err = s.sink.WriteDDLEvent(ctx, ddl) + err = s.withReadySink(ctx, func(sink ddlsink.Sink) error { + err := sink.WriteDDLEvent(ctx, ddl) failpoint.Inject("InjectChangefeedDDLError", func() { err = cerror.ErrExecDDLFailed.GenWithStackByArgs() }) - } + return err + }) if err != nil { log.Error("Execute DDL failed", zap.String("namespace", s.changefeedID.Namespace), @@ -417,9 +427,12 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) { s.wg.Wait() // they will both be nil if changefeed return an error in initializing + s.sinkMu.Lock() if s.sink != nil { s.sink.Close() + s.sink = nil } + s.sinkMu.Unlock() if s.syncPointStore != nil { err = s.syncPointStore.Close() } @@ -482,8 +495,7 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) { } func (s *ddlSinkImpl) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error { - if err := s.makeSinkReady(ctx); err != nil { - return errors.Trace(err) - } - return s.sink.WriteDDLEvent(ctx, bootstrap) + return s.withReadySink(ctx, func(sink ddlsink.Sink) error { + return sink.WriteDDLEvent(ctx, bootstrap) + }) } From 77910f111ba3f1134abe035cd0c0458ec6054f5b Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Fri, 3 Jul 2026 10:51:11 +0800 Subject: [PATCH 5/5] remove arm build from github action --- .github/workflows/check_and_build.yaml | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/.github/workflows/check_and_build.yaml b/.github/workflows/check_and_build.yaml index 5a59e61372..b497075cea 100644 --- a/.github/workflows/check_and_build.yaml +++ b/.github/workflows/check_and_build.yaml @@ -64,22 +64,3 @@ jobs: - name: Build run: make build - - arm_build: - runs-on: [ARM64] - name: Arm Build - strategy: - fail-fast: false - matrix: - arch: [ARM64] - steps: - - name: Check out code - uses: actions/checkout@v2 - - - name: Setup Go environment - uses: actions/setup-go@v3 - with: - go-version: "1.21" - - - name: Build - run: make build