diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go index c81c9f40d0..8c775b6383 100644 --- a/cdc/kv/shared_client.go +++ b/cdc/kv/shared_client.go @@ -276,6 +276,14 @@ func (s *SharedClient) AllocSubscriptionID() SubscriptionID { return SubscriptionID(subscriptionIDGen.Add(1)) } +// GetTS returns the current PD TSO. +func (s *SharedClient) GetTS(ctx context.Context) (int64, int64, error) { + if s.pd == nil { + return 0, 0, errors.New("pd client is nil") + } + return s.pd.GetTS(ctx) +} + // Subscribe the given table span. // NOTE: `span.TableID` must be set correctly. // It new a subscribedTable and store it in `s.totalSpans`, diff --git a/cdc/puller/multiplexing_puller.go b/cdc/puller/multiplexing_puller.go index c6ee56d074..278a7e447c 100644 --- a/cdc/puller/multiplexing_puller.go +++ b/cdc/puller/multiplexing_puller.go @@ -111,18 +111,26 @@ func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.Resolv return } -func (p *tableProgress) resolveLock(currentTime time.Time) { +func (p *tableProgress) getResolveLockTargetTs(currentTime time.Time, currentTs uint64) uint64 { resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0) if !p.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence { - return + return 0 } resolvedTs := p.resolvedTs.Load() resolvedTime := oracle.GetTimeFromTS(resolvedTs) if currentTime.Sub(resolvedTime) < resolveLockFence { + return 0 + } + + return min(currentTs, oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))) +} + +func (p *tableProgress) resolveLock(currentTime time.Time, currentTs uint64) { + targetTs := p.getResolveLockTargetTs(currentTime, currentTs) + if targetTs == 0 { return } - targetTs := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)) for _, subID := range p.subscriptionIDs { p.client.ResolveLock(subID, targetTs) } @@ -488,13 +496,19 @@ func (p *MultiplexingPuller) runResolveLockChecker(ctx context.Context) error { return ctx.Err() case <-resolveLockTicker.C: } + physical, logical, err := p.client.GetTS(ctx) + if err != nil { + log.Warn("get ts from pd failed", zap.Error(err)) + continue + } + currentTs := oracle.ComposeTS(physical, logical) currentTime := p.pdClock.CurrentTime() for progress := range p.getAllProgresses() { select { case <-ctx.Done(): return ctx.Err() default: - progress.resolveLock(currentTime) + progress.resolveLock(currentTime, currentTs) } } } diff --git a/cdc/puller/multiplexing_puller_test.go b/cdc/puller/multiplexing_puller_test.go index 6563c0f350..fe28e3216d 100644 --- a/cdc/puller/multiplexing_puller_test.go +++ b/cdc/puller/multiplexing_puller_test.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" ) func newMultiplexingPullerForTest(outputCh chan<- *model.RawKVEntry) *MultiplexingPuller { @@ -106,3 +107,38 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) { cancel() wg.Wait() } + +func TestGetResolveLockTargetTs(t *testing.T) { + progress := &tableProgress{} + progress.initialized.Store(true) + + pdNow := time.Now() + localClockNow := pdNow.Add(30 * time.Second) + currentTs := oracle.GoTimeToTS(pdNow) + + resolvedTime := pdNow.Add(-2 * time.Second) + progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime)) + progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix()) + + tsIfUncapped := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)) + require.Greater(t, tsIfUncapped, currentTs) + require.Equal(t, currentTs, progress.getResolveLockTargetTs(localClockNow, currentTs)) + + resolvedTime = pdNow.Add(-20 * time.Second) + progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime)) + tsIfUncapped = oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)) + require.Less(t, tsIfUncapped, currentTs) + require.Equal(t, tsIfUncapped, progress.getResolveLockTargetTs(localClockNow, currentTs)) + + progress.initialized.Store(false) + require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs)) + + progress.initialized.Store(true) + progress.resolvedTsUpdated.Store(time.Now().Unix()) + require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs)) + + progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix()) + recentResolvedTime := localClockNow.Add(-2 * time.Second) + progress.resolvedTs.Store(oracle.GoTimeToTS(recentResolvedTime)) + require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs)) +} diff --git a/tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl b/tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl index 0d3026a4c3..7e63630e7c 100644 --- a/tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl +++ b/tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl @@ -5,6 +5,6 @@ CREATE TABLE `roles` ( `organization_id` int(11) DEFAULT NULL, `client_id` varchar(32) NOT NULL, `scope_action_ids` text NOT NULL, -PRIMARY KEY (`id`), -FULLTEXT KEY `scope_action_ids_idx` (`scope_action_ids`) +PRIMARY KEY (`id`) +-- FULLTEXT KEY `scope_action_ids_idx` (`scope_action_ids`) is disabled because TiDB classic rejects starter-only FULLTEXT indexes. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;