Skip to content

Fixes for internal error reattempts #1436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions apps/kubernetes-provider/src/taskMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,10 @@ export class TaskMonitor {

let reason = rawReason || "Unknown error";
let logs = rawLogs || "";
let overrideCompletion = false;

/** This will only override existing task errors. It will not crash the run. */
let onlyOverrideExistingError = exitCode === EXIT_CODE_CHILD_NONZERO;

let errorCode: TaskRunInternalError["code"] = TaskRunErrorCodes.POD_UNKNOWN_ERROR;

switch (rawReason) {
Expand All @@ -185,10 +188,8 @@ export class TaskMonitor {
}
break;
case "OOMKilled":
overrideCompletion = true;
reason = `${
exitCode === EXIT_CODE_CHILD_NONZERO ? "Child process" : "Parent process"
} ran out of memory! Try choosing a machine preset with more memory for this task.`;
reason =
"[TaskMonitor] Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak.";
errorCode = TaskRunErrorCodes.TASK_PROCESS_OOM_KILLED;
break;
default:
Expand All @@ -199,7 +200,7 @@ export class TaskMonitor {
exitCode,
reason,
logs,
overrideCompletion,
overrideCompletion: onlyOverrideExistingError,
errorCode,
} satisfies FailureDetails;

Expand Down
28 changes: 23 additions & 5 deletions apps/webapp/app/components/runs/v3/SpanEvents.tsx
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { EnvelopeIcon } from "@heroicons/react/20/solid";
import {
exceptionEventEnhancer,
isExceptionSpanEvent,
type ExceptionEventProperties,
type SpanEvent as OtelSpanEvent,
} from "@trigger.dev/core/v3";
import { CodeBlock } from "~/components/code/CodeBlock";
import { Feedback } from "~/components/Feedback";
import { Button } from "~/components/primitives/Buttons";
import { Callout } from "~/components/primitives/Callout";
import { DateTimeAccurate } from "~/components/primitives/DateTime";
import { Header2, Header3 } from "~/components/primitives/Headers";
Expand Down Expand Up @@ -75,11 +78,26 @@ export function SpanEventError({
titleClassName="text-rose-500"
/>
{enhancedException.message && <Callout variant="error">{enhancedException.message}</Callout>}
{enhancedException.link && (
<Callout variant="docs" to={enhancedException.link.href}>
{enhancedException.link.name}
</Callout>
)}
{enhancedException.link &&
(enhancedException.link.magic === "CONTACT_FORM" ? (
<Feedback
button={
<Button
variant="tertiary/medium"
LeadingIcon={EnvelopeIcon}
leadingIconClassName="text-blue-400"
fullWidth
textAlignLeft
>
{enhancedException.link.name}
</Button>
}
/>
) : (
<Callout variant="docs" to={enhancedException.link.href}>
{enhancedException.link.name}
</Callout>
))}
{enhancedException.stacktrace && (
<CodeBlock
showCopyButton={false}
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/models/taskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type {
TaskRunFailedExecutionResult,
TaskRunSuccessfulExecutionResult,
} from "@trigger.dev/core/v3";
import { TaskRunError } from "@trigger.dev/core/v3";
import { TaskRunError, TaskRunErrorCodes } from "@trigger.dev/core/v3";

import type {
TaskRun,
Expand Down Expand Up @@ -62,7 +62,7 @@ export function executionResultForTaskRun(
id: taskRun.friendlyId,
error: {
type: "INTERNAL_ERROR",
code: "TASK_RUN_CANCELLED",
code: TaskRunErrorCodes.TASK_RUN_CANCELLED,
},
} satisfies TaskRunFailedExecutionResult;
}
Expand Down Expand Up @@ -94,7 +94,7 @@ export function executionResultForTaskRun(
id: taskRun.friendlyId,
error: {
type: "INTERNAL_ERROR",
code: "CONFIGURED_INCORRECTLY",
code: TaskRunErrorCodes.CONFIGURED_INCORRECTLY,
},
} satisfies TaskRunFailedExecutionResult;
}
Expand Down
23 changes: 7 additions & 16 deletions apps/webapp/app/v3/failedTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,15 @@ export class FailedTaskRunRetryHelper extends BaseService {

logger.debug("[FailedTaskRunRetryHelper] Completing attempt", { taskRun, completion });

const executionRetry =
completion.retry ??
(await FailedTaskRunRetryHelper.getExecutionRetry({
run: taskRun,
execution: retriableExecution,
}));

const completeAttempt = new CompleteAttemptService(this._prisma);
const completeResult = await completeAttempt.call({
completion: {
...completion,
retry: executionRetry,
},
execution: retriableExecution,
const completeAttempt = new CompleteAttemptService({
prisma: this._prisma,
isSystemFailure: !isCrash,
isCrash,
});
const completeResult = await completeAttempt.call({
completion,
execution: retriableExecution,
});

return completeResult;
}
Expand Down Expand Up @@ -280,6 +272,5 @@ export class FailedTaskRunRetryHelper extends BaseService {
}
}

// TODO: update this to the correct version
static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.0.14";
static DEFAULT_RETRY_CONFIG_SINCE_VERSION = "3.1.0";
}
18 changes: 11 additions & 7 deletions apps/webapp/app/v3/handleSocketIo.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { Redis } from "ioredis";
import { createAdapter } from "@socket.io/redis-adapter";
import { CrashTaskRunService } from "./services/crashTaskRun.server";
import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
import { UpdateFatalRunErrorService } from "./services/updateFatalRunError.server";

export const socketIo = singleton("socketIo", initalizeIoServer);

Expand Down Expand Up @@ -123,12 +124,13 @@ function createCoordinatorNamespace(io: Server) {
await resumeAttempt.call(message);
},
TASK_RUN_COMPLETED: async (message) => {
const completeAttempt = new CompleteAttemptService();
const completeAttempt = new CompleteAttemptService({
supportsRetryCheckpoints: message.version === "v1",
});
Comment on lines +127 to +129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Future-proof the version check in CompleteAttemptService instantiation.

The current check message.version === "v1" only enables supportsRetryCheckpoints for version "v1". This might lead to issues with future versions where this feature should be supported. Consider revising the condition to include all versions that support retry checkpoints, or default to true if appropriate.

Apply this diff to improve the version check:

-            supportsRetryCheckpoints: message.version === "v1",
+            supportsRetryCheckpoints: message.version !== "v0",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const completeAttempt = new CompleteAttemptService({
supportsRetryCheckpoints: message.version === "v1",
});
const completeAttempt = new CompleteAttemptService({
supportsRetryCheckpoints: message.version !== "v0",
});

await completeAttempt.call({
completion: message.completion,
execution: message.execution,
checkpoint: message.checkpoint,
supportsRetryCheckpoints: message.version === "v1",
});
},
TASK_RUN_FAILED_TO_RUN: async (message) => {
Expand Down Expand Up @@ -301,11 +303,13 @@ function createProviderNamespace(io: Server) {
handlers: {
WORKER_CRASHED: async (message) => {
try {
const service = new CrashTaskRunService();

await service.call(message.runId, {
...message,
});
if (message.overrideCompletion) {
const updateErrorService = new UpdateFatalRunErrorService();
await updateErrorService.call(message.runId, { ...message });
} else {
const crashRunService = new CrashTaskRunService();
await crashRunService.call(message.runId, { ...message });
}
} catch (error) {
logger.error("Error while handling crashed worker", { error });
}
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/v3/requeueTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { BaseService } from "./services/baseService.server";
import { PrismaClientOrTransaction } from "~/db.server";
import { workerQueue } from "~/services/worker.server";
import { socketIo } from "./handleSocketIo.server";
import { TaskRunErrorCodes } from "@trigger.dev/core/v3";

export class RequeueTaskRunService extends BaseService {
public async call(runId: string) {
Expand Down Expand Up @@ -59,7 +60,7 @@ export class RequeueTaskRunService extends BaseService {
retry: undefined,
error: {
type: "INTERNAL_ERROR",
code: "TASK_RUN_HEARTBEAT_TIMEOUT",
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
message: "Did not receive a heartbeat from the worker in time",
},
});
Expand Down
Loading
Loading