Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions pkg/dxf/framework/integrationtests/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (
)

var (
maxConcurrentTask = flag.Int("max-concurrent-task", proto.MaxConcurrentTask, "max concurrent task")
maxConcurrentTask = flag.Int("max-concurrent-task", proto.GetMaxConcurrentTask(), "max concurrent task")
waitDuration = flag.Duration("task-wait-duration", 2*time.Minute, "task wait duration")
schedulerInterval = flag.Duration("scheduler-interval", scheduler.CheckTaskFinishedInterval, "scheduler interval")
taskExecutorMgrInterval = flag.Duration("task-executor-mgr-interval", taskexecutor.TaskCheckInterval, "task executor mgr interval")
Expand All @@ -65,43 +65,43 @@ func BenchmarkSchedulerOverhead(b *testing.B) {
}()
schIntervalBak := scheduler.CheckTaskFinishedInterval
exeMgrIntervalBak := taskexecutor.TaskCheckInterval
bak := proto.MaxConcurrentTask
restoreMaxConcurrentTask := proto.SetMaxConcurrentTaskForTest(*maxConcurrentTask)
b.Cleanup(func() {
proto.MaxConcurrentTask = bak
restoreMaxConcurrentTask()
scheduler.CheckTaskFinishedInterval = schIntervalBak
taskexecutor.TaskCheckInterval = exeMgrIntervalBak
})
proto.MaxConcurrentTask = *maxConcurrentTask
scheduler.CheckTaskFinishedInterval = *schedulerInterval
taskexecutor.TaskCheckInterval = *taskExecutorMgrInterval

b.Logf("max concurrent task: %d", proto.MaxConcurrentTask)
maxConcurrentTaskValue := proto.GetMaxConcurrentTask()
b.Logf("max concurrent task: %d", maxConcurrentTaskValue)
b.Logf("taks wait duration: %s", *waitDuration)
b.Logf("task meta size: %d", *taskMetaSize)
b.Logf("scheduler interval: %s", scheduler.CheckTaskFinishedInterval)
b.Logf("task executor mgr interval: %s", taskexecutor.TaskCheckInterval)

prepareForBenchTest(b)
c := testutil.NewTestDXFContext(b, 1, 2*proto.MaxConcurrentTask, false)
c := testutil.NewTestDXFContext(b, 1, 2*maxConcurrentTaskValue, false)

registerTaskTypeForBench(c)

if *noTask {
time.Sleep(*waitDuration)
} else {
// in this test, we will start 4*proto.MaxConcurrentTask tasks, but only
// proto.MaxConcurrentTask will be scheduled at the same time, for other
// in this test, we will start 4*maxConcurrentTaskValue tasks, but only
// maxConcurrentTaskValue will be scheduled at the same time, for other
// tasks will be in queue only to check the performance of querying them.
for i := range 4 * proto.MaxConcurrentTask {
for i := range 4 * maxConcurrentTaskValue {
taskKey := fmt.Sprintf("task-%03d", i)
taskMeta := make([]byte, *taskMetaSize)
_, err := handle.SubmitTask(c.Ctx, taskKey, proto.TaskTypeExample, c.Store.GetKeyspace(), 1, "", 0, taskMeta)
require.NoError(c.T, err)
}
// task has 2 steps, each step has 1 subtask,wait in serial to reduce WaitTask check overhead.
// only wait first proto.MaxConcurrentTask and exit
// only wait first maxConcurrentTaskValue and exit
time.Sleep(2 * *waitDuration)
for i := range proto.MaxConcurrentTask {
for i := range maxConcurrentTaskValue {
taskKey := fmt.Sprintf("task-%03d", i)
testutil.WaitTaskDoneOrPaused(c.Ctx, c.T, taskKey)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dxf/framework/proto/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ go_test(
],
embed = [":proto"],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = ["@com_github_stretchr_testify//require"],
)
49 changes: 46 additions & 3 deletions pkg/dxf/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package proto
import (
"cmp"
"fmt"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -83,9 +84,51 @@ const (
NormalPriority = 512
)

// MaxConcurrentTask is the max concurrency of task.
// TODO: remove this limit later.
var MaxConcurrentTask = 16
const (
Comment thread
D3Hunter marked this conversation as resolved.
// maxConcurrentTaskLowerBound is the minimum allowed DXF task concurrency.
maxConcurrentTaskLowerBound = 16
// MaxConcurrentTaskUpperBound is the current safety cap for DXF task concurrency.
// TODO: remove this cap after the DXF scheduler no longer runs all schedulers on the owner node.
MaxConcurrentTaskUpperBound = 1000
// DefaultMaxConcurrentTask is the default DXF task concurrency.
DefaultMaxConcurrentTask = maxConcurrentTaskLowerBound
)

// maxConcurrentTask is an owner-local emergency tuning knob for DXF scheduling.
// It is intentionally kept in memory only: it is not persisted to TiKV, is reset
// on restart, and only affects the TiDB node that receives the update. Operators
// should change it through the DXF owner node when many small tasks are blocked
// by the default limit. Raising it increases scheduler overhead and memory usage
// on the owner, so the owner node may need a larger resource spec first.
var maxConcurrentTask atomic.Int64

func init() {
maxConcurrentTask.Store(DefaultMaxConcurrentTask)
}

// GetMaxConcurrentTask returns the max concurrency of task.
func GetMaxConcurrentTask() int {
return int(maxConcurrentTask.Load())
}

// SetMaxConcurrentTask updates the max concurrency of task.
func SetMaxConcurrentTask(value int) error {
if value < maxConcurrentTaskLowerBound || value > MaxConcurrentTaskUpperBound {
return fmt.Errorf("max_concurrent_task %d is out of range [%d, %d]",
value, maxConcurrentTaskLowerBound, MaxConcurrentTaskUpperBound)
}
maxConcurrentTask.Store(int64(value))
return nil
}

// SetMaxConcurrentTaskForTest updates the max concurrency of task and returns a restore function.
func SetMaxConcurrentTaskForTest(value int) func() {
Comment thread
D3Hunter marked this conversation as resolved.
old := GetMaxConcurrentTask()
maxConcurrentTask.Store(int64(value))
return func() {
maxConcurrentTask.Store(int64(old))
}
}

// ExtraParams is the extra params of task.
// Note: only store params that's not used for filter or sort in this struct.
Expand Down
17 changes: 17 additions & 0 deletions pkg/dxf/framework/proto/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,23 @@ func TestTaskIsDone(t *testing.T) {
}
}

func TestMaxConcurrentTask(t *testing.T) {
restore := SetMaxConcurrentTaskForTest(DefaultMaxConcurrentTask)
defer restore()

require.Equal(t, DefaultMaxConcurrentTask, GetMaxConcurrentTask())
require.Equal(t, 1000, MaxConcurrentTaskUpperBound)
for _, value := range []int{maxConcurrentTaskLowerBound - 1, MaxConcurrentTaskUpperBound + 1} {
require.Error(t, SetMaxConcurrentTask(value))
require.Equal(t, DefaultMaxConcurrentTask, GetMaxConcurrentTask())
}

require.NoError(t, SetMaxConcurrentTask(128))
require.Equal(t, 128, GetMaxConcurrentTask())
require.NoError(t, SetMaxConcurrentTask(MaxConcurrentTaskUpperBound))
require.Equal(t, MaxConcurrentTaskUpperBound, GetMaxConcurrentTask())
}

func TestTaskCompare(t *testing.T) {
taskA := Task{TaskBase: TaskBase{
ID: 100,
Expand Down
2 changes: 1 addition & 1 deletion pkg/dxf/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// TaskManager defines the interface to access task table.
type TaskManager interface {
// GetTopUnfinishedTasks returns unfinished tasks, limited by MaxConcurrentTask*2,
// GetTopUnfinishedTasks returns unfinished tasks, limited by GetMaxConcurrentTask()*2,
// to make sure low ranking tasks can be scheduled if resource is enough.
// The returned tasks are sorted by task order, see proto.Task.
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error)
Expand Down
12 changes: 8 additions & 4 deletions pkg/dxf/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,11 @@ func NewManager(ctx context.Context, store kv.Storage, taskMgr TaskManager, serv
slotMgr: slotMgr,
serverID: serverID,
}),
logger: logger,
finishCh: make(chan struct{}, proto.MaxConcurrentTask),
logger: logger,
// finishCh must be able to buffer finish signals for the largest runtime
// value of maxConcurrentTask. Otherwise, raising the limit after startup
// can make non-blocking sends drop signals until the periodic cleanup loop runs.
finishCh: make(chan struct{}, proto.MaxConcurrentTaskUpperBound),
nodeRes: nodeRes,
}
schedulerManager.mu.schedulerMap = make(map[int64]Scheduler)
Expand Down Expand Up @@ -244,7 +247,8 @@ func (sm *Manager) getSchedulableTasks(ctx context.Context) ([]*proto.TaskBase,
defer r.End()
getTasksFn := sm.taskMgr.GetTopUnfinishedTasks
taskCnt := sm.getSchedulerCount()
if taskCnt >= proto.MaxConcurrentTask {
maxConcurrentTask := proto.GetMaxConcurrentTask()
if taskCnt >= maxConcurrentTask {
// when we have reached the limit of concurrent tasks, we only handle
// tasks in states that don't need resources, e.g. reverting/cancelling/
// pausing/modifying.
Expand Down Expand Up @@ -291,7 +295,7 @@ func (sm *Manager) startSchedulers(schedulableTasks []*proto.TaskBase) error {
switch task.State {
case proto.TaskStatePending, proto.TaskStateRunning, proto.TaskStateResuming:
taskCnt := sm.getSchedulerCount()
if taskCnt >= proto.MaxConcurrentTask {
if taskCnt >= proto.GetMaxConcurrentTask() {
continue
}
reservedExecID, ok = sm.slotMgr.canReserve(task)
Expand Down
6 changes: 1 addition & 5 deletions pkg/dxf/framework/scheduler/scheduler_manager_nokit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,7 @@ func TestStartSchedulerCrossKeyspaceRuntime(t *testing.T) {
}

func TestFastRespondNoNeedResourceTaskWhenSchedulersReachLimit(t *testing.T) {
bak := proto.MaxConcurrentTask
t.Cleanup(func() {
proto.MaxConcurrentTask = bak
})
proto.MaxConcurrentTask = 1
t.Cleanup(proto.SetMaxConcurrentTaskForTest(1))

ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
10 changes: 3 additions & 7 deletions pkg/dxf/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/domain/MockDisableDistTask", "return(true)")
// test scheduleTaskLoop
// test parallelism control
var originalConcurrency int
restoreMaxConcurrentTask := func() {}
if taskCnt == 1 {
originalConcurrency = proto.MaxConcurrentTask
proto.MaxConcurrentTask = 1
restoreMaxConcurrentTask = proto.SetMaxConcurrentTaskForTest(1)
}

store := testkit.CreateMockStore(t)
Expand All @@ -190,10 +189,7 @@ func checkSchedule(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
sch.Start()
defer func() {
sch.Stop()
// make data race happy
if taskCnt == 1 {
proto.MaxConcurrentTask = originalConcurrency
}
restoreMaxConcurrentTask()
}()

// 3s
Expand Down
15 changes: 7 additions & 8 deletions pkg/dxf/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,11 +485,7 @@ func TestSwitchTaskStepInBatch(t *testing.T) {
func TestGetTopUnfinishedTasks(t *testing.T) {
_, gm, ctx := testutil.InitTableTest(t)

bak := proto.MaxConcurrentTask
t.Cleanup(func() {
proto.MaxConcurrentTask = bak
})
proto.MaxConcurrentTask = 4
t.Cleanup(proto.SetMaxConcurrentTaskForTest(4))
require.NoError(t, gm.InitMeta(ctx, ":4000", ""))
taskStates := []proto.TaskState{
proto.TaskStateSucceed,
Expand Down Expand Up @@ -554,21 +550,24 @@ func TestGetTopUnfinishedTasks(t *testing.T) {
require.Len(t, tasks, 8)
require.Equal(t, []string{"key/6", "key/5", "key/1", "key/2", "key/3", "key/4", "key/8", "key/9"}, getTaskKeys(tasks))

proto.MaxConcurrentTask = 6
restoreMaxConcurrentTask := proto.SetMaxConcurrentTaskForTest(6)
tasks, err = gm.GetTopUnfinishedTasks(ctx)
require.NoError(t, err)
require.Len(t, tasks, 11)
require.Equal(t, []string{"key/6", "key/5", "key/1", "key/2", "key/3", "key/4", "key/8", "key/9", "key/10", "key/11", "key/12"}, getTaskKeys(tasks))
restoreMaxConcurrentTask()

proto.MaxConcurrentTask = 3
restoreMaxConcurrentTask = proto.SetMaxConcurrentTaskForTest(3)
tasks, err = gm.GetTopNoNeedResourceTasks(ctx)
require.NoError(t, err)
require.Equal(t, []string{"key/5", "key/3", "key/4", "key/12"}, getTaskKeys(tasks))
restoreMaxConcurrentTask()

proto.MaxConcurrentTask = 1
restoreMaxConcurrentTask = proto.SetMaxConcurrentTaskForTest(1)
tasks, err = gm.GetTopNoNeedResourceTasks(ctx)
require.NoError(t, err)
require.Equal(t, []string{"key/5", "key/3"}, getTaskKeys(tasks))
restoreMaxConcurrentTask()
}

func TestGetUsedSlotsOnNodesAndBusyNodes(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/dxf/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func (mgr *TaskManager) getTopTasks(ctx context.Context, states ...proto.TaskSta
for _, s := range states {
args = append(args, s)
}
args = append(args, proto.MaxConcurrentTask*2)
args = append(args, proto.GetMaxConcurrentTask()*2)
rs, err := mgr.ExecuteSQLWithNewSession(ctx, sql, args...)
if err != nil {
return nil, err
Expand Down
49 changes: 49 additions & 0 deletions pkg/server/handler/tests/dxf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,55 @@ func TestDXFAPI(t *testing.T) {
require.EqualValues(t, 2, out.PerKeyspace["ks1"])
})

t.Run("max concurrent task api", func(t *testing.T) {
restore := proto.SetMaxConcurrentTaskForTest(proto.DefaultMaxConcurrentTask)
defer restore()
const maxConcurrentTaskPath = "/dxf/schedule/max_concurrent_task"
checkMaxConcurrentTaskOutput := func(body []byte, expected int) {
out := struct {
MaxConcurrentTask int `json:"max_concurrent_task"`
Persistence string `json:"persistence"`
}{}
require.NoError(t, json.Unmarshal(body, &out))
require.Equal(t, expected, out.MaxConcurrentTask)
require.Equal(t, "memory_only", out.Persistence)
require.NotContains(t, string(body), "scope")
}

runAndCheckReqFn(t, http.StatusBadRequest, "This api only support GET and POST method", func() (*http.Response, error) {
req, err := http.NewRequest(http.MethodDelete, ts.StatusURL(maxConcurrentTaskPath), nil)
require.NoError(t, err)
return http.DefaultClient.Do(req)
})
for _, c := range [][2]string{
{maxConcurrentTaskPath, "invalid value "},
{maxConcurrentTaskPath + "?value=aa", "invalid value "},
{maxConcurrentTaskPath + "?value=15", "out of range"},
{fmt.Sprintf("%s?value=%d", maxConcurrentTaskPath, proto.MaxConcurrentTaskUpperBound+1), "out of range"},
} {
path, errMsg := c[0], c[1]
runAndCheckReqFn(t, http.StatusBadRequest, errMsg, func() (*http.Response, error) {
return ts.PostStatus(path, "", bytes.NewBuffer([]byte("")))
})
}

body := runAndCheckReqFn(t, http.StatusOK, "", func() (*http.Response, error) {
return ts.FetchStatus(maxConcurrentTaskPath)
})
checkMaxConcurrentTaskOutput(body, proto.DefaultMaxConcurrentTask)

body = runAndCheckReqFn(t, http.StatusOK, "", func() (*http.Response, error) {
return ts.PostStatus(maxConcurrentTaskPath+"?value=128", "", bytes.NewBuffer([]byte("")))
})
checkMaxConcurrentTaskOutput(body, 128)
require.Equal(t, 128, proto.GetMaxConcurrentTask())

body = runAndCheckReqFn(t, http.StatusOK, "", func() (*http.Response, error) {
return ts.FetchStatus(maxConcurrentTaskPath)
})
checkMaxConcurrentTaskOutput(body, 128)
})

t.Run("task history api", func(t *testing.T) {
seedHistoryTasks := func(t *testing.T) []int64 {
t.Helper()
Expand Down
42 changes: 42 additions & 0 deletions pkg/server/handler/tikvhandler/dxf.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,48 @@ func (h *DXFScheduleTuneHandler) ServeHTTP(w http.ResponseWriter, req *http.Requ
}
}

// DXFTaskMaxConcurrentHandler handles the in-memory DXF task concurrency limit.
type DXFTaskMaxConcurrentHandler struct{}

// NewDXFTaskMaxConcurrentHandler creates a new DXFTaskMaxConcurrentHandler.
func NewDXFTaskMaxConcurrentHandler() *DXFTaskMaxConcurrentHandler {
return &DXFTaskMaxConcurrentHandler{}
}

// ServeHTTP implements http.Handler interface.
//
// The configured value is local to the TiDB process that handles the request
// and is kept in memory only. Send the request to the current DXF owner when
// tuning scheduler concurrency.
func (*DXFTaskMaxConcurrentHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
Comment thread
D3Hunter marked this conversation as resolved.
switch req.Method {
case http.MethodGet:
writeMaxConcurrentTask(w)
case http.MethodPost:
valueStr := req.FormValue("value")
value, err := strconv.Atoi(valueStr)
if err != nil {
handler.WriteError(w, errors.Errorf("invalid value %s, error %v", valueStr, err))
return
}
if err := proto.SetMaxConcurrentTask(value); err != nil {
Comment thread
D3Hunter marked this conversation as resolved.
handler.WriteError(w, err)
return
}
logutil.BgLogger().Info("set in-memory DXF max concurrent task", zap.Int("maxConcurrentTask", value))
writeMaxConcurrentTask(w)
default:
handler.WriteError(w, errors.Errorf("This api only support GET and POST method"))
}
}

func writeMaxConcurrentTask(w http.ResponseWriter) {
handler.WriteData(w, map[string]any{
"max_concurrent_task": proto.GetMaxConcurrentTask(),
"persistence": "memory_only",
})
}

// DXFTaskMaxRuntimeSlotsHandler handles changing max runtime slots of DXF task.
type DXFTaskMaxRuntimeSlotsHandler struct{}

Expand Down
Loading
Loading