Commit 8c707e0

Harden job logging and timeout cleanup (#476)
## Summary

This PR hardens two failure paths that show up under resource pressure:

1. Per-job log files could be missing even though the job appeared in the TUI.
2. Jobs could appear to run well past `job_timeout_minutes` because the worker context timed out but the agent subprocess/pipe handling did not always unwind promptly.

The primary goal here is reliability, not changing policy. The existing default 30-minute timeout remains in place; this change makes that timeout and per-job logging more dependable.

## What can go wrong today

### Scenario 1: log file creation fails when the job starts

Per-job logging was best-effort. We attempted to open `~/.roborev/logs/jobs/<id>.log` once at job start. If that failed because of a transient filesystem or resource problem, roborev logged a warning and permanently disabled disk logging for that job.

On a resource-constrained machine, that can happen if:

- the machine is under RAM pressure and filesystem operations stall or fail transiently
- the data directory is temporarily unavailable or in a bad state
- writes start failing mid-job and never recover into a reopened log file

The visible result is a job in the TUI with no corresponding job log file, even though the job itself may finish.

### Scenario 2: the job timeout fires, but the review does not actually unwind

The worker already applies a hard timeout via `context.WithTimeout(...)`, defaulting to 30 minutes. The bug is that some agent subprocess paths could still remain stuck in process wait / pipe handling after the context deadline.

That means a user can observe a review job apparently "running" for longer than the configured timeout, especially when subprocesses are slow to terminate or inherited pipes remain open.

On low-resource machines this is more likely because:

- child processes can stall while under memory pressure
- cleanup can take longer
- stdout pipe / background process behavior can keep `Wait()` from returning promptly

## What this PR changes

### 1. Retryable job log writer

Replace the old one-shot `safeWriter` behavior with a retrying `jobLogWriter`:

- it keeps trying to open/reopen the per-job log file after transient failures
- it buffers a bounded amount of output in memory while disk logging is unavailable
- it flushes buffered content once logging becomes available again
- if the buffer overflows, it records how many bytes were dropped instead of silently disabling logging forever

This means a transient log-file open/write failure no longer dooms the entire job to have no disk log.

### 2. Centralized subprocess wait configuration and timeout cleanup

Add shared subprocess helpers so all agent adapters use the same wait-delay handling.

For streaming adapters, close the stdout pipe when the job context is done. This helps break cases where the context deadline has fired but the reader / wait path is still blocked behind lingering pipe state.

After `cmd.Wait()` completes, return `context.DeadlineExceeded` when the job context has expired instead of surfacing a generic subprocess error.

### 3. Explicit timeout handling in the worker

When an agent returns a deadline error, the worker now records a stable job error like:

`agent timeout after 30m0s`

and sends it through the normal retry/failover path. This makes timeout failures clearer in the database, TUI, and hooks, and avoids jobs looking like ambiguous generic agent failures.
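The retrying writer from change 1 can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not the PR's `jobLogWriter`: the names `retryLogWriter`, `openFunc`, `memLog`, and `flakyOpen` are hypothetical, the destination is an in-memory stand-in rather than a file under `~/.roborev/logs/jobs`, and the sketch is not safe for concurrent use (a real writer would presumably hold a mutex).

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// openFunc opens (or reopens) the log destination. Hypothetical type.
type openFunc func() (io.WriteCloser, error)

// retryLogWriter is an illustrative stand-in for a retrying job log writer.
type retryLogWriter struct {
	open    openFunc
	dst     io.WriteCloser
	pending []byte // output held while the destination is unavailable
	limit   int    // maximum number of bytes kept in pending
	dropped int    // bytes discarded since the last successful flush
}

func newRetryLogWriter(open openFunc, limit int) *retryLogWriter {
	return &retryLogWriter{open: open, limit: limit}
}

// Write never returns an error: a logging problem should not fail the job.
func (w *retryLogWriter) Write(p []byte) (int, error) {
	w.buffer(p)
	w.flush()
	return len(p), nil
}

// buffer appends p, keeps only the newest limit bytes, and counts the rest.
func (w *retryLogWriter) buffer(p []byte) {
	w.pending = append(w.pending, p...)
	if over := len(w.pending) - w.limit; over > 0 {
		w.dropped += over
		w.pending = append([]byte(nil), w.pending[over:]...)
	}
}

// flush tries to (re)open the destination and drain the pending output.
func (w *retryLogWriter) flush() {
	if len(w.pending) == 0 {
		return
	}
	if w.dst == nil {
		dst, err := w.open()
		if err != nil {
			return // still unavailable; the next Write retries
		}
		w.dst = dst
	}
	out := w.pending
	if w.dropped > 0 {
		note := fmt.Sprintf("[log writer dropped %d bytes]\n", w.dropped)
		out = append([]byte(note), out...)
	}
	if _, err := w.dst.Write(out); err != nil {
		// Treat the handle as bad and reopen later. A partial write may be
		// repeated on retry; this sketch accepts that.
		w.dst.Close()
		w.dst = nil
		return
	}
	w.pending = w.pending[:0]
	w.dropped = 0
}

// memLog is an in-memory destination used only for this demo.
type memLog struct{ strings.Builder }

func (m *memLog) Close() error { return nil }

// flakyOpen returns an opener whose first `failures` calls fail.
func flakyOpen(dst io.WriteCloser, failures int) openFunc {
	return func() (io.WriteCloser, error) {
		if failures > 0 {
			failures--
			return nil, errors.New("transient open failure")
		}
		return dst, nil
	}
}

func main() {
	sink := &memLog{}
	w := newRetryLogWriter(flakyOpen(sink, 1), 1024)
	w.Write([]byte("first\n"))  // open fails, line is buffered
	w.Write([]byte("second\n")) // open succeeds, both lines are flushed
	fmt.Print(sink.String())
}
```

The point of the design is that a failed open only delays output instead of disabling logging for the rest of the job, and the drop notice makes any overflow visible in the log itself.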
## Why this fixes the reported behavior

The reported symptoms were:

- not every displayed job had a log file
- some review jobs seemed to hang for over an hour
- the machine had limited RAM

This PR directly addresses those failure modes:

- transient resource issues no longer permanently disable per-job disk logging
- timed-out agent jobs are much more likely to unwind promptly instead of lingering past the configured timeout

## Validation

- `go fmt ./...`
- `go vet ./...`
- `go test ./...`

## Notes from local investigation

On the investigated machine there are more jobs in `reviews.db` than files in `~/.roborev/logs/jobs`, but the mismatch is historical rather than current: jobs from the last 7 days all had logs. The older missing logs line up with the existing log-retention/cleanup behavior and prior best-effort logging behavior.

This PR is aimed at preventing new gaps under transient resource pressure and making timeout enforcement behave consistently.
1 parent e294ff9 commit 8c707e0

16 files changed

Lines changed: 747 additions & 78 deletions

internal/agent/claude.go

Lines changed: 10 additions & 2 deletions
@@ -12,7 +12,6 @@ import (
 	"slices"
 	"strings"
 	"sync"
-	"time"
 )

 // ClaudeAgent runs code reviews using Claude Code CLI
@@ -135,7 +134,7 @@ func (a *ClaudeAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st

 	cmd := exec.CommandContext(ctx, a.Command, args...)
 	cmd.Dir = repoPath
-	cmd.WaitDelay = 5 * time.Second
+	tracker := configureSubprocess(cmd)

 	// Strip CLAUDECODE to prevent nested-session detection (#270),
 	// and handle API key (configured key or subscription auth).
@@ -153,6 +152,8 @@ func (a *ClaudeAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st
 	if err != nil {
 		return "", fmt.Errorf("create stdout pipe: %w", err)
 	}
+	stopClosingPipe := closeOnContextDone(ctx, stdoutPipe)
+	defer stopClosingPipe()
 	cmd.Stderr = &stderr

 	// Always pipe prompt via stdin (stream-json mode)
@@ -166,6 +167,9 @@ func (a *ClaudeAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st
 	result, err := parseStreamJSON(stdoutPipe, output)

 	if waitErr := cmd.Wait(); waitErr != nil {
+		if ctxErr := contextProcessError(ctx, tracker, waitErr, err); ctxErr != nil {
+			return "", ctxErr
+		}
 		// Build a detailed error including any partial output and stream errors
 		var detail strings.Builder
 		fmt.Fprintf(&detail, "%s failed", a.Name())
@@ -186,6 +190,10 @@ func (a *ClaudeAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st
 		return "", fmt.Errorf("%s: %w", detail.String(), waitErr)
 	}

+	if ctxErr := contextProcessError(ctx, tracker, nil, err); ctxErr != nil {
+		return "", ctxErr
+	}
+
 	if err != nil {
 		return "", err
 	}
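
The `closeOnContextDone(ctx, stdoutPipe)` call above returns a stop function that is deferred, which suggests a small watcher goroutine. The body of that helper is not part of this diff, so the following is only a sketch of one way such a helper could work. `closeWhenDone` and `demo` are hypothetical names, and an `io.Pipe` that nobody writes to stands in for a subprocess stdout pipe that stays open.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"
)

// closeWhenDone closes c once ctx is done. The returned stop function ends
// the watcher and waits for it to exit, so it is safe to defer.
// Hypothetical stand-in; the real helper's body is not shown in the diff.
func closeWhenDone(ctx context.Context, c io.Closer) (stop func()) {
	stopCh := make(chan struct{})
	finished := make(chan struct{})
	go func() {
		defer close(finished)
		select {
		case <-ctx.Done():
			_ = c.Close() // unblocks a reader stuck on the pipe
		case <-stopCh:
		}
	}()
	var once sync.Once
	return func() {
		once.Do(func() { close(stopCh) })
		<-finished
	}
}

// demo blocks on a pipe that is never written to and reports how the read
// ended once the context deadline closed the reader.
func demo() (readErr, ctxErr error) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	r, w := io.Pipe()
	defer w.Close()

	stop := closeWhenDone(ctx, r)
	defer stop()

	_, readErr = io.ReadAll(r) // would block forever without the watcher
	return readErr, ctx.Err()
}

func main() {
	readErr, ctxErr := demo()
	fmt.Println("read:", readErr)
	fmt.Println("ctx: ", ctxErr)
}
```

Without the watcher, the read would never return because the write end is never closed, which mirrors the lingering-pipe case described in the commit message.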

internal/agent/codex.go

Lines changed: 10 additions & 2 deletions
@@ -11,7 +11,6 @@ import (
 	"os/exec"
 	"strings"
 	"sync"
-	"time"
 )

 // CodexAgent runs code reviews using the Codex CLI
@@ -196,7 +195,7 @@ func (a *CodexAgent) Review(ctx context.Context, repoPath, commitSHA, prompt str

 	cmd := exec.CommandContext(ctx, a.Command, args...)
 	cmd.Dir = repoPath
-	cmd.WaitDelay = 5 * time.Second
+	tracker := configureSubprocess(cmd)

 	// Pipe prompt via stdin to avoid command line length limits on Windows.
 	// Windows has a ~32KB limit on command line arguments, which large diffs easily exceed.
@@ -210,6 +209,8 @@ func (a *CodexAgent) Review(ctx context.Context, repoPath, commitSHA, prompt str
 	if err != nil {
 		return "", fmt.Errorf("create stdout pipe: %w", err)
 	}
+	stopClosingPipe := closeOnContextDone(ctx, stdoutPipe)
+	defer stopClosingPipe()
 	// Tee stderr to output writer for live error visibility
 	if sw != nil {
 		cmd.Stderr = io.MultiWriter(&stderr, sw)
@@ -225,12 +226,19 @@ func (a *CodexAgent) Review(ctx context.Context, repoPath, commitSHA, prompt str
 	result, parseErr := a.parseStreamJSON(stdoutPipe, sw)

 	if waitErr := cmd.Wait(); waitErr != nil {
+		if ctxErr := contextProcessError(ctx, tracker, waitErr, parseErr); ctxErr != nil {
+			return "", ctxErr
+		}
 		if parseErr != nil {
 			return "", fmt.Errorf("codex failed: %w (parse error: %v)\nstderr: %s", waitErr, parseErr, stderr.String())
 		}
 		return "", fmt.Errorf("codex failed: %w\nstderr: %s", waitErr, stderr.String())
 	}

+	if ctxErr := contextProcessError(ctx, tracker, nil, parseErr); ctxErr != nil {
+		return "", ctxErr
+	}
+
 	if parseErr != nil {
 		if errors.Is(parseErr, errNoCodexJSON) {
 			return "", fmt.Errorf("codex CLI did not emit valid --json events; upgrade codex or check CLI compatibility: %w", errNoCodexJSON)

internal/agent/codex_test.go

Lines changed: 34 additions & 0 deletions
@@ -7,6 +7,7 @@ import (
 	"slices"
 	"strings"
 	"testing"
+	"time"
 )

 func setupMockCodex(t *testing.T, unsafe bool, opts MockCLIOpts) (*CodexAgent, *MockCLIResult) {
@@ -116,6 +117,39 @@ func TestCodexReviewAlwaysAddsAutoApprove(t *testing.T) {
 	}
 }

+func TestCodexReviewTimeoutClosesStdoutPipe(t *testing.T) {
+	skipIfWindows(t)
+
+	prevWaitDelay := subprocessWaitDelay
+	subprocessWaitDelay = 20 * time.Millisecond
+	t.Cleanup(func() {
+		subprocessWaitDelay = prevWaitDelay
+	})
+
+	cmdPath := writeTempCommand(t, `#!/bin/sh
+if [ "$1" = "--help" ]; then
+echo "usage `+codexAutoApproveFlag+`"
+exit 0
+fi
+(sleep 2) &
+printf '%s\n' '{"type":"item.completed","item":{"type":"agent_message","text":"partial"}}'
+exit 0
+`)
+
+	a := NewCodexAgent(cmdPath)
+	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	defer cancel()
+
+	start := time.Now()
+	_, err := a.Review(ctx, t.TempDir(), "deadbeef", "prompt", nil)
+	if !errors.Is(err, context.DeadlineExceeded) {
+		t.Fatalf("expected context deadline exceeded, got %v", err)
+	}
+	if elapsed := time.Since(start); elapsed > time.Second {
+		t.Fatalf("Review hung for %v after timeout", elapsed)
+	}
+}
+
 func TestCodexParseStreamJSON(t *testing.T) {
 	a := NewCodexAgent("codex")
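
The test above shortens a package-level `subprocessWaitDelay` and restores it in cleanup, which implies the shared helper reads that variable when it configures each command. A minimal sketch of that pattern follows, with assumptions stated plainly: `waitDelay` and `newAgentCmd` are hypothetical names, the 5 second default is inferred from the removed `cmd.WaitDelay = 5 * time.Second` lines, and the tracker returned by the real `configureSubprocess` is omitted because its shape is not visible in this diff.

```go
package main

import (
	"fmt"
	"os/exec"
	"time"
)

// waitDelay plays the role of the package-level subprocessWaitDelay.
// The default value is an assumption based on the removed per-agent lines.
var waitDelay = 5 * time.Second

// newAgentCmd builds a command and applies the shared wait delay, so every
// adapter gets the same bound on how long Wait lingers after the process
// exits or the context is cancelled with pipes still open.
func newAgentCmd(name string, args ...string) *exec.Cmd {
	cmd := exec.Command(name, args...)
	cmd.WaitDelay = waitDelay
	return cmd
}

func main() {
	// Shorten the delay the way the test does, then restore it.
	prev := waitDelay
	waitDelay = 20 * time.Millisecond
	defer func() { waitDelay = prev }()

	cmd := newAgentCmd("agent-cli") // constructed only; never started
	fmt.Println(cmd.WaitDelay)      // prints "20ms"
}
```

Keeping the delay in one variable is what lets a test drop it to milliseconds instead of waiting out the production value.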

internal/agent/copilot.go

Lines changed: 4 additions & 0 deletions
@@ -85,6 +85,7 @@ func (a *CopilotAgent) Review(ctx context.Context, repoPath, commitSHA, prompt s
 	cmd := exec.CommandContext(ctx, a.Command, args...)
 	cmd.Stdin = strings.NewReader(prompt)
 	cmd.Dir = repoPath
+	tracker := configureSubprocess(cmd)

 	var stdout, stderr bytes.Buffer
 	if sw := newSyncWriter(output); sw != nil {
@@ -96,6 +97,9 @@ func (a *CopilotAgent) Review(ctx context.Context, repoPath, commitSHA, prompt s
 	}

 	if err := cmd.Run(); err != nil {
+		if ctxErr := contextProcessError(ctx, tracker, err, nil); ctxErr != nil {
+			return "", ctxErr
+		}
 		return "", fmt.Errorf("copilot failed: %w\nstderr: %s", err, stderr.String())
 	}
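
Each adapter now checks `contextProcessError` before building its own error message. The helper's body is not in this part of the diff, so the sketch below only illustrates the idea stated in the commit message: prefer the context error when the job context has ended, so callers can match it with `errors.Is`. The names `deadlineError`, `describe`, and `demo` are hypothetical, and the tracker argument is left out because its type is not shown.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// deadlineError returns the context's error when ctx has ended, wrapping the
// subprocess error for diagnostics, and nil otherwise.
// Hypothetical simplification of the contextProcessError calls in the diff.
func deadlineError(ctx context.Context, procErr error) error {
	ctxErr := ctx.Err()
	if ctxErr == nil {
		return nil // context still live: let the adapter report procErr
	}
	if procErr == nil {
		return ctxErr
	}
	return fmt.Errorf("%w (process error: %v)", ctxErr, procErr)
}

// describe shows how a worker could turn a deadline error into the stable
// message quoted in the commit message.
func describe(err error, timeout time.Duration) string {
	if err == nil {
		return ""
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return fmt.Sprintf("agent timeout after %s", timeout)
	}
	return err.Error()
}

// demo simulates a job that outlived its deadline and was then killed.
func demo() string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	<-ctx.Done()

	err := deadlineError(ctx, errors.New("signal: killed"))
	return describe(err, 30*time.Minute)
}

func main() {
	fmt.Println(demo()) // prints "agent timeout after 30m0s"
}
```

The `30m0s` in the message is simply how `time.Duration` formats thirty minutes, which is why the recorded job error is stable for a given timeout setting.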

internal/agent/cursor.go

Lines changed: 10 additions & 2 deletions
@@ -8,7 +8,6 @@ import (
 	"os"
 	"os/exec"
 	"strings"
-	"time"
 )

 // CursorAgent runs code reviews using the Cursor agent CLI
@@ -112,14 +111,16 @@ func (a *CursorAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st
 	cmd := exec.CommandContext(ctx, a.Command, args...)
 	cmd.Dir = repoPath
 	cmd.Env = os.Environ()
-	cmd.WaitDelay = 5 * time.Second
+	tracker := configureSubprocess(cmd)
 	cmd.Stdin = strings.NewReader(prompt)

 	var stderr bytes.Buffer
 	stdoutPipe, err := cmd.StdoutPipe()
 	if err != nil {
 		return "", fmt.Errorf("create stdout pipe: %w", err)
 	}
+	stopClosingPipe := closeOnContextDone(ctx, stdoutPipe)
+	defer stopClosingPipe()
 	cmd.Stderr = &stderr

 	if err := cmd.Start(); err != nil {
@@ -130,12 +131,19 @@ func (a *CursorAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st
 	result, err := a.parseStreamJSON(stdoutPipe, output)

 	if waitErr := cmd.Wait(); waitErr != nil {
+		if ctxErr := contextProcessError(ctx, tracker, waitErr, err); ctxErr != nil {
+			return "", ctxErr
+		}
 		if err != nil {
 			return "", fmt.Errorf("cursor agent failed: %w (parse error: %v)\nstderr: %s", waitErr, err, stderr.String())
 		}
 		return "", fmt.Errorf("cursor agent failed: %w\nstderr: %s", waitErr, stderr.String())
 	}

+	if ctxErr := contextProcessError(ctx, tracker, nil, err); ctxErr != nil {
+		return "", ctxErr
+	}
+
 	if err != nil {
 		return "", err
 	}

internal/agent/droid.go

Lines changed: 4 additions & 0 deletions
@@ -104,6 +104,7 @@ func (a *DroidAgent) Review(ctx context.Context, repoPath, commitSHA, prompt str
 	cmd := exec.CommandContext(ctx, a.Command, args...)
 	cmd.Dir = repoPath
 	cmd.Stdin = strings.NewReader(prompt)
+	tracker := configureSubprocess(cmd)

 	var stdout, stderr bytes.Buffer
 	cmd.Stdout = &stdout
@@ -114,6 +115,9 @@ func (a *DroidAgent) Review(ctx context.Context, repoPath, commitSHA, prompt str
 	}

 	if err := cmd.Run(); err != nil {
+		if ctxErr := contextProcessError(ctx, tracker, err, nil); ctxErr != nil {
+			return "", ctxErr
+		}
 		return "", fmt.Errorf("droid failed: %w\nstderr: %s", err, stderr.String())
 	}
119123

internal/agent/gemini.go

Lines changed: 10 additions & 2 deletions
@@ -10,7 +10,6 @@ import (
 	"io"
 	"os/exec"
 	"strings"
-	"time"
 )

 // errNoStreamJSON indicates no valid stream-json events were parsed.
@@ -117,7 +116,7 @@ func (a *GeminiAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st

 	cmd := exec.CommandContext(ctx, a.Command, args...)
 	cmd.Dir = repoPath
-	cmd.WaitDelay = 5 * time.Second
+	tracker := configureSubprocess(cmd)

 	// Pipe prompt via stdin
 	cmd.Stdin = strings.NewReader(prompt)
@@ -130,6 +129,8 @@ func (a *GeminiAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st
 	if err != nil {
 		return "", fmt.Errorf("create stdout pipe: %w", err)
 	}
+	stopClosingPipe := closeOnContextDone(ctx, stdoutPipe)
+	defer stopClosingPipe()
 	// Tee stderr to output writer for live error visibility
 	if sw != nil {
 		cmd.Stderr = io.MultiWriter(&stderr, sw)
@@ -145,12 +146,19 @@ func (a *GeminiAgent) Review(ctx context.Context, repoPath, commitSHA, prompt st
 	parsed, parseErr := a.parseStreamJSON(stdoutPipe, sw)

 	if waitErr := cmd.Wait(); waitErr != nil {
+		if ctxErr := contextProcessError(ctx, tracker, waitErr, parseErr); ctxErr != nil {
+			return "", ctxErr
+		}
 		if parseErr != nil {
 			return "", fmt.Errorf("gemini failed: %w (parse error: %v)\nstderr: %s", waitErr, parseErr, truncateStderr(stderr.String()))
 		}
 		return "", fmt.Errorf("gemini failed: %w\nstderr: %s", waitErr, truncateStderr(stderr.String()))
 	}

+	if ctxErr := contextProcessError(ctx, tracker, nil, parseErr); ctxErr != nil {
+		return "", ctxErr
+	}
+
 	if parseErr != nil {
 		if errors.Is(parseErr, errNoStreamJSON) {
 			return "", fmt.Errorf("gemini CLI must support --output-format stream-json; upgrade to latest version\nstderr: %s: %w", truncateStderr(stderr.String()), errNoStreamJSON)

internal/agent/kilo.go

Lines changed: 10 additions & 0 deletions
@@ -106,6 +106,7 @@ func (a *KiloAgent) Review(
 	cmd := exec.CommandContext(ctx, a.Command, a.buildArgs()...)
 	cmd.Dir = repoPath
 	cmd.Stdin = strings.NewReader(prompt)
+	tracker := configureSubprocess(cmd)

 	sw := newSyncWriter(output)

@@ -120,6 +121,8 @@ func (a *KiloAgent) Review(
 	if err != nil {
 		return "", fmt.Errorf("create stdout pipe: %w", err)
 	}
+	stopClosingPipe := closeOnContextDone(ctx, stdoutPipe)
+	defer stopClosingPipe()

 	if err := cmd.Start(); err != nil {
 		return "", fmt.Errorf("start kilo: %w", err)
@@ -140,6 +143,9 @@ func (a *KiloAgent) Review(
 	_, _ = io.Copy(&stdoutRaw, stdoutPipe)

 	if waitErr := cmd.Wait(); waitErr != nil {
+		if ctxErr := contextProcessError(ctx, tracker, waitErr, parseErr); ctxErr != nil {
+			return "", ctxErr
+		}
 		var detail strings.Builder
 		fmt.Fprintf(&detail, "kilo failed")
 		if parseErr != nil {
@@ -165,6 +171,10 @@ func (a *KiloAgent) Review(
 		return "", fmt.Errorf("%s: %w", detail.String(), waitErr)
 	}

+	if ctxErr := contextProcessError(ctx, tracker, nil, parseErr); ctxErr != nil {
+		return "", ctxErr
+	}
+
 	if parseErr != nil {
 		return result, parseErr
 	}

internal/agent/kiro.go

Lines changed: 4 additions & 2 deletions
@@ -8,7 +8,6 @@ import (
 	"os"
 	"os/exec"
 	"strings"
-	"time"
 )

 // maxPromptArgLen is a conservative limit for passing prompts as
@@ -150,7 +149,7 @@ func (a *KiroAgent) Review(ctx context.Context, repoPath, commitSHA, prompt stri
 	cmd := exec.CommandContext(ctx, a.Command, args...)
 	cmd.Dir = repoPath
 	cmd.Env = os.Environ()
-	cmd.WaitDelay = 5 * time.Second
+	tracker := configureSubprocess(cmd)

 	// kiro-cli emits ANSI terminal escape codes that are not
 	// suitable for streaming. Capture and return stripped text.
@@ -159,6 +158,9 @@ func (a *KiroAgent) Review(ctx context.Context, repoPath, commitSHA, prompt stri
 	cmd.Stderr = &stderr

 	if err := cmd.Run(); err != nil {
+		if ctxErr := contextProcessError(ctx, tracker, err, nil); ctxErr != nil {
+			return "", ctxErr
+		}
 		return "", fmt.Errorf(
 			"kiro failed: %w\nstderr: %s",
 			err, stderr.String(),

internal/agent/opencode.go

Lines changed: 10 additions & 0 deletions
@@ -93,6 +93,7 @@ func (a *OpenCodeAgent) Review(
 	cmd := exec.CommandContext(ctx, a.Command, args...)
 	cmd.Dir = repoPath
 	cmd.Stdin = strings.NewReader(prompt)
+	tracker := configureSubprocess(cmd)

 	// Share a single syncWriter so stdout and stderr writes
 	// to the output writer are serialized by one mutex.
@@ -109,6 +110,8 @@ func (a *OpenCodeAgent) Review(
 	if err != nil {
 		return "", fmt.Errorf("create stdout pipe: %w", err)
 	}
+	stopClosingPipe := closeOnContextDone(ctx, stdoutPipe)
+	defer stopClosingPipe()

 	if err := cmd.Start(); err != nil {
 		return "", fmt.Errorf("start opencode: %w", err)
@@ -124,6 +127,9 @@ func (a *OpenCodeAgent) Review(
 	_, _ = io.Copy(io.Discard, stdoutPipe)

 	if waitErr := cmd.Wait(); waitErr != nil {
+		if ctxErr := contextProcessError(ctx, tracker, waitErr, parseErr); ctxErr != nil {
+			return "", ctxErr
+		}
 		var detail strings.Builder
 		fmt.Fprintf(&detail, "opencode failed")
 		if parseErr != nil {
@@ -142,6 +148,10 @@ func (a *OpenCodeAgent) Review(
 		return "", fmt.Errorf("%s: %w", detail.String(), waitErr)
 	}

+	if ctxErr := contextProcessError(ctx, tracker, nil, parseErr); ctxErr != nil {
+		return "", ctxErr
+	}
+
 	if parseErr != nil {
 		return result, parseErr
 	}
