Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions sync_diff_inspector/diff/global_checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package diff

import (
"context"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -128,6 +129,11 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error {
return errors.Trace(err)
}

// upChunks/downChunks may be estimates (limit iterator) or exact (random
// iterator). Seed the bar with the estimate and let the producers grow it
// as real chunks appear; we correct it to the exact total at the end.
est := int64(upChunks + downChunks)
var produced int64
progress.StartTable(progressID, upChunks+downChunks, false)
eg, egCtx := errgroup.WithContext(ctx)
flushDone := make(chan struct{})
Expand All @@ -153,6 +159,8 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error {
upIter,
checkpointState.Upstream,
progressID,
est,
&produced,
)
return err
})
Expand All @@ -164,6 +172,8 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error {
downIter,
checkpointState.Downstream,
progressID,
est,
&produced,
)
return err
})
Expand Down Expand Up @@ -196,7 +206,14 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error {
zap.Uint64("downstream checksum", downChecksum),
)
}
progress.UpdateTotal(progressID, 0, true)
// Correct the (possibly estimated) total to the exact produced count and
// freeze it; the trailing Inc then trips the table-complete condition.
producedExact := atomic.LoadInt64(&produced)
curTotal := est
if producedExact > curTotal {
curTotal = producedExact
}
progress.UpdateTotal(progressID, int(producedExact-curTotal), true)
progress.Inc(progressID)
df.report.ClearTableMeetError(schema, table)
df.report.SetTableDataCheckResult(schema, table, equal, 0, 0, upCount, downCount, chunkID)
Expand All @@ -211,11 +228,19 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error {
return nil
}

// produceChecksumTasks pulls chunks from iter and feeds them to taskCh. It also
// keeps the progress-bar denominator ahead of the real chunk count: total is
// seeded with an estimate (est), and once the number of produced chunks exceeds
// that estimate we grow the total so the bar never overflows. produced is shared
// across the upstream and downstream producers, hence the atomic access.
func produceChecksumTasks(
ctx context.Context,
iter splitter.ChunkIterator,
tableIndex int,
taskCh chan<- checksumTask,
progressID string,
est int64,
produced *int64,
) error {
seq := 0
for {
Expand All @@ -229,6 +254,11 @@ func produceChecksumTasks(
return ctx.Err()
case taskCh <- checksumTask{seq: seq, rangeInfo: &splitter.RangeInfo{ChunkRange: chunkRange}}:
seq++
if progressID != "" {
if n := atomic.AddInt64(produced, 1); n > est {
progress.UpdateTotal(progressID, 1, false)
}
}
}
}
}
Expand Down Expand Up @@ -331,6 +361,8 @@ func (df *Diff) getSourceGlobalChecksum(
iter splitter.ChunkIterator,
state *checkpoints.ChecksumSourceState,
progressID string,
est int64,
produced *int64,
) (int64, uint64, error) {
if state.Done {
return state.Count, state.Checksum, nil
Expand All @@ -347,7 +379,7 @@ func (df *Diff) getSourceGlobalChecksum(
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
defer close(taskCh)
return produceChecksumTasks(egCtx, iter, tableIndex, taskCh)
return produceChecksumTasks(egCtx, iter, tableIndex, taskCh, progressID, est, produced)
})

for range concurrency {
Expand Down
4 changes: 2 additions & 2 deletions sync_diff_inspector/diff/global_checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func TestGetSourceGlobalChecksumKeepsCheckpointOrder(t *testing.T) {
iter, _, err := src.GetGlobalChecksumIterator(context.Background(), 0, nil)
require.NoError(t, err)

count, checksum, err := df.getSourceGlobalChecksum(context.Background(), src, 0, iter, state, "")
count, checksum, err := df.getSourceGlobalChecksum(context.Background(), src, 0, iter, state, "", 0, new(int64))
require.NoError(t, err)
require.Equal(t, int64(16), count)
require.Equal(t, uint64(10), checksum)
Expand Down Expand Up @@ -226,7 +226,7 @@ func TestGetSourceGlobalChecksumResumeFromLastRange(t *testing.T) {
iter, _, err := src.GetGlobalChecksumIterator(context.Background(), 0, &splitter.RangeInfo{ChunkRange: lastRange.Clone()})
require.NoError(t, err)

count, checksum, err := df.getSourceGlobalChecksum(context.Background(), src, 0, iter, state, "")
count, checksum, err := df.getSourceGlobalChecksum(context.Background(), src, 0, iter, state, "", 0, new(int64))
require.NoError(t, err)
require.Equal(t, int64(9), count)
require.Equal(t, uint64(12), checksum)
Expand Down
19 changes: 18 additions & 1 deletion sync_diff_inspector/progress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,26 @@ func (tpp *tableProgressPrinter) flush(stateIsChanged bool) {
// show bar
// 60 '='+'-'
coe := float32(tpp.progressTableNums*tpp.progress)/float32(tpp.tableNums*(tpp.total+1)) + float32(tpp.finishTableNums)/float32(tpp.tableNums)
// Clamp the coefficient: an underestimated total can drive coe above 1, which
// would make strings.Repeat receive a negative count (panic) and render >100%.
if coe > 1 {
coe = 1
}
if coe < 0 {
coe = 0
}
numLeft := int(60 * coe)
percent := int(100 * coe)
tpp.output.Write("Progress [%s>%s] %d%% %d/%d\n", strings.Repeat("=", numLeft), strings.Repeat("-", 60-numLeft), percent, tpp.progress, tpp.total)
// Mark the figures as estimated while any active table still has a growing
// (non-finalized) total, so users know the denominator may still change.
mark := ""
for _, e := range tpp.tableMap {
if !e.Value.(*TableProgress).totalStopUpdate {
mark = "~"
break
}
}
Comment on lines +454 to +459

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Currently, this checks totalStopUpdate for all tables in tpp.tableMap. However, tables that are registered but not yet started also reside in tpp.tableMap and have totalStopUpdate set to false by default. This causes the progress bar to display the estimation marker ~ even when using exact splitters (like bucket or random) until the very last table starts. We should only check totalStopUpdate for tables that are actually active (i.e., in tableStatePrestart or tableStateComparing state).

Suggested change
for _, e := range tpp.tableMap {
if !e.Value.(*TableProgress).totalStopUpdate {
mark = "~"
break
}
}
for _, e := range tpp.tableMap {
tp := e.Value.(*TableProgress)
if (tp.state&(tableStatePrestart|tableStateComparing)) != 0 && !tp.totalStopUpdate {
mark = "~"
break
}
}

tpp.output.Write("Progress [%s>%s] %s%d%% %d/%s%d\n", strings.Repeat("=", numLeft), strings.Repeat("-", 60-numLeft), mark, percent, tpp.progress, mark, tpp.total)
}

var progress *tableProgressPrinter = nil
Expand Down
30 changes: 30 additions & 0 deletions sync_diff_inspector/progress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,33 @@ func TestAllSuccess(t *testing.T) {
"You can view the comparison details through './output_dir/sync_diff_inspector.log'\n\n",
)
}

// TestProgressEstimateClampAndMarker verifies that an underestimated, not-yet
// finalized total (as produced by the limit iterator) never overflows the bar
// and is rendered with the "~" estimation marker. Before clamping, driving the
// progress past the total made strings.Repeat receive a negative count and
// panic the serve goroutine; the test completing at all proves the clamp.
func TestProgressEstimateClampAndMarker(t *testing.T) {
p := newTableProgressPrinter(1, 0)
p.RegisterTable("t", false, false, common.AllTableExistFlag)
// Seed with a deliberate under-estimate (2) that is not finalized.
p.StartTable("t", 2, false)

buffer := new(bytes.Buffer)
p.SetOutput(buffer)

// Drive progress well past the estimate without finalizing the total.
for i := 0; i < 10; i++ {
p.Inc("t")
}
time.Sleep(300 * time.Millisecond)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The time.Sleep here is redundant and can be removed. p.Close() internally sends a close operator to the queue and synchronously blocks on <-tpp.finishCh until all pending progress updates (including the 10 Inc calls) are fully processed and flushed to the output buffer. Removing this sleep avoids unnecessary test delays and potential flakiness.

p.Close()

out := buffer.String()
// The figures are marked as estimated while the total keeps growing.
require.Contains(t, out, "~")
// The percentage is clamped to 100% and never overflows it.
require.Contains(t, out, "100%")
require.NotContains(t, out, "101%")
require.NotContains(t, out, "200%")
}
91 changes: 65 additions & 26 deletions sync_diff_inspector/splitter/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ type LimitIterator struct {

progressID string
columnOffset map[string]int
chunkCount int
// estChunkCount is an estimated chunk count derived from table statistics
// (never a COUNT(*) scan). It seeds the progress bar; 0 means "unknown".
estChunkCount int
}

// NewLimitIterator return a new iterator
Expand Down Expand Up @@ -125,32 +127,33 @@ func NewLimitIteratorWithCheckpoint(

tagChunk.IndexColumnNames = utils.GetColumnNames(indexColumns)

remainingRows := int64(0)
if undone {
where, args := "TRUE", []any(nil)
if startRange != nil {
where, args = tagChunk.ToString(table.Collation)
}
remainingRows, err = getRowCount(ctx, dbConn, table.Schema, table.Table, where, args)
if err != nil {
return nil, errors.Trace(err)
// estRows is an estimated row count read from table statistics, never a
// COUNT(*) full-table scan. It is only used to seed the progress bar with an
// initial chunk total; the real total is corrected as chunks are produced.
// For checkpoint resume (startRange != nil) we cannot cheaply estimate the
// rows remaining behind the WHERE clause, so we skip the estimate and let the
// progress bar grow purely dynamically.
estRows := int64(0)
if undone && startRange == nil {
if v, ok := getEstimatedRowCount(ctx, dbConn, table.Schema, table.Table); ok {
estRows = v
}
}

chunkSize := table.ChunkSize
if chunkSize <= 0 {
if len(table.Info.Indices) != 0 {
chunkSize = utils.CalculateChunkSize(remainingRows)
} else {
// no index
// will use table scan
// so we use one chunk
chunkSize = remainingRows
}
// estRows == 0 falls back to the default chunk size (50000).
chunkSize = utils.CalculateChunkSize(estRows)
}
log.Info("get chunk size for table", zap.Int64("chunk size", chunkSize),
zap.Int64("estimated rows", estRows),
zap.String("db", table.Schema), zap.String("table", table.Table))

estChunkCount := 0
if estRows > 0 {
estChunkCount = int((estRows + chunkSize - 1) / chunkSize)
}

lctx, cancel := context.WithCancel(ctx)
queryTmpl := generateLimitQueryTemplate(indexColumns, table, chunkSize)

Expand All @@ -169,11 +172,11 @@ func NewLimitIteratorWithCheckpoint(

progressID,
columnOffset,
int((remainingRows + chunkSize - 1) / chunkSize),
estChunkCount,
}

if progressID != "" {
progress.StartTable(progressID, 0, false)
progress.StartTable(progressID, estChunkCount, false)
}
if !undone {
// this table is finished.
Expand Down Expand Up @@ -222,15 +225,30 @@ func (lmt *LimitIterator) GetIndexID() int64 {
return lmt.indexID
}

// Len returns estimated remaining chunks for this iterator.
// Len returns an estimated chunk count for this iterator (0 when unknown).
// The value comes from table statistics, not a COUNT(*) scan, so it is only an
// estimate; callers use it to seed a progress bar, not for correctness.
func (lmt *LimitIterator) Len() int {
return lmt.chunkCount
return lmt.estChunkCount
}

func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) {
// produced counts chunks actually emitted; curTotal tracks the progress-bar
// denominator. We keep curTotal >= produced so the bar never overflows, and
// on completion we correct it to the exact produced count and freeze it.
produced := 0
curTotal := lmt.estChunkCount
bumpTotal := func() {
produced++
if lmt.progressID != "" && produced > curTotal {
progress.UpdateTotal(lmt.progressID, produced-curTotal, false)
curTotal = produced
}
}
defer func() {
if lmt.progressID != "" {
progress.UpdateTotal(lmt.progressID, 0, true)
// Correct the estimate to the exact count and stop further updates.
progress.UpdateTotal(lmt.progressID, produced-curTotal, true)
}
close(lmt.chunksCh)
}()
Expand Down Expand Up @@ -267,9 +285,7 @@ func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) {

chunk.InitChunk(chunkRange, chunk.Limit, bucketID, bucketID, lmt.table.Collation, lmt.table.Range)
bucketID++
if lmt.progressID != "" {
progress.UpdateTotal(lmt.progressID, 1, false)
}
bumpTotal()
select {
case <-ctx.Done():
return
Expand Down Expand Up @@ -303,6 +319,29 @@ func (lmt *LimitIterator) getLimitRow(ctx context.Context, query string, args []
return dataMap, nil
}

// getEstimatedRowCount returns an estimated row count from table statistics
// (information_schema.tables.TABLE_ROWS) without scanning the table. The boolean
// is false when no usable estimate is available (query error, missing row, or a
// NULL/non-positive value), in which case callers fall back to a dynamic total.
func getEstimatedRowCount(ctx context.Context, db *sql.DB, schemaName, tableName string) (int64, bool) {
failpoint.Inject("getEstimatedRowCount", func(val failpoint.Value) {
if v, ok := val.(int); ok {
failpoint.Return(int64(v), v > 0)
}
})
query := "SELECT TABLE_ROWS FROM information_schema.tables WHERE table_schema = ? AND table_name = ?"
var estRows sql.NullInt64
if err := db.QueryRowContext(ctx, query, schemaName, tableName).Scan(&estRows); err != nil {
log.Warn("failed to get estimated row count, progress bar will grow dynamically",
zap.String("db", schemaName), zap.String("table", tableName), zap.Error(err))
return 0, false
}
if !estRows.Valid || estRows.Int64 <= 0 {
return 0, false
}
return estRows.Int64, true
}

func generateLimitQueryTemplate(indexColumns []*model.ColumnInfo, table *common.TableDiff, chunkSize int64) string {
fields := make([]string, 0, len(indexColumns))
for _, columnInfo := range indexColumns {
Expand Down
Loading