Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 10 additions & 29 deletions .github/workflows/check_and_build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ on:
- master
- "release-[0-9].[0-9]*"
paths-ignore:
- '**/*.md'
- '**/OWNERS'
- 'OWNERS'
- 'OWNERS_ALIASES'
- "**/*.md"
- "**/OWNERS"
- "OWNERS"
- "OWNERS_ALIASES"

pull_request:
branches:
- master
- "release-[0-9].[0-9]*"
paths-ignore:
- '**/*.md'
- '**/OWNERS'
- 'OWNERS'
- 'OWNERS_ALIASES'
- "**/*.md"
- "**/OWNERS"
- "OWNERS"
- "OWNERS_ALIASES"

# See: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency.
concurrency:
Expand Down Expand Up @@ -53,33 +53,14 @@ jobs:
- name: Setup Go environment
uses: actions/setup-go@v3
with:
go-version: '1.21'
go-version: "1.21"

- name: Cache Tools
id: cache-tools
uses: actions/cache@v2
uses: actions/cache@v4
with:
path: tools/bin
key: macos-latest-ticdc-tools-${{ hashFiles('tools/check/go.sum') }}

- name: Build
run: make build

arm_build:
runs-on: [ARM64]
name: Arm Build
strategy:
fail-fast: false
matrix:
arch: [ARM64]
steps:
- name: Check out code
uses: actions/checkout@v2

- name: Setup Go environment
uses: actions/setup-go@v3
with:
go-version: '1.21'

- name: Build
run: make build
3 changes: 0 additions & 3 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,9 +713,6 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error {
return ctx.Err()
case <-ticker.C:
}
log.Info("event feed starts to check locked regions",
zap.String("namespace", s.changefeed.Namespace),
zap.String("changefeed", s.changefeed.ID))

currTime := s.pdClock.CurrentTime()
s.totalSpans.RLock()
Expand Down
45 changes: 31 additions & 14 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ type ddlSinkImpl struct {

ddlCh chan *model.DDLEvent

sink ddlsink.Sink
sinkMu sync.Mutex
sink ddlsink.Sink
// `sinkInitHandler` can be helpful in unit testing.
sinkInitHandler ddlSinkInitHandler

Expand Down Expand Up @@ -147,7 +148,10 @@ func (s *ddlSinkImpl) makeSyncPointStoreReady(ctx context.Context) error {
return nil
}

func (s *ddlSinkImpl) makeSinkReady(ctx context.Context) error {
func (s *ddlSinkImpl) withReadySink(ctx context.Context, action func(ddlsink.Sink) error) error {
s.sinkMu.Lock()
defer s.sinkMu.Unlock()

if s.sink == nil {
if err := s.sinkInitHandler(ctx, s); err != nil {
log.Warn("ddl sink initialize failed",
Expand All @@ -157,7 +161,7 @@ func (s *ddlSinkImpl) makeSinkReady(ctx context.Context) error {
return errors.New("ddlSink not ready")
}
}
return nil
return action(s.sink)
}

// retry the given action with 5s interval. Before every retry, s.sink will be re-initialized.
Expand All @@ -174,7 +178,17 @@ func (s *ddlSinkImpl) retrySinkAction(ctx context.Context, name string, action f
zap.Bool("retryable", isRetryable),
zap.Error(err))

s.sink = nil
s.sinkMu.Lock()
closed := s.sink != nil
if closed {
s.sink.Close()
s.sink = nil
}
s.sinkMu.Unlock()
if closed {
log.Info("close the ddl sink, rebuild it when trying again",
zap.String("namespace", s.changefeedID.Namespace), zap.String("changefeed", s.changefeedID.ID))
}
if isRetryable {
s.reportWarning(err)
} else {
Expand Down Expand Up @@ -223,9 +237,9 @@ func (s *ddlSinkImpl) writeCheckpointTs(ctx context.Context, lastCheckpointTs *m
tables = append(tables, s.mu.currentTables...)
s.mu.Unlock()

if err = s.makeSinkReady(ctx); err == nil {
err = s.sink.WriteCheckpointTs(ctx, checkpointTs, tables)
}
err = s.withReadySink(ctx, func(sink ddlsink.Sink) error {
return sink.WriteCheckpointTs(ctx, checkpointTs, tables)
})
if err == nil {
*lastCheckpointTs = checkpointTs
}
Expand All @@ -242,12 +256,13 @@ func (s *ddlSinkImpl) writeDDLEvent(ctx context.Context, ddl *model.DDLEvent) er
zap.Any("DDL", ddl))

doWrite := func() (err error) {
if err = s.makeSinkReady(ctx); err == nil {
err = s.sink.WriteDDLEvent(ctx, ddl)
err = s.withReadySink(ctx, func(sink ddlsink.Sink) error {
err := sink.WriteDDLEvent(ctx, ddl)
failpoint.Inject("InjectChangefeedDDLError", func() {
err = cerror.ErrExecDDLFailed.GenWithStackByArgs()
})
}
return err
})
if err != nil {
log.Error("Execute DDL failed",
zap.String("namespace", s.changefeedID.Namespace),
Expand Down Expand Up @@ -412,9 +427,12 @@ func (s *ddlSinkImpl) close(ctx context.Context) (err error) {
s.wg.Wait()

// they will both be nil if changefeed return an error in initializing
s.sinkMu.Lock()
if s.sink != nil {
s.sink.Close()
s.sink = nil
}
s.sinkMu.Unlock()
if s.syncPointStore != nil {
err = s.syncPointStore.Close()
}
Expand Down Expand Up @@ -477,8 +495,7 @@ func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {
}

func (s *ddlSinkImpl) emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error {
if err := s.makeSinkReady(ctx); err != nil {
return errors.Trace(err)
}
return s.sink.WriteDDLEvent(ctx, bootstrap)
return s.withReadySink(ctx, func(sink ddlsink.Sink) error {
return sink.WriteDDLEvent(ctx, bootstrap)
})
}
Loading