Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cdc/kv/shared_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ func (s *SharedClient) AllocSubscriptionID() SubscriptionID {
return SubscriptionID(subscriptionIDGen.Add(1))
}

// GetTS returns the current PD TSO.
func (s *SharedClient) GetTS(ctx context.Context) (int64, int64, error) {
if s.pd == nil {
return 0, 0, errors.New("pd client is nil")
}
return s.pd.GetTS(ctx)
}

// Subscribe the given table span.
// NOTE: `span.TableID` must be set correctly.
// It new a subscribedTable and store it in `s.totalSpans`,
Expand Down
22 changes: 18 additions & 4 deletions cdc/puller/multiplexing_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,26 @@ func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.Resolv
return
}

func (p *tableProgress) resolveLock(currentTime time.Time) {
func (p *tableProgress) getResolveLockTargetTs(currentTime time.Time, currentTs uint64) uint64 {
resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0)
if !p.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence {
return
return 0
}
resolvedTs := p.resolvedTs.Load()
resolvedTime := oracle.GetTimeFromTS(resolvedTs)
if currentTime.Sub(resolvedTime) < resolveLockFence {
return 0
}

return min(currentTs, oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)))
}

func (p *tableProgress) resolveLock(currentTime time.Time, currentTs uint64) {
targetTs := p.getResolveLockTargetTs(currentTime, currentTs)
if targetTs == 0 {
return
}

targetTs := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
for _, subID := range p.subscriptionIDs {
p.client.ResolveLock(subID, targetTs)
}
Expand Down Expand Up @@ -488,13 +496,19 @@ func (p *MultiplexingPuller) runResolveLockChecker(ctx context.Context) error {
return ctx.Err()
case <-resolveLockTicker.C:
}
physical, logical, err := p.client.GetTS(ctx)
if err != nil {
log.Warn("get ts from pd failed", zap.Error(err))
continue
}
Comment on lines +499 to +503

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When the context is canceled (e.g., during changefeed shutdown or pause), p.client.GetTS(ctx) will fail with a context cancellation error. Logging this as a warning can be misleading and spammy. We should check ctx.Err() and return early if the context is canceled.

Additionally, to improve observability and make troubleshooting easier in production environments with multiple changefeeds, we should include the namespace and changefeed fields in the warning log.

		physical, logical, err := p.client.GetTS(ctx)
		if err != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			log.Warn("get ts from pd failed",
				zap.String("namespace", p.changefeed.Namespace),
				zap.String("changefeed", p.changefeed.ID),
				zap.Error(err))
			continue
		}

currentTs := oracle.ComposeTS(physical, logical)
currentTime := p.pdClock.CurrentTime()
for progress := range p.getAllProgresses() {
select {
case <-ctx.Done():
return ctx.Err()
default:
progress.resolveLock(currentTime)
progress.resolveLock(currentTime, currentTs)
}
}
}
Expand Down
36 changes: 36 additions & 0 deletions cdc/puller/multiplexing_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func newMultiplexingPullerForTest(outputCh chan<- *model.RawKVEntry) *MultiplexingPuller {
Expand Down Expand Up @@ -106,3 +107,38 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) {
cancel()
wg.Wait()
}

func TestGetResolveLockTargetTs(t *testing.T) {
progress := &tableProgress{}
progress.initialized.Store(true)

pdNow := time.Now()
localClockNow := pdNow.Add(30 * time.Second)
currentTs := oracle.GoTimeToTS(pdNow)

resolvedTime := pdNow.Add(-2 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime))
progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix())

tsIfUncapped := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
require.Greater(t, tsIfUncapped, currentTs)
require.Equal(t, currentTs, progress.getResolveLockTargetTs(localClockNow, currentTs))

resolvedTime = pdNow.Add(-20 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(resolvedTime))
tsIfUncapped = oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence))
require.Less(t, tsIfUncapped, currentTs)
require.Equal(t, tsIfUncapped, progress.getResolveLockTargetTs(localClockNow, currentTs))

progress.initialized.Store(false)
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))

progress.initialized.Store(true)
progress.resolvedTsUpdated.Store(time.Now().Unix())
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))

progress.resolvedTsUpdated.Store(pdNow.Add(-10 * time.Second).Unix())
recentResolvedTime := localClockNow.Add(-2 * time.Second)
progress.resolvedTs.Store(oracle.GoTimeToTS(recentResolvedTime))
require.Zero(t, progress.getResolveLockTargetTs(localClockNow, currentTs))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ CREATE TABLE `roles` (
`organization_id` int(11) DEFAULT NULL,
`client_id` varchar(32) NOT NULL,
`scope_action_ids` text NOT NULL,
PRIMARY KEY (`id`),
FULLTEXT KEY `scope_action_ids_idx` (`scope_action_ids`)
PRIMARY KEY (`id`)
-- FULLTEXT KEY `scope_action_ids_idx` (`scope_action_ids`) is disabled because TiDB classic rejects starter-only FULLTEXT indexes.
) ENGINE=InnoDB DEFAULT CHARSET=utf8;