diff --git a/sync_diff_inspector/diff/global_checksum.go b/sync_diff_inspector/diff/global_checksum.go index c663151769..78d4d20045 100644 --- a/sync_diff_inspector/diff/global_checksum.go +++ b/sync_diff_inspector/diff/global_checksum.go @@ -15,6 +15,7 @@ package diff import ( "context" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -128,6 +129,11 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error { return errors.Trace(err) } + // upChunks/downChunks may be estimates (limit iterator) or exact (random + // iterator). Seed the bar with the estimate and let the producers grow it + // as real chunks appear; we correct it to the exact total at the end. + est := int64(upChunks + downChunks) + var produced int64 progress.StartTable(progressID, upChunks+downChunks, false) eg, egCtx := errgroup.WithContext(ctx) flushDone := make(chan struct{}) @@ -153,6 +159,8 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error { upIter, checkpointState.Upstream, progressID, + est, + &produced, ) return err }) @@ -164,6 +172,8 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error { downIter, checkpointState.Downstream, progressID, + est, + &produced, ) return err }) @@ -196,7 +206,14 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error { zap.Uint64("downstream checksum", downChecksum), ) } - progress.UpdateTotal(progressID, 0, true) + // Correct the (possibly estimated) total to the exact produced count and + // freeze it; the trailing Inc then trips the table-complete condition. + producedExact := atomic.LoadInt64(&produced) + curTotal := est + if producedExact > curTotal { + curTotal = producedExact + } + progress.UpdateTotal(progressID, int(producedExact-curTotal), true) progress.Inc(progressID) df.report.ClearTableMeetError(schema, table) df.report.SetTableDataCheckResult(schema, table, equal, 0, 0, upCount, downCount, chunkID) @@ -211,11 +228,19 @@ func (df *Diff) equalByGlobalChecksum(ctx context.Context) error { return nil } +// produceChecksumTasks pulls chunks from iter and feeds them to taskCh. It also +// keeps the progress-bar denominator ahead of the real chunk count: total is +// seeded with an estimate (est), and once the number of produced chunks exceeds +// that estimate we grow the total so the bar never overflows. produced is shared +// across the upstream and downstream producers, hence the atomic access. func produceChecksumTasks( ctx context.Context, iter splitter.ChunkIterator, tableIndex int, taskCh chan<- checksumTask, + progressID string, + est int64, + produced *int64, ) error { seq := 0 for { @@ -229,6 +254,11 @@ func produceChecksumTasks( return ctx.Err() case taskCh <- checksumTask{seq: seq, rangeInfo: &splitter.RangeInfo{ChunkRange: chunkRange}}: seq++ + if progressID != "" { + if n := atomic.AddInt64(produced, 1); n > est { + progress.UpdateTotal(progressID, 1, false) + } + } } } } @@ -331,6 +361,8 @@ func (df *Diff) getSourceGlobalChecksum( iter splitter.ChunkIterator, state *checkpoints.ChecksumSourceState, progressID string, + est int64, + produced *int64, ) (int64, uint64, error) { if state.Done { return state.Count, state.Checksum, nil @@ -347,7 +379,7 @@ func (df *Diff) getSourceGlobalChecksum( eg, egCtx := errgroup.WithContext(ctx) eg.Go(func() error { defer close(taskCh) - return produceChecksumTasks(egCtx, iter, tableIndex, taskCh) + return produceChecksumTasks(egCtx, iter, tableIndex, taskCh, progressID, est, produced) }) for range concurrency { diff --git a/sync_diff_inspector/diff/global_checksum_test.go b/sync_diff_inspector/diff/global_checksum_test.go index 3eaacdd852..473a5894a9 100644 --- a/sync_diff_inspector/diff/global_checksum_test.go +++ b/sync_diff_inspector/diff/global_checksum_test.go @@ -198,7 +198,7 @@ func TestGetSourceGlobalChecksumKeepsCheckpointOrder(t *testing.T) { iter, _, err := src.GetGlobalChecksumIterator(context.Background(), 0, nil) require.NoError(t, err) - count, checksum, err := df.getSourceGlobalChecksum(context.Background(), src, 0, iter, state, "") + count, checksum, err := df.getSourceGlobalChecksum(context.Background(), src, 0, iter, state, "", 0, new(int64)) require.NoError(t, err) require.Equal(t, int64(16), count) require.Equal(t, uint64(10), checksum) @@ -226,7 +226,7 @@ func TestGetSourceGlobalChecksumResumeFromLastRange(t *testing.T) { iter, _, err := src.GetGlobalChecksumIterator(context.Background(), 0, &splitter.RangeInfo{ChunkRange: lastRange.Clone()}) require.NoError(t, err) - count, checksum, err := df.getSourceGlobalChecksum(context.Background(), src, 0, iter, state, "") + count, checksum, err := df.getSourceGlobalChecksum(context.Background(), src, 0, iter, state, "", 0, new(int64)) require.NoError(t, err) require.Equal(t, int64(9), count) require.Equal(t, uint64(12), checksum) diff --git a/sync_diff_inspector/progress/progress.go b/sync_diff_inspector/progress/progress.go index bbd9c87b5c..27bdc30c3c 100644 --- a/sync_diff_inspector/progress/progress.go +++ b/sync_diff_inspector/progress/progress.go @@ -438,9 +438,26 @@ func (tpp *tableProgressPrinter) flush(stateIsChanged bool) { // show bar // 60 '='+'-' coe := float32(tpp.progressTableNums*tpp.progress)/float32(tpp.tableNums*(tpp.total+1)) + float32(tpp.finishTableNums)/float32(tpp.tableNums) + // Clamp the coefficient: an underestimated total can drive coe above 1, which + // would make strings.Repeat receive a negative count (panic) and render >100%. + if coe > 1 { + coe = 1 + } + if coe < 0 { + coe = 0 + } numLeft := int(60 * coe) percent := int(100 * coe) - tpp.output.Write("Progress [%s>%s] %d%% %d/%d\n", strings.Repeat("=", numLeft), strings.Repeat("-", 60-numLeft), percent, tpp.progress, tpp.total) + // Mark the figures as estimated while any active table still has a growing + // (non-finalized) total, so users know the denominator may still change. + mark := "" + for _, e := range tpp.tableMap { + if !e.Value.(*TableProgress).totalStopUpdate { + mark = "~" + break + } + } + tpp.output.Write("Progress [%s>%s] %s%d%% %d/%s%d\n", strings.Repeat("=", numLeft), strings.Repeat("-", 60-numLeft), mark, percent, tpp.progress, mark, tpp.total) } var progress *tableProgressPrinter = nil diff --git a/sync_diff_inspector/progress/progress_test.go b/sync_diff_inspector/progress/progress_test.go index 3a12bc1122..bef1dcb757 100644 --- a/sync_diff_inspector/progress/progress_test.go +++ b/sync_diff_inspector/progress/progress_test.go @@ -106,3 +106,33 @@ func TestAllSuccess(t *testing.T) { "You can view the comparison details through './output_dir/sync_diff_inspector.log'\n\n", ) } + +// TestProgressEstimateClampAndMarker verifies that an underestimated, not-yet +// finalized total (as produced by the limit iterator) never overflows the bar +// and is rendered with the "~" estimation marker. Before clamping, driving the +// progress past the total made strings.Repeat receive a negative count and +// panic the serve goroutine; the test completing at all proves the clamp. +func TestProgressEstimateClampAndMarker(t *testing.T) { + p := newTableProgressPrinter(1, 0) + p.RegisterTable("t", false, false, common.AllTableExistFlag) + // Seed with a deliberate under-estimate (2) that is not finalized. + p.StartTable("t", 2, false) + + buffer := new(bytes.Buffer) + p.SetOutput(buffer) + + // Drive progress well past the estimate without finalizing the total. + for i := 0; i < 10; i++ { + p.Inc("t") + } + time.Sleep(300 * time.Millisecond) + p.Close() + + out := buffer.String() + // The figures are marked as estimated while the total keeps growing. + require.Contains(t, out, "~") + // The percentage is clamped to 100% and never overflows it. + require.Contains(t, out, "100%") + require.NotContains(t, out, "101%") + require.NotContains(t, out, "200%") +} diff --git a/sync_diff_inspector/splitter/limit.go b/sync_diff_inspector/splitter/limit.go index 81bcce6f91..fd60f24aae 100644 --- a/sync_diff_inspector/splitter/limit.go +++ b/sync_diff_inspector/splitter/limit.go @@ -46,7 +46,9 @@ type LimitIterator struct { progressID string columnOffset map[string]int - chunkCount int + // estChunkCount is an estimated chunk count derived from table statistics + // (never a COUNT(*) scan). It seeds the progress bar; 0 means "unknown". + estChunkCount int } // NewLimitIterator return a new iterator @@ -125,32 +127,33 @@ func NewLimitIteratorWithCheckpoint( tagChunk.IndexColumnNames = utils.GetColumnNames(indexColumns) - remainingRows := int64(0) - if undone { - where, args := "TRUE", []any(nil) - if startRange != nil { - where, args = tagChunk.ToString(table.Collation) - } - remainingRows, err = getRowCount(ctx, dbConn, table.Schema, table.Table, where, args) - if err != nil { - return nil, errors.Trace(err) + // estRows is an estimated row count read from table statistics, never a + // COUNT(*) full-table scan. It is only used to seed the progress bar with an + // initial chunk total; the real total is corrected as chunks are produced. + // For checkpoint resume (startRange != nil) we cannot cheaply estimate the + // rows remaining behind the WHERE clause, so we skip the estimate and let the + // progress bar grow purely dynamically. + estRows := int64(0) + if undone && startRange == nil { + if v, ok := getEstimatedRowCount(ctx, dbConn, table.Schema, table.Table); ok { + estRows = v } } chunkSize := table.ChunkSize if chunkSize <= 0 { - if len(table.Info.Indices) != 0 { - chunkSize = utils.CalculateChunkSize(remainingRows) - } else { - // no index - // will use table scan - // so we use one chunk - chunkSize = remainingRows - } + // estRows == 0 falls back to the default chunk size (50000). + chunkSize = utils.CalculateChunkSize(estRows) } log.Info("get chunk size for table", zap.Int64("chunk size", chunkSize), + zap.Int64("estimated rows", estRows), zap.String("db", table.Schema), zap.String("table", table.Table)) + estChunkCount := 0 + if estRows > 0 { + estChunkCount = int((estRows + chunkSize - 1) / chunkSize) + } + lctx, cancel := context.WithCancel(ctx) queryTmpl := generateLimitQueryTemplate(indexColumns, table, chunkSize) @@ -169,11 +172,11 @@ func NewLimitIteratorWithCheckpoint( progressID, columnOffset, - int((remainingRows + chunkSize - 1) / chunkSize), + estChunkCount, } if progressID != "" { - progress.StartTable(progressID, 0, false) + progress.StartTable(progressID, estChunkCount, false) } if !undone { // this table is finished. @@ -222,15 +225,30 @@ func (lmt *LimitIterator) GetIndexID() int64 { return lmt.indexID } -// Len returns estimated remaining chunks for this iterator. +// Len returns an estimated chunk count for this iterator (0 when unknown). +// The value comes from table statistics, not a COUNT(*) scan, so it is only an +// estimate; callers use it to seed a progress bar, not for correctness. func (lmt *LimitIterator) Len() int { - return lmt.chunkCount + return lmt.estChunkCount } func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) { + // produced counts chunks actually emitted; curTotal tracks the progress-bar + // denominator. We keep curTotal >= produced so the bar never overflows, and + // on completion we correct it to the exact produced count and freeze it. + produced := 0 + curTotal := lmt.estChunkCount + bumpTotal := func() { + produced++ + if lmt.progressID != "" && produced > curTotal { + progress.UpdateTotal(lmt.progressID, produced-curTotal, false) + curTotal = produced + } + } defer func() { if lmt.progressID != "" { - progress.UpdateTotal(lmt.progressID, 0, true) + // Correct the estimate to the exact count and stop further updates. + progress.UpdateTotal(lmt.progressID, produced-curTotal, true) } close(lmt.chunksCh) }() @@ -267,9 +285,7 @@ func (lmt *LimitIterator) produceChunks(ctx context.Context, bucketID int) { chunk.InitChunk(chunkRange, chunk.Limit, bucketID, bucketID, lmt.table.Collation, lmt.table.Range) bucketID++ - if lmt.progressID != "" { - progress.UpdateTotal(lmt.progressID, 1, false) - } + bumpTotal() select { case <-ctx.Done(): return @@ -303,6 +319,29 @@ func (lmt *LimitIterator) getLimitRow(ctx context.Context, query string, args [] return dataMap, nil } +// getEstimatedRowCount returns an estimated row count from table statistics +// (information_schema.tables.TABLE_ROWS) without scanning the table. The boolean +// is false when no usable estimate is available (query error, missing row, or a +// NULL/non-positive value), in which case callers fall back to a dynamic total. +func getEstimatedRowCount(ctx context.Context, db *sql.DB, schemaName, tableName string) (int64, bool) { + failpoint.Inject("getEstimatedRowCount", func(val failpoint.Value) { + if v, ok := val.(int); ok { + failpoint.Return(int64(v), v > 0) + } + }) + query := "SELECT TABLE_ROWS FROM information_schema.tables WHERE table_schema = ? AND table_name = ?" + var estRows sql.NullInt64 + if err := db.QueryRowContext(ctx, query, schemaName, tableName).Scan(&estRows); err != nil { + log.Warn("failed to get estimated row count, progress bar will grow dynamically", + zap.String("db", schemaName), zap.String("table", tableName), zap.Error(err)) + return 0, false + } + if !estRows.Valid || estRows.Int64 <= 0 { + return 0, false + } + return estRows.Int64, true +} + func generateLimitQueryTemplate(indexColumns []*model.ColumnInfo, table *common.TableDiff, chunkSize int64) string { fields := make([]string, 0, len(indexColumns)) for _, columnInfo := range indexColumns { diff --git a/sync_diff_inspector/splitter/splitter_test.go b/sync_diff_inspector/splitter/splitter_test.go index ce0888fc32..672c51cf7a 100644 --- a/sync_diff_inspector/splitter/splitter_test.go +++ b/sync_diff_inspector/splitter/splitter_test.go @@ -755,11 +755,17 @@ func createFakeResultForBucketSplit(mock sqlmock.Sqlmock, aRandomValues, bRandom func createFakeResultForCount(t *testing.T, count int) { if count > 0 { - // generate fake result for get the row count of this table + // Fake the row count for the random iterator (exact COUNT(*)) and the + // estimated row count for the limit iterator (statistics, no scan). Each + // iterator only triggers its own failpoint, so enabling both is safe. testfailpoint.Enable(t, "github.com/pingcap/tiflow/sync_diff_inspector/splitter/getRowCount", fmt.Sprintf("return(%d)", count), ) + testfailpoint.Enable(t, + "github.com/pingcap/tiflow/sync_diff_inspector/splitter/getEstimatedRowCount", + fmt.Sprintf("return(%d)", count), + ) } } @@ -875,6 +881,76 @@ func TestLimitSpliter(t *testing.T) { } } +func TestLimitIteratorUsesEstimateNotCount(t *testing.T) { + ctx := context.Background() + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + createTableSQL := "create table `test`.`test`(`a` int, `b` varchar(10), primary key(`a`, `b`))" + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL, parser.New()) + require.NoError(t, err) + tableDiff := &common.TableDiff{ + Schema: "test", + Table: "test", + Info: tableInfo, + ChunkSize: 1000, + } + + // The limit iterator must read an estimated row count from statistics, never + // a COUNT(*) full-table scan. Expect exactly the estimate query and return + // 5000 rows; a COUNT(*) would not match this expectation and fail the test. + mock.ExpectQuery("SELECT TABLE_ROWS FROM information_schema.tables"). + WithArgs("test", "test"). + WillReturnRows(sqlmock.NewRows([]string{"TABLE_ROWS"}).AddRow(5000)) + // First limit SELECT returns no rows, yielding a single tail chunk. + mock.ExpectQuery("SELECT `a`,.*").WillReturnRows(sqlmock.NewRows([]string{"a", "b"})) + + iter, err := NewLimitIterator(ctx, "", tableDiff, db) + require.NoError(t, err) + // ceil(5000 / 1000) = 5 estimated chunks. + require.Equal(t, 5, iter.Len()) + + c, err := iter.Next() + require.NoError(t, err) + require.NotNil(t, c) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestLimitIteratorEstimateUnavailableFallsBackToDynamic(t *testing.T) { + ctx := context.Background() + db, mock, err := sqlmock.New() + require.NoError(t, err) + defer db.Close() + + createTableSQL := "create table `test`.`test`(`a` int, `b` varchar(10), primary key(`a`, `b`))" + tableInfo, err := utils.GetTableInfoBySQL(createTableSQL, parser.New()) + require.NoError(t, err) + tableDiff := &common.TableDiff{ + Schema: "test", + Table: "test", + Info: tableInfo, + ChunkSize: 1000, + } + + // A NULL estimate (unanalyzed table) must not error; the iterator falls back + // to a dynamic total (Len() == 0) instead of scanning the table. + mock.ExpectQuery("SELECT TABLE_ROWS FROM information_schema.tables"). + WithArgs("test", "test"). + WillReturnRows(sqlmock.NewRows([]string{"TABLE_ROWS"}).AddRow(nil)) + mock.ExpectQuery("SELECT `a`,.*").WillReturnRows(sqlmock.NewRows([]string{"a", "b"})) + + iter, err := NewLimitIterator(ctx, "", tableDiff, db) + require.NoError(t, err) + require.Equal(t, 0, iter.Len()) + + // Drain the (async) producer so its limit SELECT runs before we assert. + c, err := iter.Next() + require.NoError(t, err) + require.NotNil(t, c) + require.NoError(t, mock.ExpectationsWereMet()) +} + func createFakeResultForLimitSplit(t *testing.T, mock sqlmock.Sqlmock, aValues []string, bValues []string, needEnd bool) { createFakeResultForCount(t, len(aValues))