Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ func (s *SharedClient) AllocSubscriptionID() SubscriptionID {
return SubscriptionID(subscriptionIDGen.Add(1))
}

// GetTS returns the current PD TSO.
func (s *SharedClient) GetTS(ctx context.Context) (int64, int64, error) {
if s.pd == nil {
return 0, 0, errors.New("pd client is nil")
}
return s.pd.GetTS(ctx)
}
Comment on lines +280 to +285

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In unit tests (such as TestMultiplexingPullerResolvedForward), s.pd is often nil. Returning an error in this case causes runResolveLockChecker to log a warning (get ts from pd failed) every 2 seconds, which spams the test logs.

Since currentTs being 0 naturally disables lock resolution (as targetTs becomes 0), returning 0, 0, nil when s.pd is nil is a clean way to silence these warnings in tests without affecting production (where s.pd is always initialized).

Suggested change
func (s *SharedClient) GetTS(ctx context.Context) (int64, int64, error) {
if s.pd == nil {
return 0, 0, errors.New("pd client is nil")
}
return s.pd.GetTS(ctx)
}
func (s *SharedClient) GetTS(ctx context.Context) (int64, int64, error) {
if s.pd == nil {
return 0, 0, nil
}
return s.pd.GetTS(ctx)
}


// Subscribe the given table span.
// NOTE: `span.TableID` must be set correctly.
// It new a subscribedTable and store it in `s.totalSpans`,
Expand Down
22 changes: 18 additions & 4 deletions cdc/puller/multiplexing_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,26 @@ func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.Resolv
return
}

func (p *tableProgress) resolveLock(currentTime time.Time) {
func (p *tableProgress) getResolveLockTargetTs(currentTime time.Time, currentTs uint64) uint64 {
resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0)
if !p.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence {
return
return 0
}
resolvedTs := p.resolvedTs.Load()
resolvedTime := oracle.GetTimeFromTS(resolvedTs)
if currentTime.Sub(resolvedTime) < resolveLockFence {
return 0
}

return min(currentTs, oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)))
}

func (p *tableProgress) resolveLock(currentTime time.Time, currentTs uint64) {
targetTs := p.getResolveLockTargetTs(currentTime, currentTs)
if targetTs == 0 {
return
}

targetTs := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
for _, subID := range p.subscriptionIDs {
p.client.ResolveLock(subID, targetTs)
}
Expand Down Expand Up @@ -488,13 +496,19 @@ func (p *MultiplexingPuller) runResolveLockChecker(ctx context.Context) error {
return ctx.Err()
case <-resolveLockTicker.C:
}
physical, logical, err := p.client.GetTS(ctx)
if err != nil {
log.Warn("get ts from pd failed", zap.Error(err))
continue
}
currentTs := oracle.ComposeTS(physical, logical)
currentTime := p.pdClock.CurrentTime()
for progress := range p.getAllProgresses() {
select {
case <-ctx.Done():
return ctx.Err()
default:
progress.resolveLock(currentTime)
progress.resolveLock(currentTime, currentTs)
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions cdc/puller/multiplexing_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func newMultiplexingPullerForTest(outputCh chan<- *model.RawKVEntry) *MultiplexingPuller {
Expand Down Expand Up @@ -106,3 +107,38 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) {
cancel()
wg.Wait()
}

func TestGetResolveLockTargetTs(t *testing.T) {
progress := &tableProgress{}
progress.initialized.Store(true)

pdNow := time.Now()
localClockNow := pdNow.Add(30 * time.Second)
currentTs := oracle.GoTimeToTS(pdNow)

resolvedTime := pdNow.Add(-2 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime))
progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix())

tsIfUncapped := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
require.Greater(t, tsIfUncapped, currentTs)
require.Equal(t, currentTs, progress.getResolveLockTargetTs(localClockNow, currentTs))

resolvedTime = pdNow.Add(-20 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime))
tsIfUncapped = oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
require.Less(t, tsIfUncapped, currentTs)
require.Equal(t, tsIfUncapped, progress.getResolveLockTargetTs(localClockNow, currentTs))

progress.initialized.Store(false)
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))

progress.initialized.Store(true)
progress.resolvedTsUpdated.Store(time.Now().Unix())
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))

progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix())
recentResolvedTime := localClockNow.Add(-2 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(recentResolvedTime))
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))
}
10 changes: 10 additions & 0 deletions tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE `roles` (
`id` varchar(32) NOT NULL,
`name` varchar(100) NOT NULL,
`context` varchar(20) NOT NULL,
`organization_id` int(11) DEFAULT NULL,
`client_id` varchar(32) NOT NULL,
`scope_action_ids` text NOT NULL,
PRIMARY KEY (`id`)
-- FULLTEXT KEY `scope_action_ids_idx` (`scope_action_ids`) is disabled because TiDB classic rejects starter-only FULLTEXT indexes.
) ENGINE=InnoDB DEFAULT CHARSET=utf8;