diff --git a/.github/workflows/check_and_build.yaml b/.github/workflows/check_and_build.yaml index ff474d740c..b497075cea 100644 --- a/.github/workflows/check_and_build.yaml +++ b/.github/workflows/check_and_build.yaml @@ -6,20 +6,20 @@ on: - master - "release-[0-9].[0-9]*" paths-ignore: - - '**/*.md' - - '**/OWNERS' - - 'OWNERS' - - 'OWNERS_ALIASES' + - "**/*.md" + - "**/OWNERS" + - "OWNERS" + - "OWNERS_ALIASES" pull_request: branches: - master - "release-[0-9].[0-9]*" paths-ignore: - - '**/*.md' - - '**/OWNERS' - - 'OWNERS' - - 'OWNERS_ALIASES' + - "**/*.md" + - "**/OWNERS" + - "OWNERS" + - "OWNERS_ALIASES" # See: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency. concurrency: @@ -53,33 +53,14 @@ jobs: - name: Setup Go environment uses: actions/setup-go@v3 with: - go-version: '1.21' + go-version: "1.21" - name: Cache Tools id: cache-tools - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: tools/bin key: macos-latest-ticdc-tools-${{ hashFiles('tools/check/go.sum') }} - name: Build run: make build - - arm_build: - runs-on: [ARM64] - name: Arm Build - strategy: - fail-fast: false - matrix: - arch: [ARM64] - steps: - - name: Check out code - uses: actions/checkout@v2 - - - name: Setup Go environment - uses: actions/setup-go@v3 - with: - go-version: '1.21' - - - name: Build - run: make build diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 59d6cea212..7b7d8e5544 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -713,9 +713,6 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error { return ctx.Err() case <-ticker.C: } - log.Info("event feed starts to check locked regions", - zap.String("namespace", s.changefeed.Namespace), - zap.String("changefeed", s.changefeed.ID)) currTime := s.pdClock.CurrentTime() s.totalSpans.RLock() diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index b5347aae9c..be9b867eb0 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -78,7 +78,8 @@ type ddlSinkImpl struct { ddlCh chan *model.DDLEvent - sink ddlsink.Sink + sinkMu sync.Mutex + sink ddlsink.Sink // `sinkInitHandler` can be helpful in unit testing. sinkInitHandler ddlSinkInitHandler @@ -147,7 +148,10 @@ func (s *ddlSinkImpl) makeSyncPointStoreReady(ctx context.Context) error { return nil } -func (s *ddlSinkImpl) makeSinkReady(ctx context.Context) error { +func (s *ddlSinkImpl) withReadySink(ctx context.Context, action func(ddlsink.Sink) error) error { + s.sinkMu.Lock() + defer s.sinkMu.Unlock() + if s.sink == nil { if err := s.sinkInitHandler(ctx, s); err != nil { log.Warn("ddl sink initialize failed", @@ -157,7 +161,7 @@ func (s *ddlSinkImpl) makeSinkReady(ctx context.Context) error { return errors.New("ddlSink not ready") } } - return nil + return action(s.sink) } // retry the given action with 5s interval. Before every retry, s.sink will be re-initialized. @@ -174,7 +178,17 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f zap.Bool("retryable", isRetryable), zap.Error(err)) - s.sink = nil + s.sinkMu.Lock() + closed := s.sink != nil + if closed { + s.sink.Close() + s.sink = nil + } + s.sinkMu.Unlock() + if closed { + log.Info("close the ddl sink, rebuild it when trying again", + zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID)) + } if isRetryable { s.reportWarning(err) } else { @@ -223,9 +237,9 @@ func (s *ddlSinkImpl) writeCheckpointTs(ctx context.Context, lastCheckpointTs *m tables = append(tables, s.mu.currentTables...) s.mu.Unlock() - if err = s.makeSinkReady(ctx); err == nil { - err = s.sink.WriteCheckpointTs(ctx, checkpointTs, tables) - } + err = s.withReadySink(ctx, func(sink ddlsink.Sink) error { + return sink.WriteCheckpointTs(ctx, checkpointTs, tables) + }) if err == nil { *lastCheckpointTs = checkpointTs } @@ -242,12 +256,13 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er zap.Any("DDL", ddl)) doWrite := func() (err error) { - if err = s.makeSinkReady(ctx); err == nil { - err = s.sink.WriteDDLEvent(ctx, ddl) + err = s.withReadySink(ctx, func(sink ddlsink.Sink) error { + err := sink.WriteDDLEvent(ctx, ddl) failpoint.Inject("InjectChangefeedDDLError", func() { err = cerror.ErrExecDDLFailed.GenWithStackByArgs() }) - } + return err + }) if err != nil { log.Error("Execute DDL failed", zap.String("namespace", s.changefeedID.Namespace), @@ -412,9 +427,12 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) { s.wg.Wait() // they will both be nil if changefeed return an error in initializing + s.sinkMu.Lock() if s.sink != nil { s.sink.Close() + s.sink = nil } + s.sinkMu.Unlock() if s.syncPointStore != nil { err = s.syncPointStore.Close() } @@ -477,8 +495,7 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) { } func (s *ddlSinkImpl) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error { - if err := s.makeSinkReady(ctx); err != nil { - return errors.Trace(err) - } - return s.sink.WriteDDLEvent(ctx, bootstrap) + return s.withReadySink(ctx, func(sink ddlsink.Sink) error { + return sink.WriteDDLEvent(ctx, bootstrap) + }) }