diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index c45ef65f59..e313510948 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -17,6 +17,7 @@ import { ZodMessageSender } from "@trigger.dev/core/v3/zodMessageHandler"; import { BackgroundWorker, BackgroundWorkerTask, + Prisma, RuntimeEnvironment, TaskRun, TaskRunStatus, @@ -998,9 +999,159 @@ export class SharedQueueConsumer { } } +type AttemptForCompletion = Prisma.TaskRunAttemptGetPayload<{ + include: { + backgroundWorker: true; + backgroundWorkerTask: true; + taskRun: { + include: { + runtimeEnvironment: { + include: { + organization: true; + project: true; + }; + }; + tags: true; + }; + }; + queue: true; + }; +}>; + +type AttemptForExecution = Prisma.TaskRunAttemptGetPayload<{ + include: { + backgroundWorker: true; + backgroundWorkerTask: true; + runtimeEnvironment: { + include: { + organization: true; + project: true; + }; + }; + taskRun: { + include: { + tags: true; + batchItems: { + include: { + batchTaskRun: { + select: { + friendlyId: true; + }; + }; + }; + }; + }; + }; + queue: true; + }; +}>; + class SharedQueueTasks { + private _completionPayloadFromAttempt(attempt: AttemptForCompletion): TaskRunExecutionResult { + const ok = attempt.status === "COMPLETED"; + + if (ok) { + const success: TaskRunSuccessfulExecutionResult = { + ok, + id: attempt.taskRun.friendlyId, + output: attempt.output ?? undefined, + outputType: attempt.outputType, + taskIdentifier: attempt.taskRun.taskIdentifier, + }; + return success; + } else { + const failure: TaskRunFailedExecutionResult = { + ok, + id: attempt.taskRun.friendlyId, + error: attempt.error as TaskRunError, + taskIdentifier: attempt.taskRun.taskIdentifier, + }; + return failure; + } + } + + private async _executionFromAttempt( + attempt: AttemptForExecution, + machinePreset?: MachinePreset + ): Promise { + const { backgroundWorkerTask, taskRun, queue } = attempt; + + if (!machinePreset) { + machinePreset = machinePresetFromConfig(backgroundWorkerTask.machineConfig ?? {}); + } + + const metadata = await parsePacket({ + data: taskRun.metadata ?? undefined, + dataType: taskRun.metadataType, + }); + + const execution: ProdTaskRunExecution = { + task: { + id: backgroundWorkerTask.slug, + filePath: backgroundWorkerTask.filePath, + exportName: backgroundWorkerTask.exportName, + }, + attempt: { + id: attempt.friendlyId, + number: attempt.number, + startedAt: attempt.startedAt ?? attempt.createdAt, + backgroundWorkerId: attempt.backgroundWorkerId, + backgroundWorkerTaskId: attempt.backgroundWorkerTaskId, + status: "EXECUTING" as const, + }, + run: { + id: taskRun.friendlyId, + payload: taskRun.payload, + payloadType: taskRun.payloadType, + context: taskRun.context, + createdAt: taskRun.createdAt, + startedAt: taskRun.startedAt ?? taskRun.createdAt, + tags: taskRun.tags.map((tag) => tag.name), + isTest: taskRun.isTest, + idempotencyKey: taskRun.idempotencyKey ?? undefined, + durationMs: taskRun.usageDurationMs, + costInCents: taskRun.costInCents, + baseCostInCents: taskRun.baseCostInCents, + metadata, + maxDuration: taskRun.maxDurationInSeconds ?? undefined, + }, + queue: { + id: queue.friendlyId, + name: queue.name, + }, + environment: { + id: attempt.runtimeEnvironment.id, + slug: attempt.runtimeEnvironment.slug, + type: attempt.runtimeEnvironment.type, + }, + organization: { + id: attempt.runtimeEnvironment.organization.id, + slug: attempt.runtimeEnvironment.organization.slug, + name: attempt.runtimeEnvironment.organization.title, + }, + project: { + id: attempt.runtimeEnvironment.project.id, + ref: attempt.runtimeEnvironment.project.externalRef, + slug: attempt.runtimeEnvironment.project.slug, + name: attempt.runtimeEnvironment.project.name, + }, + batch: + taskRun.batchItems[0] && taskRun.batchItems[0].batchTaskRun + ? { id: taskRun.batchItems[0].batchTaskRun.friendlyId } + : undefined, + worker: { + id: attempt.backgroundWorkerId, + contentHash: attempt.backgroundWorker.contentHash, + version: attempt.backgroundWorker.version, + }, + machine: machinePreset, + }; + + return execution; + } + async getCompletionPayloadFromAttempt(id: string): Promise { - const attempt = await prisma.taskRunAttempt.findUnique({ + const attempt = await prisma.taskRunAttempt.findFirst({ where: { id, status: { @@ -1030,26 +1181,7 @@ class SharedQueueTasks { return; } - const ok = attempt.status === "COMPLETED"; - - if (ok) { - const success: TaskRunSuccessfulExecutionResult = { - ok, - id: attempt.taskRun.friendlyId, - output: attempt.output ?? undefined, - outputType: attempt.outputType, - taskIdentifier: attempt.taskRun.taskIdentifier, - }; - return success; - } else { - const failure: TaskRunFailedExecutionResult = { - ok, - id: attempt.taskRun.friendlyId, - error: attempt.error as TaskRunError, - taskIdentifier: attempt.taskRun.taskIdentifier, - }; - return failure; - } + return this._completionPayloadFromAttempt(attempt); } async getExecutionPayloadFromAttempt({ @@ -1063,7 +1195,7 @@ class SharedQueueTasks { isRetrying?: boolean; skipStatusChecks?: boolean; }): Promise { - const attempt = await prisma.taskRunAttempt.findUnique({ + const attempt = await prisma.taskRunAttempt.findFirst({ where: { id, }, @@ -1162,78 +1294,10 @@ class SharedQueueTasks { }, }); } - - const { backgroundWorkerTask, taskRun, queue } = attempt; + const { backgroundWorkerTask, taskRun } = attempt; const machinePreset = machinePresetFromConfig(backgroundWorkerTask.machineConfig ?? {}); - - const metadata = await parsePacket({ - data: taskRun.metadata ?? undefined, - dataType: taskRun.metadataType, - }); - - const execution: ProdTaskRunExecution = { - task: { - id: backgroundWorkerTask.slug, - filePath: backgroundWorkerTask.filePath, - exportName: backgroundWorkerTask.exportName, - }, - attempt: { - id: attempt.friendlyId, - number: attempt.number, - startedAt: attempt.startedAt ?? attempt.createdAt, - backgroundWorkerId: attempt.backgroundWorkerId, - backgroundWorkerTaskId: attempt.backgroundWorkerTaskId, - status: "EXECUTING" as const, - }, - run: { - id: taskRun.friendlyId, - payload: taskRun.payload, - payloadType: taskRun.payloadType, - context: taskRun.context, - createdAt: taskRun.createdAt, - startedAt: taskRun.startedAt ?? taskRun.createdAt, - tags: taskRun.tags.map((tag) => tag.name), - isTest: taskRun.isTest, - idempotencyKey: taskRun.idempotencyKey ?? undefined, - durationMs: taskRun.usageDurationMs, - costInCents: taskRun.costInCents, - baseCostInCents: taskRun.baseCostInCents, - metadata, - maxDuration: taskRun.maxDurationInSeconds ?? undefined, - }, - queue: { - id: queue.friendlyId, - name: queue.name, - }, - environment: { - id: attempt.runtimeEnvironment.id, - slug: attempt.runtimeEnvironment.slug, - type: attempt.runtimeEnvironment.type, - }, - organization: { - id: attempt.runtimeEnvironment.organization.id, - slug: attempt.runtimeEnvironment.organization.slug, - name: attempt.runtimeEnvironment.organization.title, - }, - project: { - id: attempt.runtimeEnvironment.project.id, - ref: attempt.runtimeEnvironment.project.externalRef, - slug: attempt.runtimeEnvironment.project.slug, - name: attempt.runtimeEnvironment.project.name, - }, - batch: - taskRun.batchItems[0] && taskRun.batchItems[0].batchTaskRun - ? { id: taskRun.batchItems[0].batchTaskRun.friendlyId } - : undefined, - worker: { - id: attempt.backgroundWorkerId, - contentHash: attempt.backgroundWorker.contentHash, - version: attempt.backgroundWorker.version, - }, - machine: machinePreset, - }; - + const execution = await this._executionFromAttempt(attempt, machinePreset); const variables = await this.#buildEnvironmentVariables( attempt.runtimeEnvironment, taskRun.id, @@ -1252,6 +1316,64 @@ class SharedQueueTasks { return payload; } + async getResumePayload(attemptId: string): Promise< + | { + execution: ProdTaskRunExecution; + completion: TaskRunExecutionResult; + } + | undefined + > { + const attempt = await prisma.taskRunAttempt.findFirst({ + where: { + id: attemptId, + }, + include: { + backgroundWorker: true, + backgroundWorkerTask: true, + runtimeEnvironment: { + include: { + organization: true, + project: true, + }, + }, + taskRun: { + include: { + runtimeEnvironment: { + include: { + organization: true, + project: true, + }, + }, + tags: true, + batchItems: { + include: { + batchTaskRun: { + select: { + friendlyId: true, + }, + }, + }, + }, + }, + }, + queue: true, + }, + }); + + if (!attempt) { + logger.error("getResumePayload: No attempt found", { id: attemptId }); + return; + } + + const execution = await this._executionFromAttempt(attempt); + const completion = this._completionPayloadFromAttempt(attempt); + + return { + execution, + completion, + }; + } + async getLatestExecutionPayloadFromRun( id: string, setToExecuting?: boolean, diff --git a/apps/webapp/app/v3/services/resumeAttempt.server.ts b/apps/webapp/app/v3/services/resumeAttempt.server.ts index 0cba99e472..daf8d96936 100644 --- a/apps/webapp/app/v3/services/resumeAttempt.server.ts +++ b/apps/webapp/app/v3/services/resumeAttempt.server.ts @@ -4,14 +4,13 @@ import { TaskRunExecutionResult, } from "@trigger.dev/core/v3"; import type { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket"; -import { $transaction, PrismaClientOrTransaction } from "~/db.server"; import { logger } from "~/services/logger.server"; import { marqs } from "~/v3/marqs/index.server"; import { socketIo } from "../handleSocketIo.server"; import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server"; import { BaseService } from "./baseService.server"; -import { TaskRunAttempt } from "@trigger.dev/database"; -import { isFinalRunStatus } from "../taskStatus"; +import { Prisma, TaskRunAttempt } from "@trigger.dev/database"; +import { FINAL_ATTEMPT_STATUSES, FINAL_RUN_STATUSES, isFinalRunStatus } from "../taskStatus"; export class ResumeAttemptService extends BaseService { private _logger = logger; @@ -21,145 +20,133 @@ export class ResumeAttemptService extends BaseService { ): Promise { this._logger.debug(`ResumeAttemptService.call()`, params); - await $transaction(this._prisma, async (tx) => { - const attempt = await tx.taskRunAttempt.findUnique({ - where: { - friendlyId: params.attemptFriendlyId, - }, - include: { - taskRun: true, - dependencies: { - include: { - taskRun: { - include: { - attempts: { - orderBy: { - number: "desc", - }, - take: 1, - select: { - id: true, - }, - }, - }, + const latestAttemptSelect = { + orderBy: { + number: "desc", + }, + take: 1, + select: { + id: true, + }, + } satisfies Prisma.TaskRunInclude["attempts"]; + + const attempt = await this._prisma.taskRunAttempt.findFirst({ + where: { + friendlyId: params.attemptFriendlyId, + }, + include: { + taskRun: true, + dependencies: { + include: { + taskRun: { + include: { + attempts: latestAttemptSelect, }, }, - orderBy: { - createdAt: "desc", - }, - take: 1, }, - batchDependencies: { - include: { - items: { - include: { - taskRun: { - include: { - attempts: { - orderBy: { - number: "desc", - }, - take: 1, - select: { - id: true, - }, - }, - }, + orderBy: { + createdAt: "desc", + }, + take: 1, + }, + batchDependencies: { + include: { + items: { + include: { + taskRun: { + include: { + attempts: latestAttemptSelect, }, }, }, }, - orderBy: { - createdAt: "desc", - }, - take: 1, }, + orderBy: { + createdAt: "desc", + }, + take: 1, }, - }); + }, + }); - if (!attempt) { - this._logger.error("Could not find attempt", params); - return; - } + if (!attempt) { + this._logger.error("Could not find attempt", params); + return; + } - this._logger = logger.child({ - attemptId: attempt.id, - attemptFriendlyId: attempt.friendlyId, - taskRun: attempt.taskRun, - }); + this._logger = logger.child({ + attemptId: attempt.id, + attemptFriendlyId: attempt.friendlyId, + taskRun: attempt.taskRun, + }); - if (isFinalRunStatus(attempt.taskRun.status)) { - this._logger.error("Run is not resumable"); - return; - } + if (isFinalRunStatus(attempt.taskRun.status)) { + this._logger.error("Run is not resumable"); + return; + } - let completedAttemptIds: string[] = []; + let completedAttemptIds: string[] = []; - switch (params.type) { - case "WAIT_FOR_DURATION": { - this._logger.debug("Sending duration wait resume message"); + switch (params.type) { + case "WAIT_FOR_DURATION": { + this._logger.debug("Sending duration wait resume message"); - await this.#setPostResumeStatuses(attempt, tx); + await this.#setPostResumeStatuses(attempt); - socketIo.coordinatorNamespace.emit("RESUME_AFTER_DURATION", { - version: "v1", - attemptId: attempt.id, - attemptFriendlyId: attempt.friendlyId, - }); - break; - } - case "WAIT_FOR_TASK": { - if (attempt.dependencies.length) { - // We only care about the latest dependency - const dependentAttempt = attempt.dependencies[0].taskRun.attempts[0]; - - if (!dependentAttempt) { - this._logger.error("No dependent attempt"); - return; - } - - completedAttemptIds = [dependentAttempt.id]; - } else { - this._logger.error("No task dependency"); + socketIo.coordinatorNamespace.emit("RESUME_AFTER_DURATION", { + version: "v1", + attemptId: attempt.id, + attemptFriendlyId: attempt.friendlyId, + }); + break; + } + case "WAIT_FOR_TASK": { + if (attempt.dependencies.length) { + // We only care about the latest dependency + const dependentAttempt = attempt.dependencies[0].taskRun.attempts[0]; + + if (!dependentAttempt) { + this._logger.error("No dependent attempt"); return; } - await this.#handleDependencyResume(attempt, completedAttemptIds, tx); - - break; + completedAttemptIds = [dependentAttempt.id]; + } else { + this._logger.error("No task dependency"); + return; } - case "WAIT_FOR_BATCH": { - if (attempt.batchDependencies) { - // We only care about the latest batch dependency - const dependentBatchItems = attempt.batchDependencies[0].items; - - if (!dependentBatchItems) { - this._logger.error("No dependent batch items"); - return; - } - - completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id); - } else { - this._logger.error("No batch dependency"); + + await this.#handleDependencyResume(attempt, completedAttemptIds); + + break; + } + case "WAIT_FOR_BATCH": { + if (attempt.batchDependencies) { + // We only care about the latest batch dependency + const dependentBatchItems = attempt.batchDependencies[0].items; + + if (!dependentBatchItems) { + this._logger.error("No dependent batch items"); return; } - await this.#handleDependencyResume(attempt, completedAttemptIds, tx); - - break; - } - default: { - break; + completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id); + } else { + this._logger.error("No batch dependency"); + return; } + + await this.#handleDependencyResume(attempt, completedAttemptIds); + + break; } - }); + default: { + break; + } + } } - async #handleDependencyResume( - attempt: TaskRunAttempt, - completedAttemptIds: string[], - tx: PrismaClientOrTransaction - ) { + async #handleDependencyResume(attempt: TaskRunAttempt, completedAttemptIds: string[]) { if (completedAttemptIds.length === 0) { this._logger.error("No completed attempt IDs"); return; @@ -169,7 +156,7 @@ export class ResumeAttemptService extends BaseService { const executions: TaskRunExecution[] = []; for (const completedAttemptId of completedAttemptIds) { - const completedAttempt = await tx.taskRunAttempt.findUnique({ + const completedAttempt = await this._prisma.taskRunAttempt.findFirst({ where: { id: completedAttemptId, taskRun: { @@ -195,33 +182,19 @@ export class ResumeAttemptService extends BaseService { completedRunId: completedAttempt.taskRunId, }); - const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt( - completedAttempt.id - ); + const resumePayload = await sharedQueueTasks.getResumePayload(completedAttempt.id); - if (!completion) { - logger.error("Failed to get completion payload"); + if (!resumePayload) { + logger.error("Failed to get resume payload"); await marqs?.acknowledgeMessage(attempt.taskRunId); return; } - completions.push(completion); - - const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({ - id: completedAttempt.id, - skipStatusChecks: true, // already checked when getting the completion - }); - - if (!executionPayload) { - logger.error("Failed to get execution payload"); - await marqs?.acknowledgeMessage(attempt.taskRunId); - return; - } - - executions.push(executionPayload.execution); + completions.push(resumePayload.completion); + executions.push(resumePayload.execution); } - await this.#setPostResumeStatuses(attempt, tx); + await this.#setPostResumeStatuses(attempt); socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", { version: "v1", @@ -233,21 +206,55 @@ export class ResumeAttemptService extends BaseService { }); } - async #setPostResumeStatuses(attempt: TaskRunAttempt, tx: PrismaClientOrTransaction) { - return await tx.taskRunAttempt.update({ - where: { - id: attempt.id, - }, - data: { - status: "EXECUTING", - taskRun: { - update: { - data: { - status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING", + async #setPostResumeStatuses(attempt: TaskRunAttempt) { + try { + const updatedAttempt = await this._prisma.taskRunAttempt.update({ + where: { + id: attempt.id, + }, + data: { + status: "EXECUTING", + taskRun: { + update: { + data: { + status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING", + }, }, }, }, - }, - }); + select: { + id: true, + status: true, + taskRun: { + select: { + id: true, + status: true, + }, + }, + }, + }); + + this._logger.debug("Set post resume statuses", { + run: { + id: updatedAttempt.taskRun.id, + status: updatedAttempt.taskRun.status, + }, + attempt: { + id: updatedAttempt.id, + status: updatedAttempt.status, + }, + }); + } catch (error) { + this._logger.error("Failed to set post resume statuses", { + error: + error instanceof Error + ? { + name: error.name, + message: error.message, + stack: error.stack, + } + : error, + }); + } } } diff --git a/references/v3-catalog/src/trigger/batch.ts b/references/v3-catalog/src/trigger/batch.ts index d85f5960f8..2ba5754f64 100644 --- a/references/v3-catalog/src/trigger/batch.ts +++ b/references/v3-catalog/src/trigger/batch.ts @@ -16,8 +16,8 @@ import { setTimeout } from "node:timers/promises"; export const batchParentTask = task({ id: "batch-parent-task", - run: async () => { - const items = Array.from({ length: 10 }, (_, i) => ({ + run: async (payload: { size?: number; wait?: boolean }) => { + const items = Array.from({ length: payload.size ?? 10 }, (_, i) => ({ payload: { id: `item${i}`, name: `Item Name ${i}`, @@ -44,7 +44,9 @@ export const batchParentTask = task({ }, })); - return await batchChildTask.batchTrigger(items); + return payload.wait + ? await batchChildTask.batchTriggerAndWait(items.map((i) => ({ payload: i.payload }))) + : await batchChildTask.batchTrigger(items); }, });