diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go index fae80b40c8cb3..846bee3ac7fc9 100644 --- a/br/pkg/checkpoint/checkpoint_test.go +++ b/br/pkg/checkpoint/checkpoint_test.go @@ -107,12 +107,13 @@ func testCheckpointMetaForRestore( require.Equal(t, checkpointMetaForSnapshotRestore.RestoredTS, checkpointMetaForSnapshotRestore2.RestoredTS) checkpointMetaForLogRestore := &checkpoint.CheckpointMetadataForLogRestore{ - UpstreamClusterID: 123, - RestoredTS: 222, - StartTS: 111, - RewriteTS: 333, - GcRatio: "1.0", - TiFlashItems: map[int64]model.TiFlashReplicaInfo{1: {Count: 1}}, + UpstreamClusterID: 123, + RestoredTS: 222, + StartTS: 111, + RewriteTS: 333, + GcRatio: "1.0", + RocksDBMaxBackgroundJobs: "8", + TiFlashItems: map[int64]model.TiFlashReplicaInfo{1: {Count: 1}}, } err = logMetaManager.SaveCheckpointMetadata(ctx, checkpointMetaForLogRestore) @@ -124,6 +125,7 @@ func testCheckpointMetaForRestore( require.Equal(t, checkpointMetaForLogRestore.StartTS, checkpointMetaForLogRestore2.StartTS) require.Equal(t, checkpointMetaForLogRestore.RewriteTS, checkpointMetaForLogRestore2.RewriteTS) require.Equal(t, checkpointMetaForLogRestore.GcRatio, checkpointMetaForLogRestore2.GcRatio) + require.Equal(t, checkpointMetaForLogRestore.RocksDBMaxBackgroundJobs, checkpointMetaForLogRestore2.RocksDBMaxBackgroundJobs) require.Equal(t, checkpointMetaForLogRestore.TiFlashItems, checkpointMetaForLogRestore2.TiFlashItems) exists, err := logMetaManager.ExistsCheckpointProgress(ctx) @@ -144,6 +146,7 @@ func testCheckpointMetaForRestore( require.Equal(t, uint64(111), taskInfo.Metadata.StartTS) require.Equal(t, uint64(333), taskInfo.Metadata.RewriteTS) require.Equal(t, "1.0", taskInfo.Metadata.GcRatio) + require.Equal(t, "8", taskInfo.Metadata.RocksDBMaxBackgroundJobs) require.Equal(t, true, taskInfo.HasSnapshotMetadata) require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress) diff --git a/br/pkg/checkpoint/log_restore.go b/br/pkg/checkpoint/log_restore.go index 9f11f9f134f62..7577a7328342d 100644 --- a/br/pkg/checkpoint/log_restore.go +++ b/br/pkg/checkpoint/log_restore.go @@ -134,12 +134,14 @@ func AppendRangeForLogRestore( } type CheckpointMetadataForLogRestore struct { - UpstreamClusterID uint64 `json:"upstream-cluster-id"` - RestoreStartTS uint64 `json:"restore-start-ts"` - RestoredTS uint64 `json:"restored-ts"` - StartTS uint64 `json:"start-ts"` - RewriteTS uint64 `json:"rewrite-ts"` - GcRatio string `json:"gc-ratio"` + UpstreamClusterID uint64 `json:"upstream-cluster-id"` + RestoreStartTS uint64 `json:"restore-start-ts"` + RestoredTS uint64 `json:"restored-ts"` + StartTS uint64 `json:"start-ts"` + RewriteTS uint64 `json:"rewrite-ts"` + GcRatio string `json:"gc-ratio"` + RocksDBMaxBackgroundJobs string `json:"rocksdb-max-background-jobs,omitempty"` + SnapshotRestoreDataSize uint64 `json:"snapshot-restore-data-size,omitempty"` // tiflash recorder items with snapshot restore records TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"` } diff --git a/br/pkg/conn/conn.go b/br/pkg/conn/conn.go index e125ff8666505..a796b7add6eb6 100644 --- a/br/pkg/conn/conn.go +++ b/br/pkg/conn/conn.go @@ -50,10 +50,10 @@ const ( // DefaultMergeRegionKeyCount is the default region key count, 960000. DefaultMergeRegionKeyCount uint64 = 960000 - // DefaultImportNumGoroutines is the default number of threads for import. - // use 128 as default value, which is 8 times of the default value of tidb. - // we think is proper for IO-bound cases. - DefaultImportNumGoroutines uint = 128 + // DefaultImportNumGoroutines is the default number of goroutines for restore. + DefaultImportNumGoroutines uint = 36 + + minRestoreConcurrencyOverImportThreads uint = 4 ) type VersionCheckerType int @@ -328,7 +328,8 @@ func (mgr *Mgr) GetCurrentTsFromPD(ctx context.Context) (uint64, error) { } // ProcessTiKVConfigs handle the tikv config for region split size, region split keys, and import goroutines in place. -// It retrieves the config from all alive tikv stores and returns the minimum values. +// It retrieves the config from all alive tikv stores, keeps conservative split values, +// and makes restore concurrency no less than import.num-threads plus a small margin. // If retrieving the config fails, it returns the default config values. func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, client *http.Client) { mergeRegionSize := cfg.MergeRegionSize @@ -362,9 +363,8 @@ func (mgr *Mgr) ProcessTiKVConfigs(ctx context.Context, cfg *kvconfig.KVConfig, log.Warn("Failed to parse import num-threads from config", logutil.ShortError(e)) return e } - // We use 8 times the default value because it's an IO-bound case. - if importGoroutines.Value == DefaultImportNumGoroutines || (threads > 0 && threads*8 < importGoroutines.Value) { - importGoroutines.Value = threads * 8 + if threads > 0 { + importGoroutines.Value = max(importGoroutines.Value, threads+minRestoreConcurrencyOverImportThreads) } } // replace the value diff --git a/br/pkg/conn/conn_test.go b/br/pkg/conn/conn_test.go index d0a1aae415430..61feaa97ba043 100644 --- a/br/pkg/conn/conn_test.go +++ b/br/pkg/conn/conn_test.go @@ -357,8 +357,8 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { content: []string{ "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 1, \"region-split-size\": \"1MiB\"}, \"import\": {\"num-threads\": 6}}", }, - // the number of import goroutines is 8 times than import.num-threads. - importNumGoroutines: 48, + // the default value already satisfies import.num-threads + 4. + importNumGoroutines: conn.DefaultImportNumGoroutines, // one tikv detected in this case we are not update default size and keys because they are too small. regionSplitSize: 1 * units.MiB, regionSplitKeys: 1, @@ -379,7 +379,7 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { content: []string{ "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, \"import\": {\"num-threads\": 128}}", }, - importNumGoroutines: 1024, + importNumGoroutines: 132, // one tikv detected in this case and we update with new size and keys. regionSplitSize: 1 * units.GiB, regionSplitKeys: 10000000, @@ -411,8 +411,7 @@ func TestGetMergeRegionSizeAndCount(t *testing.T) { "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 10000000, \"region-split-size\": \"1GiB\"}, \"import\": {\"num-threads\": 128}}", "{\"log-level\": \"debug\", \"coprocessor\": {\"region-split-keys\": 12000000, \"region-split-size\": \"900MiB\"}, \"import\": {\"num-threads\": 12}}", }, - // two tikv detected in this case and we choose the small one. - importNumGoroutines: 96, + importNumGoroutines: 132, regionSplitSize: 1 * units.GiB, regionSplitKeys: 10000000, }, diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel index 92af9403eb6f0..8664b8358ccbe 100644 --- a/br/pkg/restore/log_client/BUILD.bazel +++ b/br/pkg/restore/log_client/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "batch_meta_processor.go", "client.go", "compacted_file_strategy.go", + "flow_control.go", "id_map.go", "import.go", "import_retry.go", @@ -59,6 +60,7 @@ go_library( "//pkg/util/codec", "//pkg/util/redact", "//pkg/util/sqlescape", + "//pkg/util/sqlexec", "@com_github_docker_go_units//:go-units", "@com_github_fatih_color//:color", "@com_github_gogo_protobuf//proto", diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index ae294fd38d513..dce309e0c1ec5 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/failpoint" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/checkpoint" "github.com/pingcap/tidb/br/pkg/checksum" @@ -86,6 +87,7 @@ import ( const MetaKVBatchSize = 64 * 1024 * 1024 const maxSplitKeysOnce = 10240 const maxReadMetaKVFilesConcurrency uint = 128 +const defaultTiKVMaxReplicas uint = 3 // rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV. const rawKVBatchCount = 64 @@ -143,6 +145,8 @@ func (l *LogRestoreManager) Close(ctx context.Context) { // including concurrency management, checkpoint handling, and file importing(splitting) for efficient log processing. type SstRestoreManager struct { restorer restore.SstRestorer + storeCount uint + replicaCount uint checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType] } @@ -366,6 +370,8 @@ func (rc *LogClient) RestoreSSTFileSets( ctx context.Context, backupFileSets restore.BatchBackupFileSet, importModeSwitcher *restore.ImportModeSwitcher, + snapshotRestoreDataSize uint64, + checkpointCompactedSSTSize uint64, onProgress func(int64), ) error { begin := time.Now() @@ -373,6 +379,15 @@ func (rc *LogClient) RestoreSSTFileSets( log.Info("[Compacted SST Restore] No SST files found for restoration.") return nil } + if err := rc.adjustTiKVFlowControlForCompactedSSTRestore( + ctx, + backupFileSets, + snapshotRestoreDataSize, + checkpointCompactedSSTSize, + ); err != nil { + return errors.Trace(err) + } + err := importModeSwitcher.GoSwitchToImportMode(ctx) if err != nil { return errors.Trace(err) @@ -565,6 +580,8 @@ func (rc *LogClient) InitClients( if err != nil { log.Fatal("failed to get stores", zap.Error(err)) } + liveStoreCount := liveTiKVStoreCount(stores) + replicaCount := rc.getMaxReplica(ctx) metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, len(stores)+1) importCli := importclient.NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf) @@ -578,9 +595,10 @@ func (rc *LogClient) InitClients( if err != nil { return errors.Trace(err) } - // This poolSize is similar to full restore, as both workflows are comparable. - // The poolSize should be greater than concurrencyPerStore multiplied by the number of stores. - poolSize := concurrencyPerStore * 32 * uint(len(stores)) + // Keep the global SST restore pool large enough to avoid starving stores when + // queued file sets temporarily point to other TiKVs. + const sstRestoreWorkerPoolSizePerStore uint = 7186 + poolSize := sstRestoreWorkerPoolSizePerStore * uint(len(stores)) log.Info("sst restore worker pool", zap.Uint("size", poolSize)) sstWorkerPool := tidbutil.NewWorkerPool(poolSize, "sst file") @@ -613,7 +631,7 @@ func (rc *LogClient) InitClients( if err != nil { return errors.Trace(err) } - sstRestoreManager := &SstRestoreManager{} + sstRestoreManager := &SstRestoreManager{storeCount: liveStoreCount, replicaCount: replicaCount} if sstCheckpointMetaManager != nil { var err error sstRestoreManager.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, sstCheckpointMetaManager) @@ -632,6 +650,55 @@ func (rc *LogClient) InitClients( return nil } +func liveTiKVStoreCount(stores []*metapb.Store) uint { + var count uint + for _, store := range stores { + if store.GetState() == metapb.StoreState_Up { + count++ + } + } + return count +} + +func (rc *LogClient) getMaxReplica(ctx context.Context) uint { + if rc.pdHTTPClient == nil { + return maxReplicaFromReplicateConfig(nil, errors.New("PD HTTP client is not initialized")) + } + var resp map[string]any + var err error + err = utils.WithRetry(ctx, func() error { + resp, err = rc.pdHTTPClient.GetReplicateConfig(ctx) + return err + }, utils.NewAggressivePDBackoffStrategy()) + return maxReplicaFromReplicateConfig(resp, err) +} + +func maxReplicaFromReplicateConfig(resp map[string]any, err error) uint { + if err != nil { + log.Warn("failed to get max replicas from PD replicate config, use default value", + zap.Uint("default-max-replicas", defaultTiKVMaxReplicas), + logutil.ShortError(err)) + return defaultTiKVMaxReplicas + } + + const key = "max-replicas" + val, ok := resp[key] + if !ok { + log.Warn("max replicas not found in PD replicate config, use default value", + zap.Uint("default-max-replicas", defaultTiKVMaxReplicas), + zap.Any("replicate-config", resp)) + return defaultTiKVMaxReplicas + } + replicaCount, ok := val.(float64) + if !ok || replicaCount <= 0 { + log.Warn("invalid max replicas in PD replicate config, use default value", + zap.Uint("default-max-replicas", defaultTiKVMaxReplicas), + zap.Any("replicate-config", resp)) + return defaultTiKVMaxReplicas + } + return uint(replicaCount) +} + func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore( ctx context.Context, sstCheckpointMetaManager checkpoint.SnapshotMetaManagerT, @@ -665,28 +732,34 @@ func (rc *LogClient) LoadOrCreateCheckpointMetadataForLogRestore( ctx context.Context, restoreStartTS, startTS, restoredTS uint64, gcRatio string, + rocksDBMaxBackgroundJobs string, tiflashRecorder *tiflashrec.TiFlashRecorder, logCheckpointMetaManager checkpoint.LogMetaManagerT, -) (string, error) { + snapshotRestoreDataSize uint64, +) (string, string, uint64, error) { rc.useCheckpoint = true // if the checkpoint metadata exists in the external storage, the restore is not // for the first time. exists, err := logCheckpointMetaManager.ExistsCheckpointMetadata(ctx) if err != nil { - return "", errors.Trace(err) + return "", "", 0, errors.Trace(err) } if exists { // load the checkpoint since this is not the first time to restore log.Info("loading existing log restore checkpoint") meta, err := logCheckpointMetaManager.LoadCheckpointMetadata(ctx) if err != nil { - return "", errors.Trace(err) + return "", "", 0, errors.Trace(err) } - log.Info("reuse gc ratio from checkpoint metadata", zap.String("old-gc-ratio", gcRatio), - zap.String("checkpoint-gc-ratio", meta.GcRatio)) - return meta.GcRatio, nil + if meta.RocksDBMaxBackgroundJobs != "" { + rocksDBMaxBackgroundJobs = meta.RocksDBMaxBackgroundJobs + } + log.Info("reuse TiKV config from checkpoint metadata", + zap.String("gc-ratio", meta.GcRatio), + zap.String("rocksdb-max-background-jobs", rocksDBMaxBackgroundJobs)) + return meta.GcRatio, rocksDBMaxBackgroundJobs, meta.SnapshotRestoreDataSize, nil } // initialize the checkpoint metadata since it is the first time to restore. @@ -694,22 +767,25 @@ func (rc *LogClient) LoadOrCreateCheckpointMetadataForLogRestore( if tiflashRecorder != nil { items = tiflashRecorder.GetItems() } - log.Info("save gc ratio into checkpoint metadata", + log.Info("save TiKV config into checkpoint metadata", zap.Uint64("start-ts", startTS), zap.Uint64("restored-ts", restoredTS), zap.Uint64("rewrite-ts", rc.currentTS), - zap.String("gc-ratio", gcRatio), zap.Int("tiflash-item-count", len(items))) + zap.String("gc-ratio", gcRatio), zap.String("rocksdb-max-background-jobs", rocksDBMaxBackgroundJobs), + zap.Int("tiflash-item-count", len(items))) if err := logCheckpointMetaManager.SaveCheckpointMetadata(ctx, &checkpoint.CheckpointMetadataForLogRestore{ - UpstreamClusterID: rc.upstreamClusterID, - RestoreStartTS: restoreStartTS, - RestoredTS: restoredTS, - StartTS: startTS, - RewriteTS: rc.currentTS, - GcRatio: gcRatio, - TiFlashItems: items, + UpstreamClusterID: rc.upstreamClusterID, + RestoreStartTS: restoreStartTS, + RestoredTS: restoredTS, + StartTS: startTS, + RewriteTS: rc.currentTS, + GcRatio: gcRatio, + RocksDBMaxBackgroundJobs: rocksDBMaxBackgroundJobs, + SnapshotRestoreDataSize: snapshotRestoreDataSize, + TiFlashItems: items, }); err != nil { - return gcRatio, errors.Trace(err) + return gcRatio, rocksDBMaxBackgroundJobs, snapshotRestoreDataSize, errors.Trace(err) } - return gcRatio, nil + return gcRatio, rocksDBMaxBackgroundJobs, snapshotRestoreDataSize, nil } type LockedMigrations struct { diff --git a/br/pkg/restore/log_client/client_test.go b/br/pkg/restore/log_client/client_test.go index 621ef2baed760..2ae4af4d0fbdc 100644 --- a/br/pkg/restore/log_client/client_test.go +++ b/br/pkg/restore/log_client/client_test.go @@ -1912,6 +1912,83 @@ func TestCollectSSTFileSets(t *testing.T) { }) } +func TestEstimateCompactedSSTFlowControl(t *testing.T) { + fileSets := restore.BatchBackupFileSet{ + { + TableID: 1, + SSTFiles: []*backuppb.File{ + { + Name: "file-1", + Size_: 16 * units.MiB, + }, + { + Name: "file-2", + TotalBytes: 8 * units.MiB, + }, + }, + }, + } + + snapshotBytes, compactedSSTBytes, l6BytesPerStore, l5BytesPerStore, pendingBytes := + logclient.TEST_EstimateCompactedSSTFlowControl(fileSets, 5, 3, 120*units.MiB, 6*units.MiB) + require.Equal(t, uint64(120*units.MiB), snapshotBytes) + require.Equal(t, uint64(30*units.MiB), compactedSSTBytes) + require.Equal(t, uint64(72*units.MiB), l6BytesPerStore) + require.Equal(t, uint64(18*units.MiB), l5BytesPerStore) + require.InDelta(t, 54*float64(units.MiB), float64(pendingBytes), float64(units.KiB)) + + require.Equal(t, uint(2), logclient.TEST_LiveTiKVStoreCount([]*metapb.Store{ + {State: metapb.StoreState_Up}, + {State: metapb.StoreState_Offline}, + {State: metapb.StoreState_Up}, + {State: metapb.StoreState_Tombstone}, + })) +} + +func TestEstimatePendingCompactionBytes(t *testing.T) { + require.Equal(t, uint64(0), logclient.TEST_EstimatePendingCompactionBytes(11*units.TiB, units.TiB)) + pendingBytes := logclient.TEST_EstimatePendingCompactionBytes(units.TiB, 512*units.GiB) + expectedPendingBytes := (512*float64(units.GiB) - float64(units.TiB)/10) * 3 + require.InDelta(t, expectedPendingBytes, float64(pendingBytes), float64(units.MiB)) +} + +func TestCompactedSSTFlowControlTarget(t *testing.T) { + soft, hard := logclient.TEST_CompactedSSTFlowControlTarget( + []string{"192GiB"}, + []string{"256GiB"}, + 512*units.GiB, + ) + require.Equal(t, uint64(units.TiB), soft) + require.Equal(t, uint64(2*units.TiB), hard) + + soft, hard = logclient.TEST_CompactedSSTFlowControlTarget( + []string{"192GiB"}, + []string{"256GiB"}, + 3*units.TiB, + ) + require.Equal(t, uint64(3840*units.GiB), soft) + require.Equal(t, uint64(7680*units.GiB), hard) + + soft, hard = logclient.TEST_CompactedSSTFlowControlTarget( + []string{"4TiB"}, + []string{"9TiB"}, + 512*units.GiB, + ) + require.Equal(t, uint64(4*units.TiB), soft) + require.Equal(t, uint64(9*units.TiB), hard) + + require.False(t, logclient.TEST_AllTiKVConfigsAtLeast([]string{"4TiB", "192GiB"}, 4*units.TiB)) + require.True(t, logclient.TEST_AllTiKVConfigsAtLeast([]string{"4TiB", "5TiB"}, 4*units.TiB)) + require.Equal(t, "1TiB", logclient.TEST_FormatBytes(units.TiB)) + require.Equal(t, "1536GiB", logclient.TEST_FormatBytes(1536*units.GiB)) + + require.Equal(t, uint(5), logclient.TEST_MaxReplicaFromReplicateConfig(map[string]any{"max-replicas": float64(5)}, nil)) + require.Equal(t, uint(3), logclient.TEST_MaxReplicaFromReplicateConfig(nil, errors.New("pd unavailable"))) + require.Equal(t, uint(3), logclient.TEST_MaxReplicaFromReplicateConfig(map[string]any{}, nil)) + require.Equal(t, uint(3), logclient.TEST_MaxReplicaFromReplicateConfig(map[string]any{"max-replicas": "bad"}, nil)) + require.Equal(t, uint(3), logclient.TEST_MaxReplicaFromReplicateConfig(map[string]any{"max-replicas": float64(0)}, nil)) +} + func TestCompactedSplitStrategy(t *testing.T) { ctx := context.Background() diff --git a/br/pkg/restore/log_client/export_test.go b/br/pkg/restore/log_client/export_test.go index 1101e8be4d80c..a87d7a14cf193 100644 --- a/br/pkg/restore/log_client/export_test.go +++ b/br/pkg/restore/log_client/export_test.go @@ -21,8 +21,10 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/checkpoint" "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/stream" "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/domain" @@ -134,6 +136,68 @@ func TEST_CountReadableMetaKVFiles(files []*backuppb.DataFileInfo) int { return countReadableMetaKVFiles(files) } +func TEST_EstimateCompactedSSTFlowControl( + backupFileSets restore.BatchBackupFileSet, + storeCount uint, + replicaCount uint, + snapshotRestoreBytes uint64, + checkpointCompactedSSTBytes uint64, +) (uint64, uint64, uint64, uint64, uint64) { + estimate := estimateCompactedSSTFlowControl( + backupFileSets, + snapshotRestoreBytes, + checkpointCompactedSSTBytes, + storeCount, + replicaCount, + ) + return estimate.snapshotRestoreBytes, + estimate.compactedSSTBytes, + estimate.l6BytesPerStore, + estimate.l5BytesPerStore, + estimate.pendingBytes +} + +func TEST_EstimatePendingCompactionBytes(l6BytesPerStore, l5BytesPerStore uint64) uint64 { + return estimatePendingCompactionBytes(l6BytesPerStore, l5BytesPerStore) +} + +func TEST_CompactedSSTFlowControlTarget( + softConfig, hardConfig []string, + pendingBytes uint64, +) (uint64, uint64) { + originConfig := &compactedSSTFlowControlConfig{ + soft: make([]tikvConfigValue, 0, len(softConfig)), + hard: make([]tikvConfigValue, 0, len(hardConfig)), + } + for _, value := range softConfig { + originConfig.soft = append(originConfig.soft, tikvConfigValue{value: value}) + } + for _, value := range hardConfig { + originConfig.hard = append(originConfig.hard, tikvConfigValue{value: value}) + } + return compactedSSTFlowControlTarget(originConfig, pendingBytes) +} + +func TEST_MaxReplicaFromReplicateConfig(resp map[string]any, err error) uint { + return maxReplicaFromReplicateConfig(resp, err) +} + +func TEST_LiveTiKVStoreCount(stores []*metapb.Store) uint { + return liveTiKVStoreCount(stores) +} + +func TEST_AllTiKVConfigsAtLeast(values []string, target uint64) bool { + configs := make([]tikvConfigValue, 0, len(values)) + for _, value := range values { + configs = append(configs, tikvConfigValue{value: value}) + } + return allTiKVConfigsAtLeast(configs, target) +} + +func TEST_FormatBytes(bytes uint64) string { + return formatBytes(bytes) +} + type FakeStreamMetadataHelper struct { streamMetadataHelper diff --git a/br/pkg/restore/log_client/flow_control.go b/br/pkg/restore/log_client/flow_control.go new file mode 100644 index 0000000000000..6426ecb9ebbbb --- /dev/null +++ b/br/pkg/restore/log_client/flow_control.go @@ -0,0 +1,394 @@ +// Copyright 2026 PingCAP, Inc. Licensed under Apache-2.0. + +package logclient + +import ( + "context" + "math" + "strconv" + + "github.com/docker/go-units" + "github.com/pingcap/errors" + backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/restore" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "go.uber.org/zap" +) + +const ( + tikvSoftPendingCompactionBytesLimit = "storage.flow-control.soft-pending-compaction-bytes-limit" + tikvHardPendingCompactionBytesLimit = "storage.flow-control.hard-pending-compaction-bytes-limit" + + compactedSSTFlowControlSoftLimitFloor uint64 = units.TiB + compactedSSTFlowControlHardLimitFloor uint64 = 2 * units.TiB + compactedSSTFlowControlPendingThreshold uint64 = 100 * units.GiB + compactedSSTMaxBytesForLevelMultiplier uint64 = 10 +) + +type tikvConfigValue struct { + instance string + value string +} + +type compactedSSTFlowControlConfig struct { + soft []tikvConfigValue + hard []tikvConfigValue +} + +type compactedSSTFlowControlEstimate struct { + snapshotRestoreBytes uint64 + compactedSSTBytes uint64 + l6BytesPerStore uint64 + l5BytesPerStore uint64 + pendingBytes uint64 + storeCount uint + replicaCount uint +} + +func (rc *LogClient) adjustTiKVFlowControlForCompactedSSTRestore( + ctx context.Context, + backupFileSets restore.BatchBackupFileSet, + snapshotRestoreBytes uint64, + checkpointCompactedSSTBytes uint64, +) error { + if rc.unsafeSession == nil { + log.Warn("[Compacted SST Restore] skip adjusting TiKV flow-control configs because session is not initialized") + return nil + } + if rc.sstRestoreManager == nil || rc.sstRestoreManager.storeCount == 0 { + log.Warn("[Compacted SST Restore] skip adjusting TiKV flow-control configs because TiKV store count is not initialized") + return nil + } + if rc.sstRestoreManager.replicaCount == 0 { + log.Warn("[Compacted SST Restore] skip adjusting TiKV flow-control configs because TiKV replica count is not initialized") + return nil + } + + execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor() + originConfig, err := getCompactedSSTFlowControlConfig(ctx, execCtx) + if err != nil { + return errors.Trace(err) + } + if originConfig == nil { + log.Warn("[Compacted SST Restore] skip adjusting TiKV flow-control configs because config items are unavailable") + return nil + } + + estimate := estimateCompactedSSTFlowControl( + backupFileSets, + snapshotRestoreBytes, + checkpointCompactedSSTBytes, + rc.sstRestoreManager.storeCount, + rc.sstRestoreManager.replicaCount, + ) + + if estimate.pendingBytes <= compactedSSTFlowControlPendingThreshold { + log.Info("[Compacted SST Restore] skip adjusting TiKV flow-control configs because estimated pending compaction bytes is small", + zap.String("pending-compaction-bytes", formatBytes(estimate.pendingBytes)), + zap.String("threshold", formatBytes(compactedSSTFlowControlPendingThreshold))) + return nil + } + targetSoft, targetHard := compactedSSTFlowControlTarget(originConfig, estimate.pendingBytes) + originSoftBytes := maxTiKVConfigBytes(originConfig.soft) + originHardBytes := maxTiKVConfigBytes(originConfig.hard) + logCompactedSSTFlowControlEstimate(estimate, targetSoft, targetHard) + + if allTiKVConfigsAtLeast(originConfig.soft, targetSoft) && + allTiKVConfigsAtLeast(originConfig.hard, targetHard) { + log.Info("[Compacted SST Restore] TiKV flow-control configs are already large enough", + zap.String("soft-current", formatBytes(originSoftBytes)), + zap.String("hard-current", formatBytes(originHardBytes))) + return nil + } + + if err := setTiKVConfig(ctx, execCtx, tikvHardPendingCompactionBytesLimit, formatBytes(targetHard)); err != nil { + return errors.Trace(err) + } + if err := setTiKVConfig(ctx, execCtx, tikvSoftPendingCompactionBytesLimit, formatBytes(targetSoft)); err != nil { + return errors.Trace(err) + } + + log.Warn("[Compacted SST Restore] adjusted TiKV flow-control configs", + zap.String("soft", formatBytes(targetSoft)), + zap.String("hard", formatBytes(targetHard))) + return nil +} + +func estimateCompactedSSTFlowControl( + backupFileSets restore.BatchBackupFileSet, + snapshotRestoreBytes uint64, + checkpointCompactedSSTBytes uint64, + storeCount uint, + replicaCount uint, +) compactedSSTFlowControlEstimate { + compactedSSTBytes := checkpointCompactedSSTBytes + for _, set := range backupFileSets { + for _, file := range set.SSTFiles { + compactedSSTBytes = saturatingAddUint64(compactedSSTBytes, compactedSSTSizeForFlowControl(file)) + } + } + l6BytesPerStore := estimateLevelBytesPerStore(snapshotRestoreBytes, storeCount, replicaCount) + l5BytesPerStore := estimateLevelBytesPerStore(compactedSSTBytes, storeCount, replicaCount) + return compactedSSTFlowControlEstimate{ + snapshotRestoreBytes: snapshotRestoreBytes, + compactedSSTBytes: compactedSSTBytes, + l6BytesPerStore: l6BytesPerStore, + l5BytesPerStore: l5BytesPerStore, + pendingBytes: estimatePendingCompactionBytes(l6BytesPerStore, l5BytesPerStore), + storeCount: storeCount, + replicaCount: replicaCount, + } +} + +func compactedSSTSizeForFlowControl(file *backuppb.File) uint64 { + if file.GetSize_() > 0 { + return file.GetSize_() + } + return file.GetTotalBytes() +} + +func estimateLevelBytesPerStore(totalBytes uint64, storeCount uint, replicaCount uint) uint64 { + if storeCount == 0 || replicaCount == 0 || totalBytes == 0 { + return 0 + } + effectiveReplicaCount := replicaCount + if effectiveReplicaCount > storeCount { + effectiveReplicaCount = storeCount + } + return saturatingMulUint64( + ceilDivUint64(totalBytes, uint64(storeCount)), + uint64(effectiveReplicaCount), + ) +} + +func estimatePendingCompactionBytes(l6BytesPerStore, l5BytesPerStore uint64) uint64 { + if l5BytesPerStore == 0 { + return 0 + } + l6Bytes := float64(l6BytesPerStore) + l5Bytes := float64(l5BytesPerStore) + ratio := l6Bytes / l5Bytes + if ratio > float64(compactedSSTMaxBytesForLevelMultiplier) { + return 0 + } + l5TargetBytes := l6Bytes / float64(compactedSSTMaxBytesForLevelMultiplier) + if l5Bytes <= l5TargetBytes { + return 0 + } + pendingBytes := (l5Bytes - l5TargetBytes) * (ratio + 1) + if pendingBytes <= 0 { + return 0 + } + if pendingBytes >= float64(math.MaxUint64) { + return math.MaxUint64 + } + return uint64(pendingBytes) +} + +func compactedSSTFlowControlTarget( + originConfig *compactedSSTFlowControlConfig, + pendingBytes uint64, +) (uint64, uint64) { + adjustedPendingBytes := saturatingAddUint64( + pendingBytes, + ceilDivUint64(pendingBytes, 4), + ) + soft := max( + compactedSSTFlowControlSoftLimitFloor, + adjustedPendingBytes, + maxTiKVConfigBytes(originConfig.soft), + ) + hard := saturatingMulUint64(soft, 2) + hard = max(compactedSSTFlowControlHardLimitFloor, hard, maxTiKVConfigBytes(originConfig.hard)) + return soft, hard +} + +func getCompactedSSTFlowControlConfig( + ctx context.Context, + execCtx sqlexec.RestrictedSQLExecutor, +) (*compactedSSTFlowControlConfig, error) { + soft, err := getTiKVConfigValues(ctx, execCtx, tikvSoftPendingCompactionBytesLimit) + if err != nil { + return nil, errors.Trace(err) + } + hard, err := getTiKVConfigValues(ctx, execCtx, tikvHardPendingCompactionBytesLimit) + if err != nil { + return nil, errors.Trace(err) + } + if len(soft) == 0 || len(hard) == 0 { + return nil, nil + } + return &compactedSSTFlowControlConfig{ + soft: soft, + hard: hard, + }, nil +} + +func getTiKVConfigValues( + ctx context.Context, + execCtx sqlexec.RestrictedSQLExecutor, + name string, +) ([]tikvConfigValue, error) { + rows, fields, errSQL := execCtx.ExecRestrictedSQL( + kv.WithInternalSourceType(ctx, kv.InternalTxnBR), + nil, + "show config where name = %? and type = 'tikv'", + name, + ) + if errSQL != nil { + return nil, errSQL + } + + configs := make([]tikvConfigValue, 0, len(rows)) + for _, row := range rows { + d := row.GetDatum(1, &fields[1].Column.FieldType) + instance, err := d.ToString() + if err != nil { + return nil, errors.Trace(err) + } + d = row.GetDatum(3, &fields[3].Column.FieldType) + value, err := d.ToString() + if err != nil { + return nil, errors.Trace(err) + } + configs = append(configs, tikvConfigValue{ + instance: instance, + value: value, + }) + } + return configs, nil +} + +func setTiKVConfig( + ctx context.Context, + execCtx sqlexec.RestrictedSQLExecutor, + name string, + value string, +) error { + _, _, err := execCtx.ExecRestrictedSQL( + kv.WithInternalSourceType(ctx, kv.InternalTxnBR), + nil, + "set config tikv `"+name+"`=%?", + value, + ) + if err != nil { + return errors.Annotatef(err, "failed to set config `%s`=%s", name, value) + } + return nil +} + +func maxTiKVConfigBytes(configs []tikvConfigValue) uint64 { + maxBytes := uint64(0) + for _, config := range configs { + value, err := parseByteSizeConfig(config.value) + if err != nil { + log.Warn("[Compacted SST Restore] failed to parse TiKV flow-control config value", + zap.String("instance", config.instance), + zap.String("value", config.value), + zap.Error(err)) + continue + } + maxBytes = max(maxBytes, value) + } + return maxBytes +} + +func allTiKVConfigsAtLeast(configs []tikvConfigValue, target uint64) bool { + if len(configs) == 0 { + return false + } + for _, config := range configs { + value, err := parseByteSizeConfig(config.value) + if err != nil { + log.Warn("[Compacted SST Restore] failed to parse TiKV flow-control config value", + zap.String("instance", config.instance), + zap.String("value", config.value), + zap.Error(err)) + return false + } + if value < target { + return false + } + } + return true +} + +func parseByteSizeConfig(s string) (uint64, error) { + v, err := units.RAMInBytes(s) + if err != nil { + v, err = units.FromHumanSize(s) + } + if err != nil { + return 0, errors.Trace(err) + } + if v < 0 { + return 0, errors.Errorf("invalid negative byte size: %s", s) + } + return uint64(v), nil +} + +func formatBytes(bytes uint64) string { + for _, unit := range []struct { + bytes uint64 + suffix string + }{ + {uint64(units.TiB), "TiB"}, + {uint64(units.GiB), "GiB"}, + {uint64(units.MiB), "MiB"}, + {uint64(units.KiB), "KiB"}, + } { + if bytes >= unit.bytes && bytes%unit.bytes == 0 { + return strconv.FormatUint(bytes/unit.bytes, 10) + unit.suffix + } + } + return strconv.FormatUint(bytes, 10) + "B" +} + +func saturatingAddUint64(a, b uint64) uint64 { + if math.MaxUint64-a < b { + return math.MaxUint64 + } + return a + b +} + +func saturatingMulUint64(a uint64, b uint64) uint64 { + if b != 0 && a > math.MaxUint64/b { + return math.MaxUint64 + } + return a * b +} + +func ceilDivUint64(a, b uint64) uint64 { + if b == 0 { + return 0 + } + if a == 0 { + return 0 + } + return 1 + (a-1)/b +} + +func logCompactedSSTFlowControlEstimate( + estimate compactedSSTFlowControlEstimate, + targetSoft uint64, + targetHard uint64, +) { + log.Info("[Compacted SST Restore] estimated added bytes by TiKV store", + zap.Uint64("snapshot-restore-bytes", estimate.snapshotRestoreBytes), + zap.String("snapshot-restore-size", formatBytes(estimate.snapshotRestoreBytes)), + zap.Uint64("compacted-sst-bytes", estimate.compactedSSTBytes), + zap.String("compacted-sst-size", formatBytes(estimate.compactedSSTBytes)), + zap.Uint("store-count", estimate.storeCount), + zap.Uint("replica-count", estimate.replicaCount), + zap.Uint64("l6-bytes-per-store", estimate.l6BytesPerStore), + zap.String("l6-size-per-store", formatBytes(estimate.l6BytesPerStore)), + zap.Uint64("l5-bytes-per-store", estimate.l5BytesPerStore), + zap.String("l5-size-per-store", formatBytes(estimate.l5BytesPerStore)), + zap.Uint64("estimated-pending-compaction-bytes", estimate.pendingBytes), + zap.String("estimated-pending-compaction-size", formatBytes(estimate.pendingBytes))) + log.Info("[Compacted SST Restore] target TiKV flow-control configs", + zap.String("soft-pending-compaction-bytes-limit", formatBytes(targetSoft)), + zap.String("hard-pending-compaction-bytes-limit", formatBytes(targetHard))) +} diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index cab51e4536d9c..c234a40bd38d4 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -294,11 +294,8 @@ func (rc *SnapClient) GetCheckPrivilegeTableRowsCollateCompatibility() bool { } func (rc *SnapClient) updateConcurrency() { - // we believe 32 is large enough for download worker pool. - // it won't reach the limit if sst files distribute evenly. - // when restore memory usage is still too high, we should reduce concurrencyPerStore - // to sarifice some speed to reduce memory usage. - count := uint(rc.storeCount) * rc.concurrencyPerStore * 32 + const downloadWorkerPoolSizePerStore uint = 7186 + count := uint(rc.storeCount) * downloadWorkerPoolSizePerStore log.Info("download coarse worker pool", zap.Uint("size", count)) rc.workerPool = tidbutil.NewWorkerPool(count, "file") } diff --git a/br/pkg/streamhelper/advancer_cliext.go b/br/pkg/streamhelper/advancer_cliext.go index 087b1bb066464..73a8c61137d11 100644 --- a/br/pkg/streamhelper/advancer_cliext.go +++ b/br/pkg/streamhelper/advancer_cliext.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "strings" + "sync/atomic" "time" "github.com/golang/protobuf/proto" @@ -23,6 +24,8 @@ import ( "github.com/pingcap/tidb/pkg/util/redact" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) type EventType int @@ -73,9 +76,17 @@ type AdvancerExt struct { var ( // etcd's default periodic watch progress is too sparse for failover, so request it proactively. metadataWatchProgressInterval = 30 * time.Second + metadataWatchCreateTimeouts = []time.Duration{5 * time.Second, 10 * time.Second, 15 * time.Second} + metadataRequestTimeouts = []time.Duration{5 * time.Second, 10 * time.Second, 15 * time.Second} metadataWatchIdleTimeout = 90 * time.Second ) +const ( + metadataWatchCreating int32 = iota + metadataWatchCreated + metadataWatchCreateTimedOut +) + func errorEvent(err error) TaskEvent { return TaskEvent{ Type: EventErr, @@ -108,6 +119,60 @@ func watchIdleTimeoutError(target string) error { target, metadataWatchIdleTimeout) } +func isRetryableMetadataRequestError(ctx context.Context, err error) bool { + if ctx.Err() != nil { + return false + } + if errors.Cause(err) == context.DeadlineExceeded { + return true + } + switch status.Code(err) { + case codes.DeadlineExceeded, codes.Unavailable: + return true + default: + return false + } +} + +func runMetadataRequestWithRetry[T any]( + ctx context.Context, + warnMessage string, + fields []zap.Field, + request func(context.Context) (T, error), +) (T, error) { + var ( + lastErr error + zero T + ) + for attempt, timeout := range metadataRequestTimeouts { + requestCtx, cancel := context.WithTimeout(ctx, timeout) + resp, err := request(requestCtx) + cancel() + if err == nil { + return resp, nil + } + + lastErr = err + retryable := attempt+1 < len(metadataRequestTimeouts) && + isRetryableMetadataRequestError(ctx, err) + logFields := make([]zap.Field, 0, len(fields)+7) + logFields = append(logFields, zap.String("category", "log backup advancer")) + logFields = append(logFields, fields...) + logFields = append(logFields, + zap.Int("attempt", attempt+1), + zap.Int("max-attempts", len(metadataRequestTimeouts)), + zap.Bool("retry", retryable), + zap.Duration("timeout", timeout), + logutil.ShortError(err)) + log.Warn(warnMessage, logFields...) + if retryable { + continue + } + return zero, err + } + return zero, lastErr +} + func (t AdvancerExt) toTaskEvent(ctx context.Context, event *clientv3.Event) (TaskEvent, error) { te := TaskEvent{} var prefix string @@ -171,8 +236,9 @@ func (t AdvancerExt) eventFromWatch(ctx context.Context, resp clientv3.WatchResp } func (t AdvancerExt) startListen(ctx context.Context, rev int64, ch chan<- TaskEvent) { - taskCh := t.Client.Watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev)) - pauseCh := t.Client.Watcher.Watch(ctx, PrefixOfPause(), clientv3.WithPrefix(), clientv3.WithRev(rev)) + watcher := t.getWatcher() + taskCh := watcher.Watch(ctx, PrefixOfTask(), clientv3.WithPrefix(), clientv3.WithRev(rev)) + pauseCh := watcher.Watch(ctx, PrefixOfPause(), clientv3.WithPrefix(), clientv3.WithRev(rev)) // inner function def handleResponse := func(resp clientv3.WatchResponse) bool { @@ -313,7 +379,19 @@ func (t MetaDataClient) WaitGlobalCheckpointAdvance(ctx context.Context, taskNam func (t MetaDataClient) getGlobalCheckpointWithRevision(ctx context.Context, taskName string) (uint64, int64, error) { key := GlobalCheckpointOf(taskName) - resp, err := t.KV.Get(ctx, key) + redactedKey := redact.Key([]byte(key)) + resp, err := runMetadataRequestWithRetry(ctx, + "failed to get global checkpoint from metadata store", + []zap.Field{ + zap.String("key", redactedKey), + zap.String("task", taskName), + }, + func(requestCtx context.Context) (*clientv3.GetResponse, error) { + failpoint.Inject("advancer_get_global_checkpoint_request_timeout", func() { + failpoint.Return(nil, context.DeadlineExceeded) + }) + return t.KV.Get(requestCtx, key) + }) if err != nil { return 0, 0, err } @@ -327,7 +405,9 @@ func (t MetaDataClient) getGlobalCheckpointWithRevision(ctx context.Context, tas if err != nil { return 0, 0, err } - return checkpoint, firstKV.ModRevision, nil + // Watch from the response revision rather than the key's ModRevision. The key + // can stay unchanged long enough for its ModRevision to be compacted. + return checkpoint, resp.Header.Revision, nil } func (t MetaDataClient) waitCheckpointEvent( @@ -336,9 +416,61 @@ func (t MetaDataClient) waitCheckpointEvent( current uint64, rev int64, ) error { - watchCtx, cancelWatch := context.WithCancel(clientv3.WithRequireLeader(ctx)) + redactedKey := redact.Key([]byte(key)) + var ( + watchCtx context.Context + cancelWatch context.CancelFunc + watcher clientv3.Watcher + watchCh clientv3.WatchChan + ) + for attempt, timeout := range metadataWatchCreateTimeouts { + watchCtx, cancelWatch = context.WithCancel(clientv3.WithRequireLeader(ctx)) + // etcd Watch may block before returning the watch channel when creating the watch stream. + watcher = t.getWatcher() + var watchCreateState atomic.Int32 + watchCreateTimer := time.AfterFunc(timeout, func() { + if !watchCreateState.CompareAndSwap(metadataWatchCreating, metadataWatchCreateTimedOut) { + return + } + log.Warn("etcd watch creation timed out, resetting metadata watcher", + zap.String("category", "log backup advancer"), + zap.String("key", redactedKey), + zap.Uint64("current-checkpoint", current), + zap.Int64("revision", rev), + zap.Int("attempt", attempt+1), + zap.Int("max-attempts", len(metadataWatchCreateTimeouts)), + zap.Bool("retry", attempt+1 < len(metadataWatchCreateTimeouts)), + zap.Duration("timeout", timeout)) + cancelWatch() + t.resetWatcher() + }) + watchCh = watcher.Watch(watchCtx, key, clientv3.WithRev(rev), clientv3.WithProgressNotify()) + if watchCreateState.CompareAndSwap(metadataWatchCreating, metadataWatchCreated) { + watchCreateTimer.Stop() + } + if watchCreateState.Load() == metadataWatchCreateTimedOut { + log.Warn("global checkpoint watch returned after creation timeout", + zap.String("category", "log backup advancer"), + zap.String("key", redactedKey), + zap.Uint64("current-checkpoint", current), + zap.Int64("revision", rev), + zap.Int("attempt", attempt+1), + zap.Int("max-attempts", len(metadataWatchCreateTimeouts))) + cancelWatch() + if ctx.Err() != nil { + return ctx.Err() + } + if attempt+1 < len(metadataWatchCreateTimeouts) { + continue + } + return berrors.ErrPiTRCheckpointWatchRestart.GenWithStackByArgs() + } + break + } + if watchCh == nil { + return berrors.ErrPiTRCheckpointWatchRestart.GenWithStackByArgs() + } defer cancelWatch() - watchCh := t.Watcher.Watch(watchCtx, key, clientv3.WithRev(rev), clientv3.WithProgressNotify()) progressTicker := time.NewTicker(metadataWatchProgressInterval) defer progressTicker.Stop() idleTimer := time.NewTimer(metadataWatchIdleTimeout) @@ -346,13 +478,31 @@ func (t MetaDataClient) waitCheckpointEvent( for { select { case <-ctx.Done(): + log.Info("stop waiting for global checkpoint event because context is done", + zap.String("category", "log backup advancer"), + zap.String("key", redactedKey), + zap.Uint64("current-checkpoint", current), + zap.Int64("revision", rev), + logutil.ShortError(ctx.Err())) return ctx.Err() case resp, ok := <-watchCh: resetWatchIdleTimer(idleTimer) if !ok { + log.Warn("global checkpoint watch channel closed", + zap.String("category", "log backup advancer"), + zap.String("key", redactedKey), + zap.Uint64("current-checkpoint", current), + zap.Int64("revision", rev)) return berrors.ErrPiTRCheckpointWatchRestart.GenWithStackByArgs() } if err := resp.Err(); err != nil { + log.Warn("global checkpoint watch response has error", + zap.String("category", "log backup advancer"), + zap.String("key", redactedKey), + zap.Uint64("current-checkpoint", current), + zap.Int64("revision", rev), + zap.Int64("compact-revision", resp.CompactRevision), + logutil.ShortError(err)) if resp.CompactRevision != 0 { return berrors.ErrPiTRCheckpointWatchRestart.GenWithStackByArgs() } @@ -371,10 +521,22 @@ func (t MetaDataClient) waitCheckpointEvent( } } case <-progressTicker.C: - if err := requestWatchProgress(watchCtx, t.Watcher); err != nil { + if err := requestWatchProgress(watchCtx, watcher); err != nil { + log.Warn("failed to request global checkpoint watch progress", + zap.String("category", "log backup advancer"), + zap.String("key", redactedKey), + zap.Uint64("current-checkpoint", current), + zap.Int64("revision", rev), + logutil.ShortError(err)) return err } case <-idleTimer.C: + log.Warn("global checkpoint watch idle timeout", + zap.String("category", "log backup advancer"), + zap.String("key", redactedKey), + zap.Uint64("current-checkpoint", current), + zap.Int64("revision", rev), + zap.Duration("timeout", metadataWatchIdleTimeout)) return watchIdleTimeoutError("global checkpoint") } } @@ -393,6 +555,7 @@ func parseGlobalCheckpointValue(value []byte) (uint64, error) { func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskName string, checkpoint uint64) error { key := GlobalCheckpointOf(taskName) value := string(encodeUint64(checkpoint)) + redactedKey := redact.Key([]byte(key)) oldValue, err := t.GetGlobalCheckpointForTask(ctx, taskName) if err != nil { return err @@ -404,7 +567,26 @@ func (t AdvancerExt) UploadV3GlobalCheckpointForTask(ctx context.Context, taskNa return nil } - _, err = t.KV.Put(ctx, key, value) + _, err = runMetadataRequestWithRetry(ctx, + "failed to upload global checkpoint to metadata store", + []zap.Field{ + zap.String("key", redactedKey), + zap.String("task", taskName), + zap.Uint64("checkpoint", checkpoint), + }, + func(requestCtx context.Context) (struct{}, error) { + failpoint.Inject("advancer_upload_global_checkpoint_request_timeout", func() { + failpoint.Return(struct{}{}, context.DeadlineExceeded) + }) + _, err = t.KV.Put(requestCtx, key, value) + if err == nil { + failpoint.Inject("advancer_upload_global_checkpoint_commit_timeout", func() { + err = context.DeadlineExceeded + }) + } + return struct{}{}, err + }, + ) if err != nil { return err } diff --git a/br/pkg/streamhelper/client.go b/br/pkg/streamhelper/client.go index 9cf387b211b47..f1911c254e154 100644 --- a/br/pkg/streamhelper/client.go +++ b/br/pkg/streamhelper/client.go @@ -11,6 +11,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/gogo/protobuf/proto" @@ -205,6 +206,24 @@ func NewMetaDataClient(c *clientv3.Client) *MetaDataClient { return &MetaDataClient{c} } +var metadataWatcherMu sync.Mutex + +func (c MetaDataClient) getWatcher() clientv3.Watcher { + metadataWatcherMu.Lock() + defer metadataWatcherMu.Unlock() + return c.Client.Watcher +} + +func (c MetaDataClient) resetWatcher() { + metadataWatcherMu.Lock() + oldWatcher := c.Client.Watcher + c.Client.Watcher = clientv3.NewWatcher(c.Client) + metadataWatcherMu.Unlock() + if oldWatcher != nil { + _ = oldWatcher.Close() + } +} + // ParseCheckpoint parses the checkpoint from a key & value pair. func ParseCheckpoint(task string, key, value []byte) (Checkpoint, error) { pfx := []byte(CheckPointsOf(task)) diff --git a/br/pkg/streamhelper/export_test.go b/br/pkg/streamhelper/export_test.go index ac6f130b030b8..af8f3f5d5fe33 100644 --- a/br/pkg/streamhelper/export_test.go +++ b/br/pkg/streamhelper/export_test.go @@ -130,3 +130,11 @@ func SetMetadataWatchProgressForTest(interval, timeout time.Duration) func() { metadataWatchIdleTimeout = oldTimeout } } + +func GetGlobalCheckpointWithRevisionForTest( + ctx context.Context, + c MetaDataClient, + taskName string, +) (uint64, int64, error) { + return c.getGlobalCheckpointWithRevision(ctx, taskName) +} diff --git a/br/pkg/streamhelper/integration_test.go b/br/pkg/streamhelper/integration_test.go index ec61ea5a0ba51..a617cbfe161b6 100644 --- a/br/pkg/streamhelper/integration_test.go +++ b/br/pkg/streamhelper/integration_test.go @@ -151,6 +151,18 @@ func TestIntegration(t *testing.T) { testCheckpointWatchProgressTimeout(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) t.Run("TestPauseTaskWithErr", func(t *testing.T) { testPauseTaskWithErr(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) }) + t.Run("TestGlobalCheckpointRevisionSurvivesCompaction", func(t *testing.T) { + testGlobalCheckpointRevisionSurvivesCompaction(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) + }) + t.Run("TestGetGlobalCheckpointRetriesTimeout", func(t *testing.T) { + testGetGlobalCheckpointRetriesTimeout(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) + }) + t.Run("TestUploadGlobalCheckpointRetriesTimeout", func(t *testing.T) { + testUploadGlobalCheckpointRetriesTimeout(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) + }) + t.Run("TestUploadGlobalCheckpointRetriesCommitTimeout", func(t *testing.T) { + testUploadGlobalCheckpointRetriesCommitTimeout(t, streamhelper.AdvancerExt{MetaDataClient: metaCli}) + }) } func TestChecking(t *testing.T) { @@ -399,6 +411,94 @@ func testStreamCheckpoint(t *testing.T, metaCli streamhelper.AdvancerExt) { req.EqualValues(0, ts) } +func testGlobalCheckpointRevisionSurvivesCompaction(t *testing.T, metaCli streamhelper.AdvancerExt) { + ctx := context.Background() + task := "checkpoint_revision_compaction" + req := require.New(t) + + req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 100)) + _, checkpointRev, err := streamhelper.GetGlobalCheckpointWithRevisionForTest(ctx, metaCli.MetaDataClient, task) + req.NoError(err) + + advanceRevisionPrefix := "/test/advance-revision/" + task + "/" + for i := range 5 { + _, err := metaCli.KV.Put(ctx, fmt.Sprintf("%s%d", advanceRevisionPrefix, i), "value") + req.NoError(err) + } + resp, err := metaCli.KV.Get(ctx, advanceRevisionPrefix, clientv3.WithPrefix(), clientv3.WithCountOnly()) + req.NoError(err) + compactedRev := resp.Header.Revision + req.Greater(compactedRev, checkpointRev) + _, err = metaCli.KV.Compact(ctx, compactedRev) + req.NoError(err) + + checkpoint, rev, err := streamhelper.GetGlobalCheckpointWithRevisionForTest(ctx, metaCli.MetaDataClient, task) + req.NoError(err) + req.EqualValues(100, checkpoint) + req.GreaterOrEqual(rev, compactedRev) +} + +func testGetGlobalCheckpointRetriesTimeout(t *testing.T, metaCli streamhelper.AdvancerExt) { + ctx := context.Background() + task := "checkpoint_get_retry" + req := require.New(t) + + req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 200)) + req.NoError(failpoint.Enable( + "github.com/pingcap/tidb/br/pkg/streamhelper/advancer_get_global_checkpoint_request_timeout", + "2*return(true)")) + defer func() { + req.NoError(failpoint.Disable( + "github.com/pingcap/tidb/br/pkg/streamhelper/advancer_get_global_checkpoint_request_timeout")) + }() + + checkpoint, err := metaCli.GetGlobalCheckpointForTask(ctx, task) + req.NoError(err) + req.EqualValues(200, checkpoint) +} + +func testUploadGlobalCheckpointRetriesTimeout(t *testing.T, metaCli streamhelper.AdvancerExt) { + ctx := context.Background() + task := "checkpoint_upload_retry" + req := require.New(t) + + req.NoError(failpoint.Enable( + "github.com/pingcap/tidb/br/pkg/streamhelper/advancer_upload_global_checkpoint_request_timeout", + "2*return(true)")) + defer func() { + req.NoError(failpoint.Disable( + "github.com/pingcap/tidb/br/pkg/streamhelper/advancer_upload_global_checkpoint_request_timeout")) + }() + + req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 300)) + checkpoint, err := metaCli.GetGlobalCheckpointForTask(ctx, task) + req.NoError(err) + req.EqualValues(300, checkpoint) +} + +func testUploadGlobalCheckpointRetriesCommitTimeout(t *testing.T, metaCli streamhelper.AdvancerExt) { + ctx := context.Background() + task := "checkpoint_upload_commit_retry" + req := require.New(t) + + req.NoError(failpoint.Enable( + "github.com/pingcap/tidb/br/pkg/streamhelper/advancer_upload_global_checkpoint_commit_timeout", + "1*return(true)")) + defer func() { + req.NoError(failpoint.Disable( + "github.com/pingcap/tidb/br/pkg/streamhelper/advancer_upload_global_checkpoint_commit_timeout")) + }() + req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 400)) + + checkpoint, err := metaCli.GetGlobalCheckpointForTask(ctx, task) + req.NoError(err) + req.EqualValues(400, checkpoint) + req.NoError(metaCli.UploadV3GlobalCheckpointForTask(ctx, task, 350)) + checkpoint, err = metaCli.GetGlobalCheckpointForTask(ctx, task) + req.NoError(err) + req.EqualValues(400, checkpoint) +} + func testStoptask(t *testing.T, metaCli streamhelper.AdvancerExt) { var ( ctx = context.Background() diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index e57fa42fd6002..9a778c3d4fc30 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -194,7 +194,7 @@ func DefineRestoreCommonFlags(flags *pflag.FlagSet) { // TODO remove experimental tag if it's stable flags.Bool(flagOnline, false, "(experimental) Whether online when restore") flags.String(flagGranularity, string(restore.CoarseGrained), "(deprecated) Whether split & scatter regions using fine-grained way during restore") - flags.Uint(flagConcurrencyPerStore, 128, "The size of thread pool on each store that executes tasks") + flags.Uint(flagConcurrencyPerStore, conn.DefaultImportNumGoroutines, "The size of thread pool on each store that executes tasks") flags.Uint32(flagConcurrency, 128, "(deprecated) The size of thread pool on BR that executes tasks, "+ "where each task restores one SST file to TiKV") flags.Uint64(FlagMergeRegionSizeBytes, conn.DefaultMergeRegionSizeBytes, @@ -325,6 +325,8 @@ type RestoreConfig struct { RestoreRegistry *registry.Registry `json:"-" toml:"-"` WaitTiflashReady bool `json:"wait-tiflash-ready" toml:"wait-tiflash-ready"` + // snapshotRestoreDataSize records the filtered snapshot files restored before PITR log restore. + snapshotRestoreDataSize uint64 // PITR-related fields for blocklist creation // RestoreStartTS is the timestamp when the restore operation began (before any table creation). @@ -1586,6 +1588,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s setTablesRestoreModeIfNeeded(tables, cfg, isPiTR, client.IsIncremental()) archiveSize := metautil.ArchiveTablesSize(tables) + cfg.snapshotRestoreDataSize = archiveSize // some more checks once we get tables and files information if err := checkOptionalClusterRequirements(ctx, client, cfg, cpEnabledAndExists, mgr, tables, archiveSize, isPiTR); err != nil { return errors.Trace(err) @@ -1952,6 +1955,26 @@ func checkPreallocIDReusable(ctx context.Context, cfg *SnapshotRestoreConfig, ha return checkpointMeta.PreallocIDs, nil } +func adjustRestoreConcurrencyPerStoreFromTiKV(ctx context.Context, mgr *conn.Mgr, cfg *RestoreConfig) { + kvConfigs := &pconfig.KVConfig{ + ImportGoroutines: cfg.ConcurrencyPerStore, + MergeRegionSize: pconfig.ConfigTerm[uint64]{ + Modified: true, + }, + MergeRegionKeyCount: pconfig.ConfigTerm[uint64]{ + Modified: true, + }, + } + httpCli := httputil.NewClient(mgr.GetTLSConfig()) + mgr.ProcessTiKVConfigs(ctx, kvConfigs, httpCli) + cfg.ConcurrencyPerStore = markRestoreConcurrencyPerStoreAdjusted(kvConfigs.ImportGoroutines) +} + +func markRestoreConcurrencyPerStoreAdjusted(concurrency pconfig.ConfigTerm[uint]) pconfig.ConfigTerm[uint] { + concurrency.Modified = true + return concurrency +} + func getMaxReplica(ctx context.Context, mgr *conn.Mgr) (cnt uint64, err error) { var resp map[string]any err = utils.WithRetry(ctx, func() error { diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index ff2dc9f2193dd..85809eca74788 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -511,11 +511,11 @@ func (s *streamMgr) checkStreamStartEnable(ctx context.Context) error { return nil } -type RestoreGCFunc func(string) error +type RestoreConfigFunc func(string) error // DisableGC disables and returns a function that can enable gc back. // gc.ratio-threshold = "-1.0", which represents disable gc in TiKV. -func DisableGC(g glue.Glue, store kv.Storage) (RestoreGCFunc, string, error) { +func DisableGC(g glue.Glue, store kv.Storage) (RestoreConfigFunc, string, error) { se, err := g.CreateSession(store) if err != nil { return nil, "", errors.Trace(err) @@ -537,6 +537,38 @@ func DisableGC(g glue.Glue, store kv.Storage) (RestoreGCFunc, string, error) { }, oldRatio, nil } +// KeepRocksDBMaxBackgroundJobsLow limits RocksDB compaction jobs during restore and returns a restore function. +func KeepRocksDBMaxBackgroundJobsLow(g glue.Glue, store kv.Storage) (RestoreConfigFunc, string, error) { + se, err := g.CreateSession(store) + if err != nil { + return nil, "", errors.Trace(err) + } + + execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor() + oldJobs, err := utils.GetRocksDBMaxBackgroundJobs(execCtx) + if err != nil { + return nil, "", errors.Trace(err) + } + if oldJobs == "" { + log.Warn("skip setting rocksdb.max-background-jobs because config item is unavailable") + return func(string) error { + return nil + }, oldJobs, nil + } + + err = utils.SetRocksDBMaxBackgroundJobs(execCtx, utils.RocksDBMaxBackgroundJobsForRestore) + if err != nil { + return nil, "", errors.Trace(err) + } + + return func(jobs string) error { + if jobs == "" { + return nil + } + return utils.SetRocksDBMaxBackgroundJobs(execCtx, jobs) + }, oldJobs, nil +} + // RunStreamCommand run all kinds of `stream task` func RunStreamCommand( ctx context.Context, @@ -1419,6 +1451,9 @@ func RunStreamRestore( cfg.adjustRestoreConfigForStreamRestore() cfg.tiflashRecorder = tiflashrec.New() + // createLogClient consumes ConcurrencyPerStore while initializing SST clients, + // so adjust it from TiKV before constructing the log client. + adjustRestoreConcurrencyPerStoreFromTiKV(ctx, mgr, cfg) logClient, err := createLogClient(ctx, g, cfg, mgr) if err != nil { return errors.Trace(err) @@ -1596,19 +1631,23 @@ func restoreStream( // It need disable GC in TiKV when PiTR. // because the process of PITR is concurrent and kv events isn't sorted by tso. - var restoreGCFunc RestoreGCFunc - var oldGCRatio string + var restoreGCFunc, restoreRocksDBMaxBackgroundJobsFunc RestoreConfigFunc + var oldGCRatio, oldRocksDBMaxBackgroundJobs string if err := cfg.RestoreRegistry.OperationAfterWaitIDs(ctx, func() (err error) { - restoreGCFunc, oldGCRatio, err = DisableGC(g, mgr.GetStorage()) + if restoreGCFunc, oldGCRatio, err = DisableGC(g, mgr.GetStorage()); err != nil { + return errors.Trace(err) + } + restoreRocksDBMaxBackgroundJobsFunc, oldRocksDBMaxBackgroundJobs, err = KeepRocksDBMaxBackgroundJobsLow(g, mgr.GetStorage()) return errors.Trace(err) }); err != nil { return errors.Trace(err) } - gcDisabledRestorable := false + tikvConfigRestorable := false + tikvConfigCheckpointPersisted := !cfg.UseCheckpoint defer func() { // don't restore the gc-ratio-threshold if checkpoint mode is used and restored is not finished - if cfg.UseCheckpoint && !gcDisabledRestorable { - log.Info("skip restore the gc-ratio-threshold for next retry") + if cfg.UseCheckpoint && !tikvConfigRestorable && tikvConfigCheckpointPersisted { + log.Info("skip restore the tikv config for next retry") return } @@ -1618,12 +1657,18 @@ func restoreStream( log.Warn("the original gc-ratio is negative, reset by default value 1.1", zap.String("old-gc-ratio", oldGCRatio)) oldGCRatio = utils.DefaultGcRatioVal } - log.Info("start to restore gc", zap.String("ratio", oldGCRatio)) + log.Info("start to restore tikv config", + zap.String("gc-ratio", oldGCRatio), + zap.String("max-background-jobs", oldRocksDBMaxBackgroundJobs)) err = cfg.RestoreRegistry.GlobalOperationAfterSetResettingStatus(ctx, cfg.RestoreID, func() error { if err := restoreGCFunc(oldGCRatio); err != nil { log.Error("failed to restore gc", zap.Error(err)) return errors.Trace(err) } + if err := restoreRocksDBMaxBackgroundJobsFunc(oldRocksDBMaxBackgroundJobs); err != nil { + log.Error("failed to restore rocksdb.max-background-jobs", zap.Error(err)) + return errors.Trace(err) + } return nil }) log.Info("finish restoring gc") @@ -1631,12 +1676,17 @@ func restoreStream( var sstCheckpointSets map[string]struct{} if cfg.UseCheckpoint { - gcRatioFromCheckpoint, err := client.LoadOrCreateCheckpointMetadataForLogRestore( - ctx, cfg.RestoreStartTS, cfg.StartTS, cfg.RestoreTS, oldGCRatio, cfg.tiflashRecorder, cfg.logCheckpointMetaManager) + gcRatioFromCheckpoint, oldRocksDBMaxBackgroundJobsFromCheckpoint, snapshotRestoreDataSize, err := client.LoadOrCreateCheckpointMetadataForLogRestore( + ctx, cfg.RestoreStartTS, cfg.StartTS, cfg.RestoreTS, oldGCRatio, oldRocksDBMaxBackgroundJobs, cfg.tiflashRecorder, cfg.logCheckpointMetaManager, cfg.snapshotRestoreDataSize) if err != nil { return errors.Trace(err) } + tikvConfigCheckpointPersisted = true oldGCRatio = gcRatioFromCheckpoint + oldRocksDBMaxBackgroundJobs = oldRocksDBMaxBackgroundJobsFromCheckpoint + if snapshotRestoreDataSize > 0 { + cfg.snapshotRestoreDataSize = snapshotRestoreDataSize + } sstCheckpointSets, err = client.InitCheckpointMetadataForCompactedSstRestore(ctx, cfg.sstCheckpointMetaManager) if err != nil { return errors.Trace(err) @@ -1770,6 +1820,7 @@ func restoreStream( sstsIter := iter.ConcatAll(addedSSTsIter, compactionIter) var checkpointSSTProgress int64 + var checkpointSSTSize uint64 updateSSTStatsWithCheckpoint := func(kvCount, size uint64) { mu.Lock() defer mu.Unlock() @@ -1778,6 +1829,7 @@ func restoreStream( checkpointTotalKVCount += kvCount checkpointTotalSize += size checkpointSSTProgress += int64(kvCount) + checkpointSSTSize += size } compactedSplitIter, err := client.WrapCompactedFilesIterWithSplitHelper( ctx, sstsIter, rewriteRules, sstCheckpointSets, @@ -1822,7 +1874,14 @@ func restoreStream( p.IncBy(int64(kvCount)) } - err = client.RestoreSSTFileSets(ctx, sstFileSets, importModeSwitcher, p.IncBy) + err = client.RestoreSSTFileSets( + ctx, + sstFileSets, + importModeSwitcher, + cfg.snapshotRestoreDataSize, + checkpointSSTSize, + p.IncBy, + ) if err != nil { return errors.Trace(err) } @@ -1936,7 +1995,7 @@ func restoreStream( } }) - gcDisabledRestorable = true + tikvConfigRestorable = true return nil } diff --git a/br/pkg/task/stream_test.go b/br/pkg/task/stream_test.go index a32e9cba0ee64..ccd7394ca4161 100644 --- a/br/pkg/task/stream_test.go +++ b/br/pkg/task/stream_test.go @@ -88,6 +88,16 @@ func TestShouldOpenPiTRAddIndexSQLStorage(t *testing.T) { } } +func TestMarkRestoreConcurrencyPerStoreAdjusted(t *testing.T) { + cfg := RestoreConfig{} + cfg.ConcurrencyPerStore.Value = 132 + + cfg.ConcurrencyPerStore = markRestoreConcurrencyPerStoreAdjusted(cfg.ConcurrencyPerStore) + + require.Equal(t, uint(132), cfg.ConcurrencyPerStore.Value) + require.True(t, cfg.ConcurrencyPerStore.Modified) +} + func TestCheckLogRange(t *testing.T) { cases := []struct { restoreFrom uint64 diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index b68298c75f70f..075397a27df06 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -99,7 +99,7 @@ go_test( ], embed = [":utils"], flaky = True, - shard_count = 38, + shard_count = 39, deps = [ "//br/pkg/errors", "//pkg/kv", diff --git a/br/pkg/utils/db.go b/br/pkg/utils/db.go index cc51b899b3f97..ea6f2b0924154 100644 --- a/br/pkg/utils/db.go +++ b/br/pkg/utils/db.go @@ -113,6 +113,8 @@ func GetGcRatio(ctx sqlexec.RestrictedSQLExecutor) (string, error) { const ( DefaultGcRatioVal = "1.1" DisabledGcRatioVal = "-1.0" + + RocksDBMaxBackgroundJobsForRestore = "1" ) func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error { @@ -129,6 +131,38 @@ func SetGcRatio(ctx sqlexec.RestrictedSQLExecutor, ratio string) error { return nil } +func GetRocksDBMaxBackgroundJobs(ctx sqlexec.RestrictedSQLExecutor) (string, error) { + valStr := "show config where name = 'rocksdb.max-background-jobs' and type = 'tikv'" + rows, fields, errSQL := ctx.ExecRestrictedSQL( + kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR), + nil, + valStr, + ) + if errSQL != nil { + return "", errSQL + } + if len(rows) == 0 { + return "", nil + } + + d := rows[0].GetDatum(3, &fields[3].Column.FieldType) + return d.ToString() +} + +func SetRocksDBMaxBackgroundJobs(ctx sqlexec.RestrictedSQLExecutor, jobs string) error { + _, _, err := ctx.ExecRestrictedSQL( + kv.WithInternalSourceType(context.Background(), kv.InternalTxnBR), + nil, + "set config tikv `rocksdb.max-background-jobs`=%?", + jobs, + ) + if err != nil { + return errors.Annotatef(err, "failed to set config `rocksdb.max-background-jobs`=%s", jobs) + } + log.Warn("set config tikv rocksdb.max-background-jobs", zap.String("jobs", jobs)) + return nil +} + // LogBackupTaskCountInc increases the count of log backup task. func LogBackupTaskCountInc() { logBackupTaskCount.Inc() diff --git a/br/pkg/utils/db_test.go b/br/pkg/utils/db_test.go index bca73fde48f17..b7e509f300904 100644 --- a/br/pkg/utils/db_test.go +++ b/br/pkg/utils/db_test.go @@ -40,7 +40,8 @@ func (m *mockRestrictedSQLExecutor) ExecRestrictedSQL(ctx context.Context, opts if strings.Contains(sql, "show config") { return m.rows, m.fields, nil - } else if strings.Contains(sql, "set config") && strings.Contains(sql, "gc.ratio-threshold") { + } else if strings.Contains(sql, "set config") && + (strings.Contains(sql, "gc.ratio-threshold") || strings.Contains(sql, "rocksdb.max-background-jobs")) { value := args[0].(string) for _, r := range m.rows { @@ -100,6 +101,38 @@ func TestGc(t *testing.T) { require.Equal(t, ratio, "-1.0") } +func TestRocksDBMaxBackgroundJobs(t *testing.T) { + fields := make([]*resolve.ResultField, 4) + tps := []*types.FieldType{ + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + types.NewFieldType(mysql.TypeString), + } + for i := 0; i < len(tps); i++ { + rf := new(resolve.ResultField) + rf.Column = new(model.ColumnInfo) + rf.Column.FieldType = *tps[i] + fields[i] = rf + } + rows := make([]chunk.Row, 0, 2) + row := chunk.MutRowFromValues("tikv", " 127.0.0.1:20161", "rocksdb.max-background-jobs", "8").ToRow() + rows = append(rows, row) + row = chunk.MutRowFromValues("tikv", " 127.0.0.1:20162", "rocksdb.max-background-jobs", "8").ToRow() + rows = append(rows, row) + + s := &mockRestrictedSQLExecutor{rows: rows, fields: fields} + jobs, err := utils.GetRocksDBMaxBackgroundJobs(s) + require.NoError(t, err) + require.Equal(t, "8", jobs) + + err = utils.SetRocksDBMaxBackgroundJobs(s, utils.RocksDBMaxBackgroundJobsForRestore) + require.NoError(t, err) + jobs, err = utils.GetRocksDBMaxBackgroundJobs(s) + require.NoError(t, err) + require.Equal(t, utils.RocksDBMaxBackgroundJobsForRestore, jobs) +} + func TestRegionSplitInfo(t *testing.T) { // config format: // MySQL [(none)]> show config where name = 'coprocessor.region-split-size';