Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/uncaught-exception-fail-attempt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Fail attempts on uncaught exceptions instead of hanging to `MAX_DURATION_EXCEEDED`. A Node `EventEmitter` (e.g. `node-redis`) emitting `"error"` with no `.on("error", ...)` listener escalates to `uncaughtException`, which the worker previously reported but did not act on — runs drifted to maxDuration with empty attempts. They now fail fast with the original error and status `FAILED`, and respect the task's normal retry policy. You should still attach `.on("error", ...)` listeners to long-lived clients to handle errors gracefully.
12 changes: 12 additions & 0 deletions .server-changes/uncaught-exception-status-mapping.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
area: run-engine
type: fix
---

Map the new `TASK_RUN_UNCAUGHT_EXCEPTION` internal-error code to
`COMPLETED_WITH_ERRORS` (Failed) status in `runStatusFromError`. cli-v3
now emits this code when the worker process surfaces an uncaught
exception (e.g. a Node EventEmitter emitting `"error"` with no listener),
so the run renders as a regular task failure in the dashboard rather
than a system failure, while still routing through the engine's
`lockedRetryConfig` lookup so the user's retry policy is honoured.
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export function runStatusFromError(
case "TASK_INPUT_ERROR":
case "TASK_OUTPUT_ERROR":
case "TASK_MIDDLEWARE_ERROR":
case "TASK_RUN_UNCAUGHT_EXCEPTION":
return "COMPLETED_WITH_ERRORS";
case "TASK_RUN_CANCELLED":
return "CANCELED";
Expand Down
36 changes: 35 additions & 1 deletion packages/cli-v3/src/executions/taskRunProcess.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TaskRunProcess, type TaskRunProcessOptions } from "./taskRunProcess.js";
import { describe, it, expect, vi } from "vitest";
import { UnexpectedExitError } from "@trigger.dev/core/v3/errors";
import { UncaughtExceptionError, UnexpectedExitError } from "@trigger.dev/core/v3/errors";
import type {
TaskRunExecution,
TaskRunExecutionPayload,
Expand Down Expand Up @@ -118,4 +118,38 @@ describe("TaskRunProcess", () => {
}
});
});

describe("parseExecuteError(UncaughtExceptionError)", () => {
it("returns INTERNAL_ERROR with TASK_RUN_UNCAUGHT_EXCEPTION + original message and stack", () => {
const error = new UncaughtExceptionError(
{
name: "Error",
message: "read ECONNRESET",
stack:
"Error: read ECONNRESET\n at TCP.onStreamRead (node:internal/stream_base_commons:216:20)",
},
"uncaughtException"
);

const result = TaskRunProcess.parseExecuteError(error);

expect(result.type).toBe("INTERNAL_ERROR");
expect(result.code).toBe("TASK_RUN_UNCAUGHT_EXCEPTION");
expect(result.message).toBe("read ECONNRESET");
expect(result.stackTrace).toContain("TCP.onStreamRead");
});

it("uses the same code for unhandledRejection origin", () => {
const error = new UncaughtExceptionError(
{ name: "TypeError", message: "boom" },
"unhandledRejection"
);

const result = TaskRunProcess.parseExecuteError(error);

expect(result.type).toBe("INTERNAL_ERROR");
expect(result.code).toBe("TASK_RUN_UNCAUGHT_EXCEPTION");
expect(result.message).toBe("boom");
});
});
});
45 changes: 45 additions & 0 deletions packages/cli-v3/src/executions/taskRunProcess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
MaxDurationExceededError,
UnexpectedExitError,
SuspendedProcessError,
UncaughtExceptionError,
} from "@trigger.dev/core/v3/errors";

export type OnSendDebugLogMessage = InferSocketMessageSchema<
Expand Down Expand Up @@ -205,6 +206,18 @@ export class TaskRunProcess {
},
UNCAUGHT_EXCEPTION: async (message) => {
logger.debug("uncaught exception in task run process", { ...message });

// The worker process reports uncaught exceptions and unhandled rejections via this
// event, but does not exit on its own. If we don't terminate the attempt here, run()
// hangs (the awaited promise that triggered the throw is orphaned) until maxDuration
// expires — surfacing as TIMED_OUT/MAX_DURATION_EXCEEDED with empty attempts. Reject
// any pending attempts now and gracefully terminate the worker so OTEL gets a flush
// window before SIGKILL.
this.#rejectPendingAttempts(
new UncaughtExceptionError(message.error, message.origin)
);

await this.#gracefullyTerminate(this.options.gracefulTerminationTimeoutInMs);
Comment thread
matt-aitken marked this conversation as resolved.
},
SEND_DEBUG_LOG: async (message) => {
this.onSendDebugLog.post(message);
Expand Down Expand Up @@ -339,6 +352,23 @@ export class TaskRunProcess {
logger.debug("child process error", { error, pid: this.pid });
}

#rejectPendingAttempts(error: Error) {
for (const [id, status] of this._attemptStatuses.entries()) {
if (status !== "PENDING") {
continue;
}

this._attemptStatuses.set(id, "REJECTED");

const attemptPromise = this._attemptPromises.get(id);
if (!attemptPromise) {
continue;
}

attemptPromise.rejecter(error);
}
}

async #handleExit(code: number | null, signal: NodeJS.Signals | null) {
logger.debug("handling child exit", { code, signal, pid: this.pid });

Expand Down Expand Up @@ -559,6 +589,21 @@ export class TaskRunProcess {
};
}

if (error instanceof UncaughtExceptionError) {
// Dedicated INTERNAL_ERROR code so the engine handles retry via the
// existing crash-style lookup of run.lockedRetryConfig (same pathway as
// TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE etc.) and so the dashboard
// renders this as "Failed" rather than "System failure" — the exception
// was raised by user code (or a dependency the user controls, e.g. an
// EventEmitter "error" event with no listener), not a platform fault.
return {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.TASK_RUN_UNCAUGHT_EXCEPTION,
message: error.originalError.message,
stackTrace: error.originalError.stack,
};
}
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.

return {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.TASK_EXECUTION_FAILED,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/v3/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ export function shouldRetryError(error: TaskRunError): boolean {
case "TASK_EXECUTION_ABORTED":
case "TASK_EXECUTION_FAILED":
case "TASK_RUN_CRASHED":
case "TASK_RUN_UNCAUGHT_EXCEPTION":
case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE":
case "TASK_PROCESS_SIGTERM":
return true;
Expand Down Expand Up @@ -425,6 +426,7 @@ export function shouldLookupRetrySettings(error: TaskRunError): boolean {
case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE":
case "TASK_PROCESS_SIGTERM":
case "TASK_PROCESS_SIGSEGV":
case "TASK_RUN_UNCAUGHT_EXCEPTION":
return true;

default:
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export const TaskRunInternalError = z.object({
"GRACEFUL_EXIT_TIMEOUT",
"TASK_RUN_HEARTBEAT_TIMEOUT",
"TASK_RUN_CRASHED",
"TASK_RUN_UNCAUGHT_EXCEPTION",
"MAX_DURATION_EXCEEDED",
"DISK_SPACE_EXCEEDED",
"POD_EVICTED",
Expand Down
Loading