Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,14 @@ func (s *SharedClient) AllocSubscriptionID() SubscriptionID {
return SubscriptionID(subscriptionIDGen.Add(1))
}

// GetTS returns the current PD TSO.
func (s *SharedClient) GetTS(ctx context.Context) (int64, int64, error) {
if s.pd == nil {
return 0, 0, errors.New("pd client is nil")
}
return s.pd.GetTS(ctx)
}

// Subscribe the given table span.
// NOTE: `span.TableID` must be set correctly.
func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startTs uint64, eventCh chan<- MultiplexingEvent) {
Expand Down
77 changes: 76 additions & 1 deletion cdc/puller/multiplexing_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,72 @@ type tableProgress struct {
scheduled atomic.Bool
}

<<<<<<< HEAD
type tableProgressWithSubID struct {
=======
func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.ResolvedSpans) (err error) {
for _, resolvedSpan := range e.Spans {
if !spanz.IsSubSpan(resolvedSpan.Span, p.spans...) {
log.Panic("the resolved span is not in the table spans",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.String("tableName", p.tableName),
zap.Any("spans", p.spans))
}
p.tsTracker.Forward(resolvedSpan.Region, resolvedSpan.Span, e.ResolvedTs)
if e.ResolvedTs > p.maxIngressResolvedTs.Load() {
p.maxIngressResolvedTs.Store(e.ResolvedTs)
}
}
resolvedTs := p.tsTracker.Frontier()

if resolvedTs > 0 && p.initialized.CompareAndSwap(false, true) {
log.Info("puller is initialized",
zap.String("namespace", p.changefeed.Namespace),
zap.String("changefeed", p.changefeed.ID),
zap.String("tableName", p.tableName),
zap.Any("tableID", p.spans),
zap.Uint64("resolvedTs", resolvedTs),
zap.Duration("duration", time.Since(p.start)),
)
}
if resolvedTs > p.resolvedTs.Load() {
p.resolvedTs.Store(resolvedTs)
p.resolvedTsUpdated.Store(time.Now().Unix())
raw := &model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved}
err = p.consume.f(ctx, raw, p.spans)
}

return
}

func (p *tableProgress) getResolveLockTargetTs(currentTime time.Time, currentTs uint64) uint64 {
resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0)
if !p.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence {
return 0
}
resolvedTs := p.resolvedTs.Load()
resolvedTime := oracle.GetTimeFromTS(resolvedTs)
if currentTime.Sub(resolvedTime) < resolveLockFence {
return 0
}

return min(currentTs, oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)))
}

func (p *tableProgress) resolveLock(currentTime time.Time, currentTs uint64) {
targetTs := p.getResolveLockTargetTs(currentTime, currentTs)
if targetTs == 0 {
return
}

for _, subID := range p.subscriptionIDs {
p.client.ResolveLock(subID, targetTs)
}
}

type subscription struct {
>>>>>>> 42ad8a6dc4 (puller(ticdc): cap resolve lock target ts by PD tso (#12741))
Comment on lines +69 to +134

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git conflict markers in this file, and the cherry-pick has introduced duplicate/broken definitions of handleResolvedSpans and resolveLock.

Additionally, there is a correctness/testability bug in getResolveLockTargetTs: it uses time.Since(resolvedTsUpdated) which depends on the real system clock, instead of using the passed-in currentTime parameter (i.e., currentTime.Sub(resolvedTsUpdated)). This makes the function non-deterministic and can cause test flakiness or incorrect behavior under clock skew.

Please resolve the conflict markers and apply the following fixes:

  1. Keep tableProgressWithSubID and define getResolveLockTargetTs using currentTime.Sub(resolvedTsUpdated).
  2. Define the updated resolveLock taking currentTs and using p.subIDs (instead of the undefined p.subscriptionIDs).
  3. Note: Since Go does not support method overloading, you must also delete the old, duplicate definitions of handleResolvedSpans (at line 486) and resolveLock (at line 519) that were left outside of the conflict markers during the cherry-pick.
type tableProgressWithSubID struct {
	*tableProgress
	subID kv.SubscriptionID
}

func (p *tableProgress) getResolveLockTargetTs(currentTime time.Time, currentTs uint64) uint64 {
	resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0)
	if !p.initialized.Load() || currentTime.Sub(resolvedTsUpdated) < resolveLockFence {
		return 0
	}
	resolvedTs := p.resolvedTs.Load()
	resolvedTime := oracle.GetTimeFromTS(resolvedTs)
	if currentTime.Sub(resolvedTime) < resolveLockFence {
		return 0
	}

	return min(currentTs, oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)))
}

func (p *tableProgress) resolveLock(currentTime time.Time, currentTs uint64) {
	targetTs := p.getResolveLockTargetTs(currentTime, currentTs)
	if targetTs == 0 {
		return
	}

	for _, subID := range p.subIDs {
		p.client.ResolveLock(subID, targetTs)
	}
}

*tableProgress
subID kv.SubscriptionID
}
Expand Down Expand Up @@ -396,13 +461,23 @@ func (p *MultiplexingPuller) checkResolveLock(ctx context.Context) error {
return ctx.Err()
case <-resolveLockTicker.C:
}
<<<<<<< HEAD
currentTime := p.client.GetPDClock().CurrentTime()
=======
physical, logical, err := p.client.GetTS(ctx)
if err != nil {
log.Warn("get ts from pd failed", zap.Error(err))
continue
}
currentTs := oracle.ComposeTS(physical, logical)
currentTime := p.pdClock.CurrentTime()
>>>>>>> 42ad8a6dc4 (puller(ticdc): cap resolve lock target ts by PD tso (#12741))
Comment on lines +464 to +474

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are unresolved git conflict markers here. Additionally, p.pdClock is used, but MultiplexingPuller does not have a pdClock field. It should use p.client.GetPDClock() instead.

		physical, logical, err := p.client.GetTS(ctx)
		if err != nil {
			log.Warn("get ts from pd failed", zap.Error(err))
			continue
		}
		currentTs := oracle.ComposeTS(physical, logical)
		currentTime := p.client.GetPDClock().CurrentTime()

for progress := range p.getAllProgresses() {
select {
case <-ctx.Done():
return ctx.Err()
default:
progress.resolveLock(currentTime)
progress.resolveLock(currentTime, currentTs)
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions cdc/puller/multiplexing_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func newMultiplexingPullerForTest(outputCh chan<- *model.RawKVEntry) *MultiplexingPuller {
Expand Down Expand Up @@ -99,3 +100,38 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) {
cancel()
wg.Wait()
}

func TestGetResolveLockTargetTs(t *testing.T) {
progress := &tableProgress{}
progress.initialized.Store(true)

pdNow := time.Now()
localClockNow := pdNow.Add(30 * time.Second)
currentTs := oracle.GoTimeToTS(pdNow)

resolvedTime := pdNow.Add(-2 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime))
progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix())

tsIfUncapped := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
require.Greater(t, tsIfUncapped, currentTs)
require.Equal(t, currentTs, progress.getResolveLockTargetTs(localClockNow, currentTs))

resolvedTime = pdNow.Add(-20 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime))
tsIfUncapped = oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
require.Less(t, tsIfUncapped, currentTs)
require.Equal(t, tsIfUncapped, progress.getResolveLockTargetTs(localClockNow, currentTs))

progress.initialized.Store(false)
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))

progress.initialized.Store(true)
progress.resolvedTsUpdated.Store(time.Now().Unix())
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))
Comment on lines +129 to +131

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since getResolveLockTargetTs is updated to use currentTime.Sub(resolvedTsUpdated) instead of the system clock-dependent time.Since(resolvedTsUpdated), we should update the test to set resolvedTsUpdated relative to localClockNow (the mocked current time) instead of time.Now().Unix(). This ensures the test remains deterministic and correctly tests the fence condition.

Suggested change
progress.initialized.Store(true)
progress.resolvedTsUpdated.Store(time.Now().Unix())
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))
progress.initialized.Store(true)
progress.resolvedTsUpdated.Store(localClockNow.Unix())
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))


progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix())
recentResolvedTime := localClockNow.Add(-2 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(recentResolvedTime))
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))
}
10 changes: 10 additions & 0 deletions tests/integration_tests/debezium/sql/debezium/mysql-dbz-193.ddl
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE `roles` (
`id` varchar(32) NOT NULL,
`name` varchar(100) NOT NULL,
`context` varchar(20) NOT NULL,
`organization_id` int(11) DEFAULT NULL,
`client_id` varchar(32) NOT NULL,
`scope_action_ids` text NOT NULL,
PRIMARY KEY (`id`)
-- FULLTEXT KEY `scope_action_ids_idx` (`scope_action_ids`) is disabled because TiDB classic rejects starter-only FULLTEXT indexes.
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Loading