diff --git a/apps/kubernetes-provider/src/taskMonitor.ts b/apps/kubernetes-provider/src/taskMonitor.ts index 9d54b401ee..aadcef18d8 100644 --- a/apps/kubernetes-provider/src/taskMonitor.ts +++ b/apps/kubernetes-provider/src/taskMonitor.ts @@ -160,7 +160,10 @@ export class TaskMonitor { let reason = rawReason || "Unknown error"; let logs = rawLogs || ""; - let overrideCompletion = false; + + /** This will only override existing task errors. It will not crash the run. */ + let onlyOverrideExistingError = exitCode === EXIT_CODE_CHILD_NONZERO; + let errorCode: TaskRunInternalError["code"] = TaskRunErrorCodes.POD_UNKNOWN_ERROR; switch (rawReason) { @@ -185,10 +188,8 @@ export class TaskMonitor { } break; case "OOMKilled": - overrideCompletion = true; - reason = `${ - exitCode === EXIT_CODE_CHILD_NONZERO ? "Child process" : "Parent process" - } ran out of memory! Try choosing a machine preset with more memory for this task.`; + reason = + "[TaskMonitor] Your task ran out of memory. Try increasing the machine specs. If this doesn't fix it there might be a memory leak."; errorCode = TaskRunErrorCodes.TASK_PROCESS_OOM_KILLED; break; default: @@ -199,7 +200,7 @@ export class TaskMonitor { exitCode, reason, logs, - overrideCompletion, + overrideCompletion: onlyOverrideExistingError, errorCode, } satisfies FailureDetails; diff --git a/apps/webapp/app/components/runs/v3/SpanEvents.tsx b/apps/webapp/app/components/runs/v3/SpanEvents.tsx index c6135f3f9e..868ffde50b 100644 --- a/apps/webapp/app/components/runs/v3/SpanEvents.tsx +++ b/apps/webapp/app/components/runs/v3/SpanEvents.tsx @@ -1,3 +1,4 @@ +import { EnvelopeIcon } from "@heroicons/react/20/solid"; import { exceptionEventEnhancer, isExceptionSpanEvent, @@ -5,6 +6,8 @@ import { type SpanEvent as OtelSpanEvent, } from "@trigger.dev/core/v3"; import { CodeBlock } from "~/components/code/CodeBlock"; +import { Feedback } from "~/components/Feedback"; +import { Button } from "~/components/primitives/Buttons"; import { Callout } from "~/components/primitives/Callout"; import { DateTimeAccurate } from "~/components/primitives/DateTime"; import { Header2, Header3 } from "~/components/primitives/Headers"; @@ -75,11 +78,26 @@ export function SpanEventError({ titleClassName="text-rose-500" /> {enhancedException.message && {enhancedException.message}} - {enhancedException.link && ( - - {enhancedException.link.name} - - )} + {enhancedException.link && + (enhancedException.link.magic === "CONTACT_FORM" ? ( + + {enhancedException.link.name} + + } + /> + ) : ( + + {enhancedException.link.name} + + ))} {enhancedException.stacktrace && ( { - const completeAttempt = new CompleteAttemptService(); + const completeAttempt = new CompleteAttemptService({ + supportsRetryCheckpoints: message.version === "v1", + }); await completeAttempt.call({ completion: message.completion, execution: message.execution, checkpoint: message.checkpoint, - supportsRetryCheckpoints: message.version === "v1", }); }, TASK_RUN_FAILED_TO_RUN: async (message) => { @@ -301,11 +303,13 @@ function createProviderNamespace(io: Server) { handlers: { WORKER_CRASHED: async (message) => { try { - const service = new CrashTaskRunService(); - - await service.call(message.runId, { - ...message, - }); + if (message.overrideCompletion) { + const updateErrorService = new UpdateFatalRunErrorService(); + await updateErrorService.call(message.runId, { ...message }); + } else { + const crashRunService = new CrashTaskRunService(); + await crashRunService.call(message.runId, { ...message }); + } } catch (error) { logger.error("Error while handling crashed worker", { error }); } diff --git a/apps/webapp/app/v3/requeueTaskRun.server.ts b/apps/webapp/app/v3/requeueTaskRun.server.ts index 005eadbf6a..5965fef321 100644 --- a/apps/webapp/app/v3/requeueTaskRun.server.ts +++ b/apps/webapp/app/v3/requeueTaskRun.server.ts @@ -7,6 +7,7 @@ import { BaseService } from "./services/baseService.server"; import { PrismaClientOrTransaction } from "~/db.server"; import { workerQueue } from "~/services/worker.server"; import { socketIo } from "./handleSocketIo.server"; +import { TaskRunErrorCodes } from "@trigger.dev/core/v3"; export class RequeueTaskRunService extends BaseService { public async call(runId: string) { @@ -59,7 +60,7 @@ export class RequeueTaskRunService extends BaseService { retry: undefined, error: { type: "INTERNAL_ERROR", - code: "TASK_RUN_HEARTBEAT_TIMEOUT", + code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT, message: "Did not receive a heartbeat from the worker in time", }, }); diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 9695ee9c7f..74d700a680 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -1,6 +1,7 @@ import { Attributes } from "@opentelemetry/api"; import { TaskRunContext, + TaskRunErrorCodes, TaskRunExecution, TaskRunExecutionResult, TaskRunExecutionRetry, @@ -9,6 +10,7 @@ import { flattenAttributes, sanitizeError, shouldRetryError, + taskRunErrorEnhancer, } from "@trigger.dev/core/v3"; import { $transaction, PrismaClientOrTransaction } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; @@ -34,23 +36,28 @@ type CheckpointData = { location: string; }; +type CompleteAttemptServiceOptions = { + prisma?: PrismaClientOrTransaction; + supportsRetryCheckpoints?: boolean; + isSystemFailure?: boolean; + isCrash?: boolean; +}; + export class CompleteAttemptService extends BaseService { + constructor(private opts: CompleteAttemptServiceOptions = {}) { + super(opts.prisma); + } + public async call({ completion, execution, env, checkpoint, - supportsRetryCheckpoints, - isSystemFailure, - isCrash, }: { completion: TaskRunExecutionResult; execution: TaskRunExecution; env?: AuthenticatedEnvironment; checkpoint?: CheckpointData; - supportsRetryCheckpoints?: boolean; - isSystemFailure?: boolean; - isCrash?: boolean; }): Promise<"COMPLETED" | "RETRIED"> { const taskRunAttempt = await findAttempt(this._prisma, execution.attempt.id); @@ -84,7 +91,7 @@ export class CompleteAttemptService extends BaseService { attemptStatus: "FAILED", error: { type: "INTERNAL_ERROR", - code: "TASK_EXECUTION_FAILED", + code: TaskRunErrorCodes.TASK_EXECUTION_FAILED, message: "Tried to complete attempt but it doesn't exist", }, }); @@ -115,9 +122,6 @@ export class CompleteAttemptService extends BaseService { taskRunAttempt, env, checkpoint, - supportsRetryCheckpoints, - isSystemFailure, - isCrash, }); } } @@ -177,18 +181,12 @@ export class CompleteAttemptService extends BaseService { taskRunAttempt, env, checkpoint, - supportsRetryCheckpoints, - isSystemFailure, - isCrash, }: { completion: TaskRunFailedExecutionResult; execution: TaskRunExecution; taskRunAttempt: NonNullable; env?: AuthenticatedEnvironment; checkpoint?: CheckpointData; - supportsRetryCheckpoints?: boolean; - isSystemFailure?: boolean; - isCrash?: boolean; }): Promise<"COMPLETED" | "RETRIED"> { if ( completion.error.type === "INTERNAL_ERROR" && @@ -224,29 +222,38 @@ export class CompleteAttemptService extends BaseService { const environment = env ?? (await this.#getEnvironment(execution.environment.id)); - const executionRetry = - completion.retry ?? - (await FailedTaskRunRetryHelper.getExecutionRetry({ + // This means that tasks won't know they are being retried + let executionRetryInferred = false; + let executionRetry = completion.retry; + + const shouldInfer = this.opts.isCrash || this.opts.isSystemFailure; + + if (!executionRetry && shouldInfer) { + executionRetryInferred = true; + executionRetry = await FailedTaskRunRetryHelper.getExecutionRetry({ run: { ...taskRunAttempt.taskRun, lockedBy: taskRunAttempt.backgroundWorkerTask, lockedToVersion: taskRunAttempt.backgroundWorker, }, execution, - })); + }); + } + + const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error)); if ( - shouldRetryError(completion.error) && + retriableError && executionRetry !== undefined && taskRunAttempt.number < MAX_TASK_RUN_ATTEMPTS ) { return await this.#retryAttempt({ execution, executionRetry, + executionRetryInferred, taskRunAttempt, environment, checkpoint, - supportsRetryCheckpoints, }); } @@ -281,16 +288,15 @@ export class CompleteAttemptService extends BaseService { let status: FAILED_RUN_STATUSES; // Set the correct task run status - if (isSystemFailure) { + if (this.opts.isSystemFailure) { status = "SYSTEM_FAILURE"; - } else if (isCrash) { + } else if (this.opts.isCrash) { status = "CRASHED"; } else if ( sanitizedError.type === "INTERNAL_ERROR" && sanitizedError.code === "MAX_DURATION_EXCEEDED" ) { status = "TIMED_OUT"; - // TODO: check we want these all to be crashes by default } else if (sanitizedError.type === "INTERNAL_ERROR") { status = "CRASHED"; } else { @@ -364,55 +370,82 @@ export class CompleteAttemptService extends BaseService { async #enqueueReattempt({ run, executionRetry, + executionRetryInferred, checkpointEventId, supportsLazyAttempts, - supportsRetryCheckpoints, }: { run: TaskRun; executionRetry: TaskRunExecutionRetry; + executionRetryInferred: boolean; checkpointEventId?: string; supportsLazyAttempts: boolean; - supportsRetryCheckpoints?: boolean; }) { const retryViaQueue = () => { + logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id }); + // We have to replace a potential RESUME with EXECUTE to correctly retry the attempt return marqs?.replaceMessage( run.id, { type: "EXECUTE", taskIdentifier: run.taskIdentifier, - checkpointEventId: supportsRetryCheckpoints ? checkpointEventId : undefined, - retryCheckpointsDisabled: !supportsRetryCheckpoints, + checkpointEventId: this.opts.supportsRetryCheckpoints ? checkpointEventId : undefined, + retryCheckpointsDisabled: !this.opts.supportsRetryCheckpoints, }, executionRetry.timestamp ); }; const retryDirectly = () => { + logger.debug("[CompleteAttemptService] Retrying attempt directly", { runId: run.id }); return RetryAttemptService.enqueue(run.id, this._prisma, new Date(executionRetry.timestamp)); }; // There's a checkpoint, so we need to go through the queue if (checkpointEventId) { - if (!supportsRetryCheckpoints) { - logger.error("Worker does not support retry checkpoints, but a checkpoint was created", { - runId: run.id, - checkpointEventId, - }); + if (!this.opts.supportsRetryCheckpoints) { + logger.error( + "[CompleteAttemptService] Worker does not support retry checkpoints, but a checkpoint was created", + { + runId: run.id, + checkpointEventId, + } + ); } + logger.debug("[CompleteAttemptService] Enqueuing retry attempt with checkpoint", { + runId: run.id, + }); await retryViaQueue(); return; } // Workers without lazy attempt support always need to go through the queue, which is where the attempt is created if (!supportsLazyAttempts) { + logger.debug("[CompleteAttemptService] Worker does not support lazy attempts", { + runId: run.id, + }); await retryViaQueue(); return; } // Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold - if (!supportsRetryCheckpoints && executionRetry.delay >= env.CHECKPOINT_THRESHOLD_IN_MS) { + if ( + !this.opts.supportsRetryCheckpoints && + executionRetry.delay >= env.CHECKPOINT_THRESHOLD_IN_MS + ) { + logger.debug( + "[CompleteAttemptService] Worker does not support retry checkpoints and the delay exceeds the threshold", + { runId: run.id } + ); + await retryViaQueue(); + return; + } + + if (executionRetryInferred) { + logger.debug("[CompleteAttemptService] Execution retry inferred, forcing retry via queue", { + runId: run.id, + }); await retryViaQueue(); return; } @@ -424,17 +457,17 @@ export class CompleteAttemptService extends BaseService { async #retryAttempt({ execution, executionRetry, + executionRetryInferred, taskRunAttempt, environment, checkpoint, - supportsRetryCheckpoints, }: { execution: TaskRunExecution; executionRetry: TaskRunExecutionRetry; + executionRetryInferred: boolean; taskRunAttempt: NonNullable; environment: AuthenticatedEnvironment; checkpoint?: CheckpointData; - supportsRetryCheckpoints?: boolean; }) { const retryAt = new Date(executionRetry.timestamp); @@ -459,7 +492,7 @@ export class CompleteAttemptService extends BaseService { endTime: retryAt, }); - logger.debug("Retrying", { + logger.debug("[CompleteAttemptService] Retrying", { taskRun: taskRunAttempt.taskRun.friendlyId, retry: executionRetry, }); @@ -485,8 +518,8 @@ export class CompleteAttemptService extends BaseService { execution, taskRunAttempt, executionRetry, + executionRetryInferred, checkpoint, - supportsRetryCheckpoints, }); } @@ -494,7 +527,7 @@ export class CompleteAttemptService extends BaseService { run: taskRunAttempt.taskRun, executionRetry, supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, - supportsRetryCheckpoints, + executionRetryInferred, }); return "RETRIED"; @@ -504,14 +537,14 @@ export class CompleteAttemptService extends BaseService { execution, taskRunAttempt, executionRetry, + executionRetryInferred, checkpoint, - supportsRetryCheckpoints, }: { execution: TaskRunExecution; taskRunAttempt: NonNullable; executionRetry: TaskRunExecutionRetry; + executionRetryInferred: boolean; checkpoint: CheckpointData; - supportsRetryCheckpoints?: boolean; }) { const createCheckpoint = new CreateCheckpointService(this._prisma); const checkpointCreateResult = await createCheckpoint.call({ @@ -525,7 +558,7 @@ export class CompleteAttemptService extends BaseService { }); if (!checkpointCreateResult.success) { - logger.error("Failed to create reattempt checkpoint", { + logger.error("[CompleteAttemptService] Failed to create reattempt checkpoint", { checkpoint, runId: execution.run.id, attemptId: execution.attempt.id, @@ -550,7 +583,7 @@ export class CompleteAttemptService extends BaseService { executionRetry, checkpointEventId: checkpointCreateResult.event.id, supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts, - supportsRetryCheckpoints, + executionRetryInferred, }); return "RETRIED" as const; diff --git a/apps/webapp/app/v3/services/crashTaskRun.server.ts b/apps/webapp/app/v3/services/crashTaskRun.server.ts index cac467c097..8f9aaa1901 100644 --- a/apps/webapp/app/v3/services/crashTaskRun.server.ts +++ b/apps/webapp/app/v3/services/crashTaskRun.server.ts @@ -4,7 +4,7 @@ import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus"; -import { sanitizeError, TaskRunInternalError } from "@trigger.dev/core/v3"; +import { sanitizeError, TaskRunErrorCodes, TaskRunInternalError } from "@trigger.dev/core/v3"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; @@ -29,6 +29,11 @@ export class CrashTaskRunService extends BaseService { logger.debug("CrashTaskRunService.call", { runId, opts }); + if (options?.overrideCompletion) { + logger.error("CrashTaskRunService.call: overrideCompletion is deprecated", { runId }); + return; + } + const taskRun = await this._prisma.taskRun.findFirst({ where: { id: runId, @@ -59,7 +64,7 @@ export class CrashTaskRunService extends BaseService { id: runId, error: { type: "INTERNAL_ERROR", - code: opts.errorCode ?? "TASK_RUN_CRASHED", + code: opts.errorCode ?? TaskRunErrorCodes.TASK_RUN_CRASHED, message: opts.reason, stackTrace: opts.logs, }, @@ -108,7 +113,7 @@ export class CrashTaskRunService extends BaseService { attemptStatus: "FAILED", error: { type: "INTERNAL_ERROR", - code: opts.errorCode ?? "TASK_RUN_CRASHED", + code: opts.errorCode ?? TaskRunErrorCodes.TASK_RUN_CRASHED, message: opts.reason, stackTrace: opts.logs, }, @@ -131,7 +136,7 @@ export class CrashTaskRunService extends BaseService { event: event, crashedAt: opts.crashedAt, exception: { - type: opts.errorCode ?? "TASK_RUN_CRASHED", + type: opts.errorCode ?? TaskRunErrorCodes.TASK_RUN_CRASHED, message: opts.reason, stacktrace: opts.logs, }, @@ -186,7 +191,7 @@ export class CrashTaskRunService extends BaseService { completedAt: failedAt, error: sanitizeError({ type: "INTERNAL_ERROR", - code: error.code ?? "TASK_RUN_CRASHED", + code: error.code ?? TaskRunErrorCodes.TASK_RUN_CRASHED, message: error.reason, stackTrace: error.logs, }), diff --git a/apps/webapp/app/v3/services/updateFatalRunError.server.ts b/apps/webapp/app/v3/services/updateFatalRunError.server.ts new file mode 100644 index 0000000000..2363d241c0 --- /dev/null +++ b/apps/webapp/app/v3/services/updateFatalRunError.server.ts @@ -0,0 +1,57 @@ +import { BaseService } from "./baseService.server"; +import { logger } from "~/services/logger.server"; +import { isFatalRunStatus } from "../taskStatus"; +import { TaskRunErrorCodes, TaskRunInternalError } from "@trigger.dev/core/v3"; +import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; + +export type UpdateFatalRunErrorServiceOptions = { + reason?: string; + exitCode?: number; + logs?: string; + errorCode?: TaskRunInternalError["code"]; +}; + +export class UpdateFatalRunErrorService extends BaseService { + public async call(runId: string, options?: UpdateFatalRunErrorServiceOptions) { + const opts = { + reason: "Worker crashed", + ...options, + }; + + logger.debug("UpdateFatalRunErrorService.call", { runId, opts }); + + const taskRun = await this._prisma.taskRun.findFirst({ + where: { + id: runId, + }, + }); + + if (!taskRun) { + logger.error("[UpdateFatalRunErrorService] Task run not found", { runId }); + return; + } + + if (!isFatalRunStatus(taskRun.status)) { + logger.warn("[UpdateFatalRunErrorService] Task run is not in a fatal state", { + runId, + status: taskRun.status, + }); + + return; + } + + logger.debug("[UpdateFatalRunErrorService] Updating crash error", { runId, options }); + + const finalizeService = new FinalizeTaskRunService(); + await finalizeService.call({ + id: taskRun.id, + status: "CRASHED", + error: { + type: "INTERNAL_ERROR", + code: opts.errorCode ?? TaskRunErrorCodes.TASK_RUN_CRASHED, + message: opts.reason, + stackTrace: opts.logs, + }, + }); + } +} diff --git a/packages/cli-v3/src/entryPoints/deploy-run-controller.ts b/packages/cli-v3/src/entryPoints/deploy-run-controller.ts index 949d262b05..5788b05c06 100644 --- a/packages/cli-v3/src/entryPoints/deploy-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/deploy-run-controller.ts @@ -519,12 +519,12 @@ class ProdWorker { logger.log("completion acknowledged", { willCheckpointAndRestore, shouldExit }); - const exitCode = + const isNonZeroExitError = !completion.ok && completion.error.type === "INTERNAL_ERROR" && - completion.error.code === TaskRunErrorCodes.TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE - ? EXIT_CODE_CHILD_NONZERO - : 0; + completion.error.code === TaskRunErrorCodes.TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE; + + const exitCode = isNonZeroExitError ? EXIT_CODE_CHILD_NONZERO : 0; if (shouldExit) { // Exit after completion, without any retrying diff --git a/packages/core/src/v3/apps/process.ts b/packages/core/src/v3/apps/process.ts index 5396967a7f..1c90eb86ea 100644 --- a/packages/core/src/v3/apps/process.ts +++ b/packages/core/src/v3/apps/process.ts @@ -1,2 +1,4 @@ +/** This was used by the old build system in case of indexing failures */ export const EXIT_CODE_ALREADY_HANDLED = 111; -export const EXIT_CODE_CHILD_NONZERO = 112; \ No newline at end of file +/** This means what it says and is only set once we have completed the attempt */ +export const EXIT_CODE_CHILD_NONZERO = 112; diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index e0b8e22ac7..bd2238043f 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -161,12 +161,12 @@ export function shouldRetryError(error: TaskRunError): boolean { case "TASK_ALREADY_RUNNING": case "TASK_PROCESS_SIGKILL_TIMEOUT": case "TASK_PROCESS_SIGSEGV": - case "TASK_PROCESS_SIGTERM": case "TASK_PROCESS_OOM_KILLED": case "TASK_PROCESS_MAYBE_OOM_KILLED": case "TASK_RUN_CANCELLED": case "MAX_DURATION_EXCEEDED": case "DISK_SPACE_EXCEEDED": + case "TASK_RUN_HEARTBEAT_TIMEOUT": return false; case "GRACEFUL_EXIT_TIMEOUT": @@ -178,8 +178,8 @@ export function shouldRetryError(error: TaskRunError): boolean { case "TASK_EXECUTION_ABORTED": case "TASK_EXECUTION_FAILED": case "TASK_RUN_CRASHED": - case "TASK_RUN_HEARTBEAT_TIMEOUT": case "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE": + case "TASK_PROCESS_SIGTERM": return true; default: diff --git a/packages/core/src/v3/runtime/noopRuntimeManager.ts b/packages/core/src/v3/runtime/noopRuntimeManager.ts index 7415c96e5d..16e96de3e9 100644 --- a/packages/core/src/v3/runtime/noopRuntimeManager.ts +++ b/packages/core/src/v3/runtime/noopRuntimeManager.ts @@ -1,6 +1,7 @@ import { BatchTaskRunExecutionResult, TaskRunContext, + TaskRunErrorCodes, TaskRunExecutionResult, } from "../schemas/index.js"; import { RuntimeManager } from "./manager.js"; @@ -22,7 +23,10 @@ export class NoopRuntimeManager implements RuntimeManager { return Promise.resolve({ ok: false, id: params.id, - error: { type: "INTERNAL_ERROR", code: "CONFIGURED_INCORRECTLY" }, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.CONFIGURED_INCORRECTLY, + }, }); } diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 330f13e63c..31d6dc3e4f 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -252,6 +252,7 @@ export const ProviderToPlatformMessages = { exitCode: z.number().optional(), message: z.string().optional(), logs: z.string().optional(), + /** This means we should only update the error if one exists */ overrideCompletion: z.boolean().optional(), errorCode: TaskRunInternalError.shape.code.optional(), }),