Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions pkg/dxf/framework/integrationtests/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
)

var (
maxConcurrentTask = flag.Int("max-concurrent-task", proto.MaxConcurrentTask, "max concurrent task")
maxConcurrentTask = flag.Int("max-concurrent-task", proto.GetMaxConcurrentTask(), "max concurrent task")
waitDuration = flag.Duration("task-wait-duration", 2*time.Minute, "task wait duration")
schedulerInterval = flag.Duration("scheduler-interval", scheduler.CheckTaskFinishedInterval, "scheduler interval")
taskExecutorMgrInterval = flag.Duration("task-executor-mgr-interval", taskexecutor.TaskCheckInterval, "task executor mgr interval")
Expand All @@ -65,43 +65,43 @@ func BenchmarkSchedulerOverhead(b *testing.B) {
}()
schIntervalBak := scheduler.CheckTaskFinishedInterval
exeMgrIntervalBak := taskexecutor.TaskCheckInterval
bak := proto.MaxConcurrentTask
restoreMaxConcurrentTask := proto.SetMaxConcurrentTaskForTest(*maxConcurrentTask)
b.Cleanup(func() {
proto.MaxConcurrentTask = bak
restoreMaxConcurrentTask()
scheduler.CheckTaskFinishedInterval = schIntervalBak
taskexecutor.TaskCheckInterval = exeMgrIntervalBak
})
proto.MaxConcurrentTask = *maxConcurrentTask
scheduler.CheckTaskFinishedInterval = *schedulerInterval
taskexecutor.TaskCheckInterval = *taskExecutorMgrInterval

b.Logf("max concurrent task: %d", proto.MaxConcurrentTask)
maxConcurrentTaskValue := proto.GetMaxConcurrentTask()
b.Logf("max concurrent task: %d", maxConcurrentTaskValue)
b.Logf("taks wait duration: %s", *waitDuration)
b.Logf("task meta size: %d", *taskMetaSize)
b.Logf("scheduler interval: %s", scheduler.CheckTaskFinishedInterval)
b.Logf("task executor mgr interval: %s", taskexecutor.TaskCheckInterval)

prepareForBenchTest(b)
c := testutil.NewTestDXFContext(b, 1, 2*proto.MaxConcurrentTask, false)
c := testutil.NewTestDXFContext(b, 1, 2*maxConcurrentTaskValue, false)

registerTaskTypeForBench(c)

if *noTask {
time.Sleep(*waitDuration)
} else {
// in this test, we will start 4*proto.MaxConcurrentTask tasks, but only
// proto.MaxConcurrentTask will be scheduled at the same time, for other
// in this test, we will start 4*maxConcurrentTaskValue tasks, but only
// maxConcurrentTaskValue will be scheduled at the same time, for other
// tasks will be in queue only to check the performance of querying them.
for i := range 4 * proto.MaxConcurrentTask {
for i := range 4 * maxConcurrentTaskValue {
taskKey := fmt.Sprintf("task-%03d", i)
taskMeta := make([]byte, *taskMetaSize)
_, err := handle.SubmitTask(c.Ctx, taskKey, proto.TaskTypeExample, c.Store.GetKeyspace(), 1, "", 0, taskMeta)
require.NoError(c.T, err)
}
// task has 2 steps, each step has 1 subtask,wait in serial to reduce WaitTask check overhead.
// only wait first proto.MaxConcurrentTask and exit
// only wait first maxConcurrentTaskValue and exit
time.Sleep(2 * *waitDuration)
for i := range proto.MaxConcurrentTask {
for i := range maxConcurrentTaskValue {
taskKey := fmt.Sprintf("task-%03d", i)
testutil.WaitTaskDoneOrPaused(c.Ctx, c.T, taskKey)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dxf/framework/proto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ go_test(
],
embed = [":proto"],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = ["@com_github_stretchr_testify//require"],
)
42 changes: 39 additions & 3 deletions pkg/dxf/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package proto
import (
"cmp"
"fmt"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -63,9 +64,44 @@ const (
NormalPriority = 512
)

// MaxConcurrentTask is the max concurrency of task.
// TODO: remove this limit later.
var MaxConcurrentTask = 16
const (
// DefaultMaxConcurrentTask is the default max concurrency of task.
DefaultMaxConcurrentTask = 16
// MinMaxConcurrentTask is the minimum allowed max concurrency of task.
MinMaxConcurrentTask = 16
// MaxMaxConcurrentTask is the maximum allowed max concurrency of task.
MaxMaxConcurrentTask = 1000
)

var maxConcurrentTask atomic.Int64

func init() {
maxConcurrentTask.Store(DefaultMaxConcurrentTask)
}

// GetMaxConcurrentTask returns the max concurrency of task.
func GetMaxConcurrentTask() int {
return int(maxConcurrentTask.Load())
}

// SetMaxConcurrentTask updates the max concurrency of task.
func SetMaxConcurrentTask(value int) error {
if value < MinMaxConcurrentTask || value > MaxMaxConcurrentTask {
return fmt.Errorf("max_concurrent_task %d is out of range [%d, %d]",
value, MinMaxConcurrentTask, MaxMaxConcurrentTask)
}
maxConcurrentTask.Store(int64(value))
return nil
}

// SetMaxConcurrentTaskForTest updates the max concurrency of task and returns a restore function.
func SetMaxConcurrentTaskForTest(value int) func() {
old := GetMaxConcurrentTask()
maxConcurrentTask.Store(int64(value))
return func() {
maxConcurrentTask.Store(int64(old))
}
}

// ExtraParams is the extra params of task.
// Note: only store params that's not used for filter or sort in this struct.
Expand Down
17 changes: 17 additions & 0 deletions pkg/dxf/framework/proto/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ func TestTaskIsDone(t *testing.T) {
}
}

func TestMaxConcurrentTask(t *testing.T) {
restore := SetMaxConcurrentTaskForTest(DefaultMaxConcurrentTask)
defer restore()

require.Equal(t, DefaultMaxConcurrentTask, GetMaxConcurrentTask())
require.Equal(t, 1000, MaxMaxConcurrentTask)
for _, value := range []int{MinMaxConcurrentTask - 1, MaxMaxConcurrentTask + 1} {
require.Error(t, SetMaxConcurrentTask(value))
require.Equal(t, DefaultMaxConcurrentTask, GetMaxConcurrentTask())
}

require.NoError(t, SetMaxConcurrentTask(128))
require.Equal(t, 128, GetMaxConcurrentTask())
require.NoError(t, SetMaxConcurrentTask(MaxMaxConcurrentTask))
require.Equal(t, MaxMaxConcurrentTask, GetMaxConcurrentTask())
}

func TestTaskCompare(t *testing.T) {
taskA := Task{TaskBase: TaskBase{
ID: 100,
Expand Down
2 changes: 1 addition & 1 deletion pkg/dxf/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ go_test(
embed = [":scheduler"],
flaky = True,
race = "off",
shard_count = 43,
shard_count = 45,
deps = [
"//pkg/config",
"//pkg/config/kerneltype",
Expand Down
2 changes: 1 addition & 1 deletion pkg/dxf/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// TaskManager defines the interface to access task table.
type TaskManager interface {
// GetTopUnfinishedTasks returns unfinished tasks, limited by MaxConcurrentTask*2,
// GetTopUnfinishedTasks returns unfinished tasks, limited by GetMaxConcurrentTask()*2,
// to make sure low ranking tasks can be scheduled if resource is enough.
// The returned tasks are sorted by task order, see proto.Task.
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error)
Expand Down
13 changes: 12 additions & 1 deletion pkg/dxf/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func (s *BaseScheduler) switch2NextStep() error {
// OnNextSubtasksBatch may use len(eligibleNodes) as a hint to
// calculate the number of subtasks, so we need to do this before
// filtering nodes by available slots in scheduleSubtask.
eligibleNodes = eligibleNodes[:task.MaxNodeCount]
eligibleNodes = s.randomSelectNodes(eligibleNodes, task.MaxNodeCount)
}

s.logger.Info("eligible instances", zap.Int("num", len(eligibleNodes)))
Expand All @@ -527,6 +527,17 @@ func (s *BaseScheduler) switch2NextStep() error {
return nil
}

func (s *BaseScheduler) randomSelectNodes(nodes []string, maxNodeCount int) []string {
if maxNodeCount <= 0 || len(nodes) <= maxNodeCount {
return nodes
}
selectedNodes := append([]string(nil), nodes...)
s.rand.Shuffle(len(selectedNodes), func(i, j int) {
selectedNodes[i], selectedNodes[j] = selectedNodes[j], selectedNodes[i]
})
return selectedNodes[:maxNodeCount]
}

func (s *BaseScheduler) scheduleSubTask(
task *proto.Task,
subtaskStep proto.Step,
Expand Down
87 changes: 71 additions & 16 deletions pkg/dxf/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ var (
defaultCollectMetricsInterval = 15 * time.Second
)

const maxCleanupTaskBatchSize = 100

Comment on lines +54 to +55

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

📐 Maintainability & Code Quality | 🟠 Major | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify maxCleanupTaskBatchSize is not used in production code, and find the real enforcement point.
rg -nP '\bmaxCleanupTaskBatchSize\b' --type=go
rg -nP 'limit\s+100' --type=go pkg/dxf/framework/storage

Repository: pingcap/tidb

Length of output: 150


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Locate relevant files and inspect the surrounding code that defines and uses the cleanup batch size.
git ls-files 'pkg/dxf/framework/**' | rg 'scheduler_manager\.go|task_table\.go|scheduler_manager_nokit_test\.go|proto|interface\.go'

echo '--- scheduler_manager.go ---'
sed -n '1,260p' pkg/dxf/framework/scheduler/scheduler_manager.go

echo '--- task_table.go ---'
sed -n '1,260p' pkg/dxf/framework/storage/task_table.go

echo '--- scheduler_manager_nokit_test.go ---'
sed -n '1,220p' pkg/dxf/framework/scheduler/scheduler_manager_nokit_test.go

Repository: pingcap/tidb

Length of output: 24775


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Find every reference to the cleanup batch-size constant and the SQL limit in the framework.
rg -n 'maxCleanupTaskBatchSize|limit 100|GetTasksInStates|cleanupFinishedTasks|doCleanupTask' pkg/dxf/framework

Repository: pingcap/tidb

Length of output: 5745


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Inspect any surrounding declarations if the batch size constant lives in a nearby package.
rg -n 'Max.*Cleanup|Cleanup.*Batch|BatchSize|GetMaxConcurrentTask|MaxConcurrentTask' pkg/dxf/framework

Repository: pingcap/tidb

Length of output: 5420


🏁 Script executed:

#!/bin/bash
set -euo pipefail

sed -n '425,470p' pkg/dxf/framework/scheduler/scheduler_manager.go
sed -n '434,452p' pkg/dxf/framework/storage/task_table.go

Repository: pingcap/tidb

Length of output: 2263


Deduplicate the cleanup batch size constant.

maxCleanupTaskBatchSize is only used by scheduler_manager_nokit_test.go; the actual cleanup limit is still the hardcoded limit 100 in pkg/dxf/framework/storage/task_table.go. That leaves two independent copies of the same value. Move the batch size to a shared constant and use it in both places.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/dxf/framework/scheduler/scheduler_manager.go` around lines 54 - 55, The
cleanup batch size is duplicated between maxCleanupTaskBatchSize and the
hardcoded limit in task_table.go, so there are two independent sources of truth.
Move the value into a shared constant that can be referenced by both
scheduler_manager_nokit_test.go and the cleanup logic in task_table.go, then
replace both literals with that shared symbol to keep the limit consistent.

type batchCleanUpRoutine interface {
CleanUpBatch(ctx context.Context, tasks []*proto.Task) error
}

func (sm *Manager) getSchedulerCount() int {
sm.mu.RLock()
defer sm.mu.RUnlock()
Expand Down Expand Up @@ -155,7 +161,7 @@ func NewManager(ctx context.Context, store kv.Storage, taskMgr TaskManager, serv
serverID: serverID,
}),
logger: logger,
finishCh: make(chan struct{}, proto.MaxConcurrentTask),
finishCh: make(chan struct{}, proto.MaxMaxConcurrentTask),
nodeRes: nodeRes,
}
schedulerManager.mu.schedulerMap = make(map[int64]Scheduler)
Expand Down Expand Up @@ -244,7 +250,8 @@ func (sm *Manager) getSchedulableTasks(ctx context.Context) ([]*proto.TaskBase,
defer r.End()
getTasksFn := sm.taskMgr.GetTopUnfinishedTasks
taskCnt := sm.getSchedulerCount()
if taskCnt >= proto.MaxConcurrentTask {
maxConcurrentTask := proto.GetMaxConcurrentTask()
if taskCnt >= maxConcurrentTask {
// when we have reached the limit of concurrent tasks, we only handle
// tasks in states that don't need resources, e.g. reverting/cancelling/
// pausing/modifying.
Expand Down Expand Up @@ -291,7 +298,7 @@ func (sm *Manager) startSchedulers(schedulableTasks []*proto.TaskBase) error {
switch task.State {
case proto.TaskStatePending, proto.TaskStateRunning, proto.TaskStateResuming:
taskCnt := sm.getSchedulerCount()
if taskCnt >= proto.MaxConcurrentTask {
if taskCnt >= proto.GetMaxConcurrentTask() {
continue
}
reservedExecID, ok = sm.slotMgr.canReserve(task)
Expand Down Expand Up @@ -449,23 +456,36 @@ func (sm *Manager) doCleanupTask() {
}

func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error {
cleanedTasks := make([]*proto.Task, 0)
cleanedTasks := make([]*proto.Task, 0, len(tasks))
var firstErr error
importIntoTasks := make([]*proto.Task, 0)
cleanUpImportIntoTasks := func() error {
if len(importIntoTasks) == 0 {
return nil
}
cleanedImportIntoTasks, err := sm.cleanupImportIntoTasks(importIntoTasks)
cleanedTasks = append(cleanedTasks, cleanedImportIntoTasks...)
importIntoTasks = importIntoTasks[:0]
return err
}
for _, task := range tasks {
sm.logger.Info("cleanup task", zap.Int64("task-id", task.ID), zap.String("task-key", task.Key))
cleanupFactory := getSchedulerCleanUpFactory(task.Type)
if cleanupFactory != nil {
cleanup := cleanupFactory()
err := cleanup.CleanUp(sm.ctx, task)
if err != nil {
firstErr = err
break
}
cleanedTasks = append(cleanedTasks, task)
} else {
// if task doesn't register cleanup function, mark it as cleaned.
cleanedTasks = append(cleanedTasks, task)
if task.Type == proto.ImportInto {
importIntoTasks = append(importIntoTasks, task)
continue
}
if err := cleanUpImportIntoTasks(); err != nil {
firstErr = err
break
}
if err := sm.cleanupSingleTask(task); err != nil {
firstErr = err
break
}
cleanedTasks = append(cleanedTasks, task)
}
if firstErr == nil {
firstErr = cleanUpImportIntoTasks()
}
if firstErr != nil {
// normally ScheduleEventCounter requires a task ID, but since scheduler
Expand All @@ -482,6 +502,41 @@ func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error {
return sm.taskMgr.TransferTasks2History(sm.ctx, cleanedTasks)
}

func (sm *Manager) cleanupImportIntoTasks(tasks []*proto.Task) ([]*proto.Task, error) {
cleanupFactory := getSchedulerCleanUpFactory(proto.ImportInto)
if cleanupFactory == nil {
// if task doesn't register cleanup function, mark it as cleaned.
return tasks, nil
}
cleanup := cleanupFactory()
if batchCleanup, ok := cleanup.(batchCleanUpRoutine); ok {
if err := batchCleanup.CleanUpBatch(sm.ctx, tasks); err != nil {
return nil, err
}
return tasks, nil
}

cleanedTasks := make([]*proto.Task, 0, len(tasks))
for i, task := range tasks {
if i > 0 {
cleanup = cleanupFactory()
}
if err := cleanup.CleanUp(sm.ctx, task); err != nil {
return cleanedTasks, err
}
cleanedTasks = append(cleanedTasks, task)
}
return cleanedTasks, nil
}

func (sm *Manager) cleanupSingleTask(task *proto.Task) error {
cleanupFactory := getSchedulerCleanUpFactory(task.Type)
if cleanupFactory == nil {
return nil
}
return cleanupFactory().CleanUp(sm.ctx, task)
}

func (sm *Manager) collectLoop() {
sm.logger.Info("collect loop start")
ticker := time.NewTicker(defaultCollectMetricsInterval)
Expand Down
Loading