Skip to content

Commit 212f853

Browse files
nicktrnsamejryadavshubham01tarunpsgithub-actions[bot]
authored
Automatically reattempt after internal errors (#1424)
* refactor finalize run service * refactor complete attempt service * remove separate graceful exit handling * refactor task status helpers * clearly separate statuses in prisma schema * all non-final statuses should be failable * new import payload error code * store default retry config if none set on task * failed run service now respects retries * fix merged task retry config indexing * some errors should never be retried * finalize run service takes care of acks now * execution payload helper now with single object arg * internal error code enum export * unify failed and crashed run retries * Prevent uncaught socket ack exceptions (#1415) * catch all the remaining socket acks that could possibly throw * wrap the remaining handlers in try catch * New onboarding question (#1404) * Updated “Twitter” to be “X (Twitter)” * added Textarea to storybook * Updated textarea styling to match input field * WIP adding new text field to org creation page * Added description to field * Submit feedback to Plain when an org signs up * Formatting improvement * type improvement * removed userId * Moved submitting to Plain into its own file * Change orgName with name * use sendToPlain function for the help & feedback email form * use name not orgName * import cleanup * Downgrading plan form uses sendToPlain * Get the userId from requireUser only * Added whitespace-pre-wrap to the message property on the run page * use requireUserId * Removed old Plain submit code * Added a new Context page for the docs (#1416) * Added a new context page with task context properties * Removed code comments * Added more crosslinks * Fix updating many environment variables at once (#1413) * Move code example to the side menu * New docs example for creating a HN email summary * doc: add instructions to create new reference project and run it locally (#1417) * doc: add instructions to create new reference project and run it locally * doc: Add instruction for running tunnel * minor language improvement * Fix several restore and resume bugs (#1418) * try to correct resume messages with missing checkpoint * prevent creating checkpoints for outdated task waits * prevent creating checkpoints for outdated batch waits * use heartbeats to check for and clean up any leftover containers * lint * improve exec logging * improve resume attempt logs * fix for resuming parents of canceled child runs * separate SIGTERM from maybe OOM errors * pretty errors can have magic dashboard links * prevent uncancellable checkpoints * simplify task run error code enum export * grab the last, not the first child run * Revert "prevent creating checkpoints for outdated batch waits" This reverts commit f2b5c2a. * Revert "grab the last, not the first child run" This reverts commit 89ec5c8. * Revert "prevent creating checkpoints for outdated task waits" This reverts commit 11066b4. * more logs for resume message handling * add magic error link comment * add changeset * chore: Update version for release (#1410) Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> * Release 3.0.13 * capture ffmpeg oom errors * respect maxAttempts=1 when failing before first attempt creation * request worker exit on fatal errors * fix error code merge * add new error code to should retry * pretty segfault errors * pretty internal errors for attempt spans * decrease oom false positives * fix timeline event color for failed runs * auto-retry packet import and export * add sdk version check and complete event while completing attempt * all internal errors become crashes by default * use pretty error helpers exclusively * error to debug log * zodfetch fixes * rename import payload to task input error * fix true non-zero exit error display * fix retry config parsing * correctly mark crashes as crashed * add changeset * remove non-zero exit comment * pretend we don't support default default retry configs yet --------- Co-authored-by: James Ritchie <[email protected]> Co-authored-by: shubham yadav <[email protected]> Co-authored-by: Tarun Pratap Singh <[email protected]> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 9a5e6e5 commit 212f853

File tree

23 files changed

+1111
-401
lines changed

23 files changed

+1111
-401
lines changed

.changeset/eight-turtles-itch.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
- Include retries.default in task retry config when indexing
7+
- New helpers for internal error retry mechanics
8+
- Detection for segfaults and ffmpeg OOM errors
9+
- Retries for packet import and export

apps/webapp/app/components/runs/v3/RunInspector.tsx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import {
4040
} from "~/utils/pathBuilder";
4141
import { TraceSpan } from "~/utils/taskEvent";
4242
import { SpanLink } from "~/v3/eventRepository.server";
43-
import { isFinalRunStatus } from "~/v3/taskStatus";
43+
import { isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
4444
import { RunTimelineEvent, RunTimelineLine } from "./InspectorTimeline";
4545
import { RunTag } from "./RunTag";
4646
import { TaskRunStatusCombo } from "./TaskRunStatus";
@@ -479,6 +479,7 @@ function RunTimeline({ run }: { run: RawRun }) {
479479
const updatedAt = new Date(run.updatedAt);
480480

481481
const isFinished = isFinalRunStatus(run.status);
482+
const isError = isFailedRunStatus(run.status);
482483

483484
return (
484485
<div className="min-w-fit max-w-80">
@@ -535,7 +536,7 @@ function RunTimeline({ run }: { run: RawRun }) {
535536
<RunTimelineEvent
536537
title="Finished"
537538
subtitle={<DateTimeAccurate date={updatedAt} />}
538-
state="complete"
539+
state={isError ? "error" : "complete"}
539540
/>
540541
</>
541542
) : (

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
88
import { eventRepository } from "~/v3/eventRepository.server";
99
import { machinePresetFromName } from "~/v3/machinePresets.server";
10-
import { FINAL_ATTEMPT_STATUSES, isFinalRunStatus } from "~/v3/taskStatus";
10+
import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
1111
import { BasePresenter } from "./basePresenter.server";
1212
import { getMaxDuration } from "~/v3/utils/maxDuration";
1313

@@ -294,6 +294,7 @@ export class SpanPresenter extends BasePresenter {
294294
usageDurationMs: run.usageDurationMs,
295295
isFinished,
296296
isRunning: RUNNING_STATUSES.includes(run.status),
297+
isError: isFailedRunStatus(run.status),
297298
payload,
298299
payloadType: run.payloadType,
299300
output,

apps/webapp/app/routes/api.v1.runs.$runParam.attempts.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ export async function action({ request, params }: ActionFunctionArgs) {
2929
const service = new CreateTaskRunAttemptService();
3030

3131
try {
32-
const { execution } = await service.call(runParam, authenticationResult.environment);
32+
const { execution } = await service.call({
33+
runId: runParam,
34+
authenticatedEnv: authenticationResult.environment,
35+
});
3336

3437
return json(execution, { status: 200 });
3538
} catch (error) {

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,7 @@ function RunTimeline({ run }: { run: SpanRun }) {
857857
<RunTimelineEvent
858858
title="Finished"
859859
subtitle={<DateTimeAccurate date={run.updatedAt} />}
860-
state="complete"
860+
state={run.isError ? "error" : "complete"}
861861
/>
862862
</>
863863
) : (
Lines changed: 248 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,41 @@
1-
import { sanitizeError, TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
1+
import {
2+
calculateNextRetryDelay,
3+
RetryOptions,
4+
TaskRunExecution,
5+
TaskRunExecutionRetry,
6+
TaskRunFailedExecutionResult,
7+
} from "@trigger.dev/core/v3";
28
import { logger } from "~/services/logger.server";
3-
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
49
import { BaseService } from "./services/baseService.server";
5-
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
6-
import { FAILABLE_RUN_STATUSES } from "./taskStatus";
10+
import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
11+
import type { Prisma, TaskRun } from "@trigger.dev/database";
12+
import { CompleteAttemptService } from "./services/completeAttempt.server";
13+
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
14+
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
15+
import * as semver from "semver";
16+
17+
const includeAttempts = {
18+
attempts: {
19+
orderBy: {
20+
createdAt: "desc",
21+
},
22+
take: 1,
23+
},
24+
lockedBy: true, // task
25+
lockedToVersion: true, // worker
26+
} satisfies Prisma.TaskRunInclude;
27+
28+
type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{
29+
include: typeof includeAttempts;
30+
}>;
731

832
export class FailedTaskRunService extends BaseService {
933
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
34+
logger.debug("[FailedTaskRunService] Handling failed task run", { anyRunId, completion });
35+
1036
const isFriendlyId = anyRunId.startsWith("run_");
1137

12-
const taskRun = await this._prisma.taskRun.findUnique({
38+
const taskRun = await this._prisma.taskRun.findFirst({
1339
where: {
1440
friendlyId: isFriendlyId ? anyRunId : undefined,
1541
id: !isFriendlyId ? anyRunId : undefined,
@@ -25,7 +51,7 @@ export class FailedTaskRunService extends BaseService {
2551
return;
2652
}
2753

28-
if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) {
54+
if (!isFailableRunStatus(taskRun.status)) {
2955
logger.error("[FailedTaskRunService] Task run is not in a failable state", {
3056
taskRun,
3157
completion,
@@ -34,33 +60,226 @@ export class FailedTaskRunService extends BaseService {
3460
return;
3561
}
3662

37-
// No more retries, we need to fail the task run
38-
logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion });
63+
const retryHelper = new FailedTaskRunRetryHelper(this._prisma);
64+
const retryResult = await retryHelper.call({
65+
runId: taskRun.id,
66+
completion,
67+
});
68+
69+
logger.debug("[FailedTaskRunService] Completion result", {
70+
runId: taskRun.id,
71+
result: retryResult,
72+
});
73+
}
74+
}
75+
76+
interface TaskRunWithWorker extends TaskRun {
77+
lockedBy: { retryConfig: Prisma.JsonValue } | null;
78+
lockedToVersion: { sdkVersion: string } | null;
79+
}
3980

40-
const finalizeService = new FinalizeTaskRunService();
41-
await finalizeService.call({
42-
id: taskRun.id,
43-
status: "SYSTEM_FAILURE",
44-
completedAt: new Date(),
45-
attemptStatus: "FAILED",
46-
error: sanitizeError(completion.error),
81+
export class FailedTaskRunRetryHelper extends BaseService {
82+
async call({
83+
runId,
84+
completion,
85+
isCrash,
86+
}: {
87+
runId: string;
88+
completion: TaskRunFailedExecutionResult;
89+
isCrash?: boolean;
90+
}) {
91+
const taskRun = await this._prisma.taskRun.findFirst({
92+
where: {
93+
id: runId,
94+
},
95+
include: includeAttempts,
4796
});
4897

49-
// Now we need to "complete" the task run event/span
50-
await eventRepository.completeEvent(taskRun.spanId, {
51-
endTime: new Date(),
52-
attributes: {
53-
isError: true,
98+
if (!taskRun) {
99+
logger.error("[FailedTaskRunRetryHelper] Task run not found", {
100+
runId,
101+
completion,
102+
});
103+
104+
return "NO_TASK_RUN";
105+
}
106+
107+
const retriableExecution = await this.#getRetriableAttemptExecution(taskRun, completion);
108+
109+
if (!retriableExecution) {
110+
return "NO_EXECUTION";
111+
}
112+
113+
logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion });
114+
115+
const executionRetry =
116+
completion.retry ??
117+
(await FailedTaskRunRetryHelper.getExecutionRetry({
118+
run: taskRun,
119+
execution: retriableExecution,
120+
}));
121+
122+
const completeAttempt = new CompleteAttemptService(this._prisma);
123+
const completeResult = await completeAttempt.call({
124+
completion: {
125+
...completion,
126+
retry: executionRetry,
54127
},
55-
events: [
56-
{
57-
name: "exception",
58-
time: new Date(),
59-
properties: {
60-
exception: createExceptionPropertiesFromError(completion.error),
61-
},
62-
},
63-
],
128+
execution: retriableExecution,
129+
isSystemFailure: !isCrash,
130+
isCrash,
64131
});
132+
133+
return completeResult;
134+
}
135+
136+
async #getRetriableAttemptExecution(
137+
run: TaskRunWithAttempts,
138+
completion: TaskRunFailedExecutionResult
139+
): Promise<TaskRunExecution | undefined> {
140+
let attempt = run.attempts[0];
141+
142+
// We need to create an attempt if:
143+
// - None exists yet
144+
// - The last attempt has a final status, e.g. we failed between attempts
145+
if (!attempt || isFinalAttemptStatus(attempt.status)) {
146+
logger.debug("[FailedTaskRunRetryHelper] No attempts found", {
147+
run,
148+
completion,
149+
});
150+
151+
const createAttempt = new CreateTaskRunAttemptService(this._prisma);
152+
153+
try {
154+
const { execution } = await createAttempt.call({
155+
runId: run.id,
156+
// This ensures we correctly respect `maxAttempts = 1` when failing before the first attempt was created
157+
startAtZero: true,
158+
});
159+
return execution;
160+
} catch (error) {
161+
logger.error("[FailedTaskRunRetryHelper] Failed to create attempt", {
162+
run,
163+
completion,
164+
error,
165+
});
166+
167+
return;
168+
}
169+
}
170+
171+
// We already have an attempt with non-final status, let's use it
172+
try {
173+
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
174+
id: attempt.id,
175+
skipStatusChecks: true,
176+
});
177+
178+
return executionPayload?.execution;
179+
} catch (error) {
180+
logger.error("[FailedTaskRunRetryHelper] Failed to get execution payload", {
181+
run,
182+
completion,
183+
error,
184+
});
185+
186+
return;
187+
}
188+
}
189+
190+
static async getExecutionRetry({
191+
run,
192+
execution,
193+
}: {
194+
run: TaskRunWithWorker;
195+
execution: TaskRunExecution;
196+
}): Promise<TaskRunExecutionRetry | undefined> {
197+
try {
198+
const retryConfig = run.lockedBy?.retryConfig;
199+
200+
if (!retryConfig) {
201+
if (!run.lockedToVersion) {
202+
logger.error("[FailedTaskRunRetryHelper] Run not locked to version", {
203+
run,
204+
execution,
205+
});
206+
207+
return;
208+
}
209+
210+
const sdkVersion = run.lockedToVersion.sdkVersion ?? "0.0.0";
211+
const isValid = semver.valid(sdkVersion);
212+
213+
if (!isValid) {
214+
logger.error("[FailedTaskRunRetryHelper] Invalid SDK version", {
215+
run,
216+
execution,
217+
});
218+
219+
return;
220+
}
221+
222+
// With older SDK versions, tasks only have a retry config stored in the DB if it's explicitly defined on the task itself
223+
// It won't get populated with retry.default in trigger.config.ts
224+
if (semver.lt(sdkVersion, FailedTaskRunRetryHelper.DEFAULT_RETRY_CONFIG_SINCE_VERSION)) {
225+
logger.warn(
226+
"[FailedTaskRunRetryHelper] SDK version not recent enough to determine retry config",
227+
{
228+
run,
229+
execution,
230+
}
231+
);
232+
233+
return;
234+
}
235+
}
236+
237+
const parsedRetryConfig = RetryOptions.nullable().safeParse(retryConfig);
238+
239+
if (!parsedRetryConfig.success) {
240+
logger.error("[FailedTaskRunRetryHelper] Invalid retry config", {
241+
run,
242+
execution,
243+
});
244+
245+
return;
246+
}
247+
248+
if (!parsedRetryConfig.data) {
249+
logger.debug("[FailedTaskRunRetryHelper] No retry config", {
250+
run,
251+
execution,
252+
});
253+
254+
return;
255+
}
256+
257+
const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);
258+
259+
if (!delay) {
260+
logger.debug("[FailedTaskRunRetryHelper] No more retries", {
261+
run,
262+
execution,
263+
});
264+
265+
return;
266+
}
267+
268+
return {
269+
timestamp: Date.now() + delay,
270+
delay,
271+
};
272+
} catch (error) {
273+
logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
274+
run,
275+
execution,
276+
error,
277+
});
278+
279+
return;
280+
}
65281
}
282+
283+
// TODO: update this to the correct version
284+
static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.0.14";
66285
}

apps/webapp/app/v3/handleSocketIo.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,11 @@ function createCoordinatorNamespace(io: Server) {
193193
}
194194

195195
const service = new CreateTaskRunAttemptService();
196-
const { attempt } = await service.call(message.runId, environment, false);
196+
const { attempt } = await service.call({
197+
runId: message.runId,
198+
authenticatedEnv: environment,
199+
setToExecuting: false,
200+
});
197201

198202
const payload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
199203
id: attempt.id,

0 commit comments

Comments
 (0)