puller(ticdc): cap resolve lock target ts by PD tso (#12741) #12742
ti-chi-bot wants to merge 1 commit into
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
@tenfyzhong This PR has conflicts, so I have put it on hold.
Code Review
This pull request aims to cap the resolve lock target timestamp by the PD TSO in the multiplexing puller. However, the changes introduce unresolved git conflict markers, duplicate/broken definitions, and compilation errors (such as referencing the non-existent p.pdClock and p.subscriptionIDs). Additionally, the feedback highlights a correctness and testability issue in getResolveLockTargetTs where the real system clock is used instead of the passed-in currentTime parameter, which should also be addressed in the corresponding unit tests to ensure determinism.
<<<<<<< HEAD
type tableProgressWithSubID struct {
=======
func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.ResolvedSpans) (err error) {
	for _, resolvedSpan := range e.Spans {
		if !spanz.IsSubSpan(resolvedSpan.Span, p.spans...) {
			log.Panic("the resolved span is not in the table spans",
				zap.String("namespace", p.changefeed.Namespace),
				zap.String("changefeed", p.changefeed.ID),
				zap.String("tableName", p.tableName),
				zap.Any("spans", p.spans))
		}
		p.tsTracker.Forward(resolvedSpan.Region, resolvedSpan.Span, e.ResolvedTs)
		if e.ResolvedTs > p.maxIngressResolvedTs.Load() {
			p.maxIngressResolvedTs.Store(e.ResolvedTs)
		}
	}
	resolvedTs := p.tsTracker.Frontier()

	if resolvedTs > 0 && p.initialized.CompareAndSwap(false, true) {
		log.Info("puller is initialized",
			zap.String("namespace", p.changefeed.Namespace),
			zap.String("changefeed", p.changefeed.ID),
			zap.String("tableName", p.tableName),
			zap.Any("tableID", p.spans),
			zap.Uint64("resolvedTs", resolvedTs),
			zap.Duration("duration", time.Since(p.start)),
		)
	}
	if resolvedTs > p.resolvedTs.Load() {
		p.resolvedTs.Store(resolvedTs)
		p.resolvedTsUpdated.Store(time.Now().Unix())
		raw := &model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved}
		err = p.consume.f(ctx, raw, p.spans)
	}

	return
}

func (p *tableProgress) getResolveLockTargetTs(currentTime time.Time, currentTs uint64) uint64 {
	resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0)
	if !p.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence {
		return 0
	}
	resolvedTs := p.resolvedTs.Load()
	resolvedTime := oracle.GetTimeFromTS(resolvedTs)
	if currentTime.Sub(resolvedTime) < resolveLockFence {
		return 0
	}

	return min(currentTs, oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)))
}

func (p *tableProgress) resolveLock(currentTime time.Time, currentTs uint64) {
	targetTs := p.getResolveLockTargetTs(currentTime, currentTs)
	if targetTs == 0 {
		return
	}

	for _, subID := range p.subscriptionIDs {
		p.client.ResolveLock(subID, targetTs)
	}
}

type subscription struct {
>>>>>>> 42ad8a6dc4 (puller(ticdc): cap resolve lock target ts by PD tso (#12741))
There are unresolved git conflict markers in this file, and the cherry-pick has introduced duplicate/broken definitions of handleResolvedSpans and resolveLock.
Additionally, there is a correctness/testability bug in getResolveLockTargetTs: it uses time.Since(resolvedTsUpdated) which depends on the real system clock, instead of using the passed-in currentTime parameter (i.e., currentTime.Sub(resolvedTsUpdated)). This makes the function non-deterministic and can cause test flakiness or incorrect behavior under clock skew.
Please resolve the conflict markers and apply the following fixes:
- Keep tableProgressWithSubID and define getResolveLockTargetTs using currentTime.Sub(resolvedTsUpdated).
- Define the updated resolveLock taking currentTs and using p.subIDs (instead of the undefined p.subscriptionIDs).
- Note: Since Go does not support method overloading, you must also delete the old, duplicate definitions of handleResolvedSpans (at line 486) and resolveLock (at line 519) that were left outside of the conflict markers during the cherry-pick.
type tableProgressWithSubID struct {
	*tableProgress
	subID kv.SubscriptionID
}

func (p *tableProgress) getResolveLockTargetTs(currentTime time.Time, currentTs uint64) uint64 {
	resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0)
	if !p.initialized.Load() || currentTime.Sub(resolvedTsUpdated) < resolveLockFence {
		return 0
	}
	resolvedTs := p.resolvedTs.Load()
	resolvedTime := oracle.GetTimeFromTS(resolvedTs)
	if currentTime.Sub(resolvedTime) < resolveLockFence {
		return 0
	}
	return min(currentTs, oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)))
}

func (p *tableProgress) resolveLock(currentTime time.Time, currentTs uint64) {
	targetTs := p.getResolveLockTargetTs(currentTime, currentTs)
	if targetTs == 0 {
		return
	}
	for _, subID := range p.subIDs {
		p.client.ResolveLock(subID, targetTs)
	}
}

<<<<<<< HEAD
currentTime := p.client.GetPDClock().CurrentTime()
=======
physical, logical, err := p.client.GetTS(ctx)
if err != nil {
	log.Warn("get ts from pd failed", zap.Error(err))
	continue
}
currentTs := oracle.ComposeTS(physical, logical)
currentTime := p.pdClock.CurrentTime()
>>>>>>> 42ad8a6dc4 (puller(ticdc): cap resolve lock target ts by PD tso (#12741))
There are unresolved git conflict markers here. Additionally, p.pdClock is used, but MultiplexingPuller does not have a pdClock field. It should use p.client.GetPDClock() instead.
physical, logical, err := p.client.GetTS(ctx)
if err != nil {
	log.Warn("get ts from pd failed", zap.Error(err))
	continue
}
currentTs := oracle.ComposeTS(physical, logical)
currentTime := p.client.GetPDClock().CurrentTime()

progress.initialized.Store(true)
progress.resolvedTsUpdated.Store(time.Now().Unix())
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))
Since getResolveLockTargetTs is updated to use currentTime.Sub(resolvedTsUpdated) instead of the system clock-dependent time.Since(resolvedTsUpdated), we should update the test to set resolvedTsUpdated relative to localClockNow (the mocked current time) instead of time.Now().Unix(). This ensures the test remains deterministic and correctly tests the fence condition.
-progress.initialized.Store(true)
-progress.resolvedTsUpdated.Store(time.Now().Unix())
-require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))
+progress.initialized.Store(true)
+progress.resolvedTsUpdated.Store(localClockNow.Unix())
+require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))
This is an automated cherry-pick of #12741
What problem does this PR solve?
Issue Number: close #12740
TiCDC stale-lock resolving can derive the ScanLock target timestamp from local clock based resolved-time calculations. If the local CDC clock is ahead of PD time, the target timestamp can exceed the latest PD TSO, which may advance TiKV local MaxTS and make async-commit residual locks fail to resolve with commit_ts_expired.

This ports the same fix as pingcap/ticdc#5419 to TiFlow.
What is changed and how it works?
This PR changes the TiCDC multiplexing puller resolve-lock checker to fetch the current TSO from PD before scanning stale locks and cap the stale-lock target timestamp by that PD currentTs.

The freshness fence for resolvedTsUpdated still uses the local clock because resolvedTsUpdated is written with time.Now(). The PD-derived timestamp is used only as the upper bound for the ScanLock target timestamp.

It also adds unit coverage for clock skew, target timestamp capping, and the existing fence conditions.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No compatibility impact is expected. The resolve-lock checker now performs one PD GetTS call per resolve-lock tick before scheduling stale-lock scans.

Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note