Skip to content
28 changes: 26 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ The folder contents are:
config # (GHA only) matrix JSON with ci_node_index entries
```

DDTest chooses parallelism by estimating the runnable duration of each test file,
then trying worker counts between `--min-parallelism` and `--max-parallelism`.
Duration estimates come from Datadog test suite p50 timings when available and
fall back to local discovery weights otherwise. DDTest chooses the split with
the lowest slowest-worker time, then the most even load, so it avoids launching
workers that would sit idle or would not shorten the run.

DDTest may also write compatibility files at legacy root paths for existing
integrations. New integrations should read runner files from `.testoptimization/runner/*`.
You can use `runner/test-files.txt` or `runner/tests-split/runner-X` files to feed
Expand Down Expand Up @@ -152,20 +159,37 @@ In CI-node mode, DDTest uses one local worker by default so database and other p

DDTest automatically sets `DD_TEST_SESSION_NAME` for each worker to `<DD_SERVICE>-node-<nodeIndex>-worker-<workerIndex>` when the variable is not already set. If you set `DD_TEST_SESSION_NAME` yourself, DDTest preserves it and expands the same `{{nodeIndex}}` and `{{workerIndex}}` placeholders before starting each worker.

### Reports

DDTest prints a human-readable report to stderr after `ddtest plan` and `ddtest run`.
The plan report summarizes the run identity, Datadog feature settings, backend
data, planning quality, and selected split. `Test impact collection` is shown
from Datadog's `code_coverage` setting. The run report summarizes the worker,
file count, duration, and process result.

Reports are aggregate-only: DDTest does not print per-test or per-file lists,
and counts may show `disabled` or `not available` when a Datadog feature is off
or its backend payload is not present. To turn reports off:

```bash
DD_TEST_OPTIMIZATION_RUNNER_REPORT_ENABLED=false ddtest plan
```

### Settings (flags and environment variables)

| CLI flag | Environment variable | Default | What it does |
| ------------------- | --------------------------------------------- | ---------: | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `--platform` | `DD_TEST_OPTIMIZATION_RUNNER_PLATFORM` | `ruby` | Language/platform (currently supported values: `ruby`). |
| `--framework` | `DD_TEST_OPTIMIZATION_RUNNER_FRAMEWORK` | `rspec` | Test framework (currently supported values: `rspec`, `minitest`). |
| `--min-parallelism` | `DD_TEST_OPTIMIZATION_RUNNER_MIN_PARALLELISM` | physical CPU count | Minimum workers to use for the split. |
| `--max-parallelism` | `DD_TEST_OPTIMIZATION_RUNNER_MAX_PARALLELISM` | physical CPU count | Maximum workers to use for the split. |
| `--min-parallelism` | `DD_TEST_OPTIMIZATION_RUNNER_MIN_PARALLELISM` | physical CPU count | Minimum worker count DDTest considers when choosing the split. |
| `--max-parallelism` | `DD_TEST_OPTIMIZATION_RUNNER_MAX_PARALLELISM` | physical CPU count | Maximum worker count DDTest considers when choosing the split. |
| | `DD_TEST_OPTIMIZATION_RUNNER_CI_NODE` | `-1` (off) | Restrict this run to the slice assigned to node **N** (0-indexed). |
| `--ci-node-workers` | `DD_TEST_OPTIMIZATION_RUNNER_CI_NODE_WORKERS` | `1` | Number of parallel workers per CI node. Use a positive integer, or `ncpu` to use the node's available physical CPU cores. Tests assigned to a CI node are further split among this many local workers. |
| `--worker-env` | `DD_TEST_OPTIMIZATION_RUNNER_WORKER_ENV` | `""` | Template env vars per worker: `--worker-env "DATABASE_NAME_TEST=app_test{{nodeIndex}}_{{workerIndex}}"`. `{{nodeIndex}}` is the machine number (`0` for single-machine runs); `{{workerIndex}}` is the process number within that machine. |
| `--command` | `DD_TEST_OPTIMIZATION_RUNNER_COMMAND` | `""` | Override the default test command used by the framework. When provided, takes precedence over auto-detection (e.g., `--command "bundle exec custom-rspec"`). |
| `--tests-location` | `DD_TEST_OPTIMIZATION_RUNNER_TESTS_LOCATION` | `""` | Custom glob pattern to discover test files (e.g., `--tests-location "custom/spec/**/*_spec.rb"`). Defaults to `spec/**/*_spec.rb` for RSpec, `test/**/*_test.rb` for Minitest. |
| `--runtime-tags` | `DD_TEST_OPTIMIZATION_RUNNER_RUNTIME_TAGS` | `""` | JSON string to override runtime tags used to fetch skippable tests. Useful for local development on a different OS than CI (e.g., `--runtime-tags '{"os.platform":"linux","runtime.version":"3.2.0"}'`). |
| | `DD_TEST_OPTIMIZATION_RUNNER_REPORT_ENABLED` | `true` | Print human-readable plan and run reports. Set to `false` to disable them. |

#### Note about the `--command` flag

Expand Down
66 changes: 50 additions & 16 deletions internal/runner/ci_node_executor.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,84 @@
package runner

import (
"context"
"fmt"
"log/slog"
"os"

"github.com/DataDog/ddtest/internal/framework"
"golang.org/x/sync/errgroup"
)

// runCINodeTests executes tests for a specific CI node (one split, not the whole tests set).
// runCINode executes tests for a specific CI node (one split, not the whole tests set).
// It further splits the node's tests among local workers based on ci_node_workers setting.
func runCINodeTests(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string, ciNode int, ciNodeWorkers int, testFileWeights map[string]int) error {
runnerFilePath := runnerSplitPath(ciNode)
if _, err := os.Stat(runnerFilePath); os.IsNotExist(err) {
return fmt.Errorf("runner file for ci-node %d does not exist: %s", ciNode, runnerFilePath)
} else if err != nil {
return fmt.Errorf("failed to check runner file for ci-node %d at %s: %w", ciNode, runnerFilePath, err)
func (e testExecutor) runCINode(ciNode int, ciNodeWorkers int, testFileWeights map[string]int) runExecutionResult {
report := newCINodeExecutionReport(ciNode, ciNodeWorkers)
testFiles, err := loadCINodeTestFiles(ciNode)
if err != nil {
return report.failure(err)
}
report.TestFilesRun = len(testFiles)

if report.LocalWorkers <= 1 {
err = e.runCINodeSingleWorker(ciNode, testFiles)
} else {
err = e.runCINodeWorkers(ciNode, report.LocalWorkers, testFiles, testFileWeights)
}
if err != nil {
return report.failure(err)
}
return report.success()
}

// Single worker mode: run all tests with nodeIndex matching ciNode.
if ciNodeWorkers <= 1 {
slog.Info("Running tests for CI node in single-worker mode", "ciNode", ciNode, "nodeIndex", ciNode, "workerIndex", 0)
return runTestBatchFromFile(ctx, framework, runnerFilePath, workerEnvMap, ciNode, 0)
func newCINodeExecutionReport(ciNode int, ciNodeWorkers int) runExecutionReport {
if ciNodeWorkers <= 0 {
ciNodeWorkers = 1
}

return runExecutionReport{
Mode: runModeCINode,
CINode: ciNode,
LocalWorkers: ciNodeWorkers,
}
}

func loadCINodeTestFiles(ciNode int) ([]string, error) {
runnerFilePath := runnerSplitPath(ciNode)
testFiles, err := loadTestBatch(runnerFilePath)
if os.IsNotExist(err) {
return nil, fmt.Errorf("runner file for ci-node %d does not exist: %s", ciNode, runnerFilePath)
}
if err != nil {
return fmt.Errorf("failed to read test files for ci-node %d from %s: %w", ciNode, runnerFilePath, err)
return nil, fmt.Errorf("failed to read test files for ci-node %d from %s: %w", ciNode, runnerFilePath, err)
}
return testFiles, nil
}

func (e testExecutor) runCINodeSingleWorker(ciNode int, testFiles []string) error {
slog.Info("Running tests for CI node in single-worker mode", "ciNode", ciNode, "nodeIndex", ciNode, "workerIndex", 0)
if len(testFiles) == 0 {
slog.Info("No tests to run", "nodeIndex", ciNode, "workerIndex", 0)
return nil
}
return e.runBatch(testFiles, ciNode, 0)
}

func (e testExecutor) runCINodeWorkers(ciNode int, ciNodeWorkers int, testFiles []string, testFileWeights map[string]int) error {
if len(testFiles) == 0 {
slog.Info("No tests to run for CI node", "ciNode", ciNode)
return nil
}

// Multi-worker mode: split tests among local workers.
slog.Info("Running tests for CI node in parallel mode",
"ciNode", ciNode, "ciNodeWorkers", ciNodeWorkers, "testFilesCount", len(testFiles))

if testFileWeights == nil {
testFileWeights = map[string]int{}
}
groups := subsplitTestsBetweenWorkers(testFiles, ciNodeWorkers, testFileWeights)
return e.runCINodeWorkerGroups(ciNode, groups)
}

func (e testExecutor) runCINodeWorkerGroups(ciNode int, groups [][]string) error {
var g errgroup.Group
for workerIndex, groupFiles := range groups {
if len(groupFiles) == 0 {
Expand All @@ -57,7 +91,7 @@ func runCINodeTests(ctx context.Context, framework framework.Framework, workerEn
"testFiles", groupFiles)

g.Go(func() error {
return runTestBatch(ctx, framework, groupFiles, workerEnvMap, ciNode, workerIndex)
return e.runBatch(groupFiles, ciNode, workerIndex)
})
}

Expand Down
61 changes: 40 additions & 21 deletions internal/runner/ci_node_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/DataDog/ddtest/internal/constants"
)

func TestRunCINodeTests_SingleWorker(t *testing.T) {
func TestRunCINode_SingleWorker(t *testing.T) {
tempDir := t.TempDir()
oldWd, _ := os.Getwd()
defer func() { _ = os.Chdir(oldWd) }()
Expand All @@ -27,9 +27,13 @@ func TestRunCINodeTests_SingleWorker(t *testing.T) {
}

// Test with single worker (ciNodeWorkers=1)
err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 1, 1, nil)
result := newTestExecutor(context.Background(), mockFramework, map[string]string{}).runCINode(1, 1, nil)
report, err := result.report, result.err
if err != nil {
t.Fatalf("runCINodeTests() should not return error, got: %v", err)
t.Fatalf("runCINode() should not return error, got: %v", err)
}
if report.TestFilesRun != 2 {
t.Errorf("Expected report to count 2 test files, got %d", report.TestFilesRun)
}

// Verify RunTests was called exactly once
Expand All @@ -45,7 +49,7 @@ func TestRunCINodeTests_SingleWorker(t *testing.T) {
}
}

func TestRunCINodeTests_MultipleWorkers(t *testing.T) {
func TestRunCINode_MultipleWorkers(t *testing.T) {
tempDir := t.TempDir()
oldWd, _ := os.Getwd()
defer func() { _ = os.Chdir(oldWd) }()
Expand All @@ -63,9 +67,16 @@ func TestRunCINodeTests_MultipleWorkers(t *testing.T) {
}

// Test with 2 workers on ci-node 1
err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 1, 2, nil)
result := newTestExecutor(context.Background(), mockFramework, map[string]string{}).runCINode(1, 2, nil)
report, err := result.report, result.err
if err != nil {
t.Fatalf("runCINodeTests() should not return error, got: %v", err)
t.Fatalf("runCINode() should not return error, got: %v", err)
}
if report.LocalWorkers != 2 {
t.Errorf("Expected report to count 2 local workers, got %d", report.LocalWorkers)
}
if report.TestFilesRun != 4 {
t.Errorf("Expected report to count 4 test files, got %d", report.TestFilesRun)
}

// Verify RunTests was called twice (once per worker)
Expand Down Expand Up @@ -102,7 +113,7 @@ func TestRunCINodeTests_MultipleWorkers(t *testing.T) {
}
}

func TestRunCINodeTests_NodeIndexMatchesCINode(t *testing.T) {
func TestRunCINode_NodeIndexMatchesCINode(t *testing.T) {
tempDir := t.TempDir()
oldWd, _ := os.Getwd()
defer func() { _ = os.Chdir(oldWd) }()
Expand All @@ -124,9 +135,10 @@ func TestRunCINodeTests_NodeIndexMatchesCINode(t *testing.T) {
}

// Test with 2 workers on ci-node 1
err := runCINodeTests(context.Background(), mockFramework, workerEnvMap, 1, 2, nil)
result := newTestExecutor(context.Background(), mockFramework, workerEnvMap).runCINode(1, 2, nil)
err := result.err
if err != nil {
t.Fatalf("runCINodeTests() should not return error, got: %v", err)
t.Fatalf("runCINode() should not return error, got: %v", err)
}

// Verify RunTests was called twice
Expand Down Expand Up @@ -156,7 +168,7 @@ func TestRunCINodeTests_NodeIndexMatchesCINode(t *testing.T) {
}
}

func TestRunCINodeTests_SingleWorkerNodeIndex(t *testing.T) {
func TestRunCINode_SingleWorkerNodeIndex(t *testing.T) {
tempDir := t.TempDir()
oldWd, _ := os.Getwd()
defer func() { _ = os.Chdir(oldWd) }()
Expand All @@ -177,9 +189,10 @@ func TestRunCINodeTests_SingleWorkerNodeIndex(t *testing.T) {
"WORKER_INDEX": "{{workerIndex}}",
}

err := runCINodeTests(context.Background(), mockFramework, workerEnvMap, 2, 1, nil)
result := newTestExecutor(context.Background(), mockFramework, workerEnvMap).runCINode(2, 1, nil)
err := result.err
if err != nil {
t.Fatalf("runCINodeTests() should not return error, got: %v", err)
t.Fatalf("runCINode() should not return error, got: %v", err)
}

calls := mockFramework.GetRunTestsCalls()
Expand All @@ -195,7 +208,7 @@ func TestRunCINodeTests_SingleWorkerNodeIndex(t *testing.T) {
}
}

func TestRunCINodeTests_FileNotFound(t *testing.T) {
func TestRunCINode_FileNotFound(t *testing.T) {
tempDir := t.TempDir()
oldWd, _ := os.Getwd()
defer func() { _ = os.Chdir(oldWd) }()
Expand All @@ -206,9 +219,10 @@ func TestRunCINodeTests_FileNotFound(t *testing.T) {

mockFramework := &MockFramework{FrameworkName: "rspec"}

err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 2, 1, nil)
result := newTestExecutor(context.Background(), mockFramework, map[string]string{}).runCINode(2, 1, nil)
err := result.err
if err == nil {
t.Error("runCINodeTests() should return error when runner file doesn't exist")
t.Error("runCINode() should return error when runner file doesn't exist")
}

expectedMsg := "runner file for ci-node 2 does not exist"
Expand All @@ -217,7 +231,7 @@ func TestRunCINodeTests_FileNotFound(t *testing.T) {
}
}

func TestRunCINodeTests_DoesNotReadLegacySplitFile(t *testing.T) {
func TestRunCINode_DoesNotReadLegacySplitFile(t *testing.T) {
tempDir := t.TempDir()
oldWd, _ := os.Getwd()
defer func() { _ = os.Chdir(oldWd) }()
Expand All @@ -228,9 +242,10 @@ func TestRunCINodeTests_DoesNotReadLegacySplitFile(t *testing.T) {

mockFramework := &MockFramework{FrameworkName: "rspec"}

err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 2, 1, nil)
result := newTestExecutor(context.Background(), mockFramework, map[string]string{}).runCINode(2, 1, nil)
err := result.err
if err == nil {
t.Error("runCINodeTests() should return error when only the legacy runner file exists")
t.Error("runCINode() should return error when only the legacy runner file exists")
}

expectedMsg := "runner file for ci-node 2 does not exist"
Expand All @@ -239,7 +254,7 @@ func TestRunCINodeTests_DoesNotReadLegacySplitFile(t *testing.T) {
}
}

func TestRunCINodeTests_EmptyFile(t *testing.T) {
func TestRunCINode_EmptyFile(t *testing.T) {
tempDir := t.TempDir()
oldWd, _ := os.Getwd()
defer func() { _ = os.Chdir(oldWd) }()
Expand All @@ -252,9 +267,13 @@ func TestRunCINodeTests_EmptyFile(t *testing.T) {
mockFramework := &MockFramework{FrameworkName: "rspec"}

// Should not error for empty file, just not run any tests
err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 0, 2, nil)
result := newTestExecutor(context.Background(), mockFramework, map[string]string{}).runCINode(0, 2, nil)
report, err := result.report, result.err
if err != nil {
t.Fatalf("runCINodeTests() should not return error for empty file, got: %v", err)
t.Fatalf("runCINode() should not return error for empty file, got: %v", err)
}
if report.TestFilesRun != 0 {
t.Errorf("Expected report to count 0 test files, got %d", report.TestFilesRun)
}

// Verify no tests were run
Expand Down
Loading
Loading