diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index 59d6cea212..54c709b423 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -170,6 +170,14 @@ func (s *SharedClient) AllocSubscriptionID() SubscriptionID { return SubscriptionID(subscriptionIDGen.Add(1)) } +// GetTS returns the current PD TSO. +func (s *SharedClient) GetTS(ctx context.Context) (int64, int64, error) { + if s.pd == nil { + return 0, 0, errors.New("pd client is nil") + } + return s.pd.GetTS(ctx) +} + // Subscribe the given table span. // NOTE: `span.TableID` must be set correctly. func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startTs uint64, eventCh chan<- MultiplexingEvent) { diff --git a/cdc/puller/multiplexing_puller.go b/cdc/puller/multiplexing_puller.go index d54311393f..c771a17ae9 100644 --- a/cdc/puller/multiplexing_puller.go +++ b/cdc/puller/multiplexing_puller.go @@ -66,7 +66,72 @@ type tableProgress struct { scheduled atomic.Bool } +<<<<<<< HEAD type tableProgressWithSubID struct { +======= +func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.ResolvedSpans) (err error) { + for _, resolvedSpan := range e.Spans { + if !spanz.IsSubSpan(resolvedSpan.Span, p.spans...) { + log.Panic("the resolved span is not in the table spans", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.String("tableName", p.tableName), + zap.Any("spans", p.spans)) + } + p.tsTracker.Forward(resolvedSpan.Region, resolvedSpan.Span, e.ResolvedTs) + if e.ResolvedTs > p.maxIngressResolvedTs.Load() { + p.maxIngressResolvedTs.Store(e.ResolvedTs) + } + } + resolvedTs := p.tsTracker.Frontier() + + if resolvedTs > 0 && p.initialized.CompareAndSwap(false, true) { + log.Info("puller is initialized", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.String("tableName", p.tableName), + zap.Any("tableID", p.spans), + zap.Uint64("resolvedTs", resolvedTs), + zap.Duration("duration", time.Since(p.start)), + ) + } + if resolvedTs > p.resolvedTs.Load() { + p.resolvedTs.Store(resolvedTs) + p.resolvedTsUpdated.Store(time.Now().Unix()) + raw := &model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved} + err = p.consume.f(ctx, raw, p.spans) + } + + return +} + +func (p *tableProgress) getResolveLockTargetTs(currentTime time.Time, currentTs uint64) uint64 { + resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0) + if !p.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence { + return 0 + } + resolvedTs := p.resolvedTs.Load() + resolvedTime := oracle.GetTimeFromTS(resolvedTs) + if currentTime.Sub(resolvedTime) < resolveLockFence { + return 0 + } + + return min(currentTs, oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))) +} + +func (p *tableProgress) resolveLock(currentTime time.Time, currentTs uint64) { + targetTs := p.getResolveLockTargetTs(currentTime, currentTs) + if targetTs == 0 { + return + } + + for _, subID := range p.subscriptionIDs { + p.client.ResolveLock(subID, targetTs) + } +} + +type subscription struct { +>>>>>>> 42ad8a6dc4 (puller(ticdc): cap resolve lock target ts by PD tso (#12741)) *tableProgress subID kv.SubscriptionID } @@ -396,13 +461,23 @@ func (p *MultiplexingPuller) checkResolveLock(ctx context.Context) error { return ctx.Err() case <-resolveLockTicker.C: } +<<<<<<< HEAD currentTime := p.client.GetPDClock().CurrentTime() +======= + physical, logical, err := p.client.GetTS(ctx) + if err != nil { + log.Warn("get ts from pd failed", zap.Error(err)) + continue + } + currentTs := oracle.ComposeTS(physical, logical) + currentTime := p.pdClock.CurrentTime() +>>>>>>> 42ad8a6dc4 (puller(ticdc): cap resolve lock target ts by PD tso (#12741)) for progress := range p.getAllProgresses() { select { case <-ctx.Done(): return ctx.Err() default: - progress.resolveLock(currentTime) + progress.resolveLock(currentTime, currentTs) } } } diff --git a/cdc/puller/multiplexing_puller_test.go b/cdc/puller/multiplexing_puller_test.go index 30f7fd1ae6..5765eb8122 100644 --- a/cdc/puller/multiplexing_puller_test.go +++ b/cdc/puller/multiplexing_puller_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func newMultiplexingPullerForTest(outputCh chan<- *model.RawKVEntry) *MultiplexingPuller { @@ -99,3 +100,38 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) { cancel() wg.Wait() } + +func TestGetResolveLockTargetTs(t *testing.T) { + progress := &tableProgress{} + progress.initialized.Store(true) + + pdNow := time.Now() + localClockNow := pdNow.Add(30 * time.Second) + currentTs := oracle.GoTimeToTS(pdNow) + + resolvedTime := pdNow.Add(-2 * time.Second) + progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime)) + progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix()) + + tsIfUncapped := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)) + require.Greater(t, tsIfUncapped, currentTs) + require.Equal(t, currentTs, progress.getResolveLockTargetTs(localClockNow, currentTs)) + + resolvedTime = pdNow.Add(-20 * time.Second) + progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime)) + tsIfUncapped = oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)) + require.Less(t, tsIfUncapped, currentTs) + require.Equal(t, tsIfUncapped, progress.getResolveLockTargetTs(localClockNow, currentTs)) + + progress.initialized.Store(false) + require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs)) + + progress.initialized.Store(true) + progress.resolvedTsUpdated.Store(time.Now().Unix()) + require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs)) + + progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix()) + recentResolvedTime := localClockNow.Add(-2 * time.Second) + progress.resolvedTs.Store(oracle.GoTimeToTS(recentResolvedTime)) + require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs)) +} diff --git a/tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl b/tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl new file mode 100644 index 0000000000..7e63630e7c --- /dev/null +++ b/tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl @@ -0,0 +1,10 @@ +CREATE TABLE `roles` ( +`id` varchar(32) NOT NULL, +`name` varchar(100) NOT NULL, +`context` varchar(20) NOT NULL, +`organization_id` int(11) DEFAULT NULL, +`client_id` varchar(32) NOT NULL, +`scope_action_ids` text NOT NULL, +PRIMARY KEY (`id`) +-- FULLTEXT KEY `scope_action_ids_idx` (`scope_action_ids`) is disabled because TiDB classic rejects starter-only FULLTEXT indexes. +) ENGINE=InnoDB DEFAULT CHARSET=utf8;