diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 7051fb7367..3105c36d04 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -362,6 +362,7 @@ const EnvironmentSchema = z.object({ MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(), MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(), MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500), + MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500), REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"), BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), diff --git a/apps/webapp/app/presenters/v3/BatchListPresenter.server.ts b/apps/webapp/app/presenters/v3/BatchListPresenter.server.ts index f3d25269b9..2317f68a14 100644 --- a/apps/webapp/app/presenters/v3/BatchListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/BatchListPresenter.server.ts @@ -100,6 +100,7 @@ export class BatchListPresenter extends BasePresenter { status: BatchTaskRunStatus; createdAt: Date; updatedAt: Date; + completedAt: Date | null; runCount: BigInt; batchVersion: string; }[] @@ -111,6 +112,7 @@ export class BatchListPresenter extends BasePresenter { b.status, b."createdAt", b."updatedAt", + b."completedAt", b."runCount", b."batchVersion" FROM @@ -196,7 +198,11 @@ WHERE createdAt: batch.createdAt.toISOString(), updatedAt: batch.updatedAt.toISOString(), hasFinished, - finishedAt: hasFinished ? batch.updatedAt.toISOString() : undefined, + finishedAt: batch.completedAt + ? batch.completedAt.toISOString() + : hasFinished + ? batch.updatedAt.toISOString() + : undefined, status: batch.status, environment: displayableEnvironment(environment, userId), runCount: Number(batch.runCount), diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index e6e3398e69..a557243442 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -74,7 +74,7 @@ const { action, loader } = createActionApiRoute( const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL); - const run = await service.call(params.taskId, authentication.environment, body, { + const result = await service.call(params.taskId, authentication.environment, body, { idempotencyKey: idempotencyKey ?? undefined, idempotencyKeyExpiresAt: idempotencyKeyExpiresAt, triggerVersion: triggerVersion ?? undefined, @@ -83,19 +83,20 @@ const { action, loader } = createActionApiRoute( oneTimeUseToken, }); - if (!run) { + if (!result) { return json({ error: "Task not found" }, { status: 404 }); } const $responseHeaders = await responseHeaders( - run, + result.run, authentication.environment, triggerClient ); return json( { - id: run.friendlyId, + id: result.run.friendlyId, + isCached: result.isCached, }, { headers: $responseHeaders, diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 47f603bcc6..5504427986 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -12,8 +12,8 @@ import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy, - BatchTriggerV2Service, -} from "~/v3/services/batchTriggerV2.server"; + BatchTriggerV3Service, +} from "~/v3/services/batchTriggerV3.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; @@ -40,13 +40,24 @@ const { action, loader } = createActionApiRoute( } // Check the there are fewer than MAX_BATCH_V2_TRIGGER_ITEMS items - if (body.items.length > env.MAX_BATCH_V2_TRIGGER_ITEMS) { - return json( - { - error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_V2_TRIGGER_ITEMS}.`, - }, - { status: 400 } - ); + if (body.dependentAttempt) { + if (body.items.length > env.MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS) { + return json( + { + error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS} when batchTriggerAndWait.`, + }, + { status: 400 } + ); + } + } else { + if (body.items.length > env.MAX_BATCH_V2_TRIGGER_ITEMS) { + return json( + { + error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_V2_TRIGGER_ITEMS}.`, + }, + { status: 400 } + ); + } } const { @@ -85,7 +96,7 @@ const { action, loader } = createActionApiRoute( resolveIdempotencyKeyTTL(idempotencyKeyTTL) ?? new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); - const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined); + const service = new BatchTriggerV3Service(batchProcessingStrategy ?? undefined); try { const batch = await service.call(authentication.environment, body, { @@ -118,7 +129,7 @@ const { action, loader } = createActionApiRoute( return json({ error: error.message }, { status: 422 }); } else if (error instanceof Error) { return json( - { error: error.message }, + { error: "Something went wrong" }, { status: 500, headers: { "x-should-retry": "false" } } ); } diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index c2409cf8c5..6c86b37f47 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -55,7 +55,7 @@ import { CancelDevSessionRunsServiceOptions, } from "~/v3/services/cancelDevSessionRuns.server"; import { logger } from "./logger.server"; -import { BatchProcessingOptions, BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server"; +import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server"; const workerCatalog = { indexEndpoint: z.object({ @@ -733,7 +733,7 @@ function getWorkerQueue() { priority: 0, maxAttempts: 5, handler: async (payload, job) => { - const service = new BatchTriggerV2Service(payload.strategy); + const service = new BatchTriggerV3Service(payload.strategy); await service.processBatchTaskRun(payload); }, diff --git a/apps/webapp/app/v3/services/batchTriggerTask.server.ts b/apps/webapp/app/v3/services/batchTriggerTask.server.ts index 5dc1742e69..a7bf1846f4 100644 --- a/apps/webapp/app/v3/services/batchTriggerTask.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerTask.server.ts @@ -104,7 +104,7 @@ export class BatchTriggerTaskService extends BaseService { for (const item of body.items) { try { - const run = await triggerTaskService.call( + const result = await triggerTaskService.call( taskId, environment, { @@ -123,16 +123,16 @@ export class BatchTriggerTaskService extends BaseService { } ); - if (run) { + if (result) { await this._prisma.batchTaskRunItem.create({ data: { batchTaskRunId: batch.id, - taskRunId: run.id, - status: batchTaskRunItemStatusForRunStatus(run.status), + taskRunId: result.run.id, + status: batchTaskRunItemStatusForRunStatus(result.run.status), }, }); - runs.push(run.friendlyId); + runs.push(result.run.friendlyId); } index++; diff --git a/apps/webapp/app/v3/services/batchTriggerV2.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts similarity index 62% rename from apps/webapp/app/v3/services/batchTriggerV2.server.ts rename to apps/webapp/app/v3/services/batchTriggerV3.server.ts index 02a0cb768d..197707dfa0 100644 --- a/apps/webapp/app/v3/services/batchTriggerV2.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -5,7 +5,13 @@ import { packetRequiresOffloading, parsePacket, } from "@trigger.dev/core/v3"; -import { BatchTaskRun, Prisma, TaskRunAttempt } from "@trigger.dev/database"; +import { + BatchTaskRun, + isUniqueConstraintError, + Prisma, + TaskRunAttempt, +} from "@trigger.dev/database"; +import { z } from "zod"; import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server"; import { env } from "~/env.server"; import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server"; @@ -20,8 +26,8 @@ import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2. import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; import { startActiveSpan } from "../tracer.server"; import { BaseService, ServiceValidationError } from "./baseService.server"; +import { ResumeBatchRunService } from "./resumeBatchRun.server"; import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; -import { z } from "zod"; const PROCESSING_BATCH_SIZE = 50; const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20; @@ -49,7 +55,41 @@ export type BatchTriggerTaskServiceOptions = { oneTimeUseToken?: string; }; -export class BatchTriggerV2Service extends BaseService { +type RunItemData = { + id: string; + isCached: boolean; + idempotencyKey: string | undefined; + taskIdentifier: string; +}; + +/** + * ### V3 + * + * BatchTrigger v3 doesn't have any changes from v2, other than a different system for tracking if the + * batch is completed. + * + * v3 BatchTaskRun's now must be "sealed" before they could be considered completed. Being "sealed" means + * that all the items in the batch have been processed and the batch is ready to be considered completed. + * + * We also now track the expected count of items in the batch, and then as each BatchTaskRunItem is set to COMPLETED, + * we increment the BatchTaskRun's completed count. Once the completed count is equal to the expected count, and the + * batch is sealed, we can consider the batch completed. + * + * So now when the v3 batch is considered completed, we will enqueue the ResumeBatchRunService to resume the dependent + * task attempt if there is one. This is in contrast to v2 batches where every time a task was completed, we would schedule + * the ResumeBatchRunService to check if the batch was completed and set it to completed if it was. + * + * We've also introduced a new column "resumedAt" that will be set when the batch is resumed. Previously in v2 batches, the status == "COMPLETED" was overloaded + * to mean that the batch was completed and resumed. Now we have a separate column to track when the batch was resumed (and to make sure it's only resumed once). + * + * ### V2 + * + * Batch v2 added the ability to trigger more than 100 tasks in a single batch. This was done by offloading the payload to the object store and + * then processing the batch in chunks of 50 tasks at a time in the background. + * + * The other main difference from v1 is that a single batch in v2 could trigger multiple different tasks, whereas in v1 a batch could only trigger a single task. + */ +export class BatchTriggerV3Service extends BaseService { private _batchProcessingStrategy: BatchProcessingStrategy; constructor( @@ -71,13 +111,15 @@ export class BatchTriggerV2Service extends BaseService { "call()", environment, async (span) => { + if (!body.items || body.items.length === 0) { + throw new ServiceValidationError("A batch trigger must have at least one item"); + } + const existingBatch = options.idempotencyKey - ? await this._prisma.batchTaskRun.findUnique({ + ? await this._prisma.batchTaskRun.findFirst({ where: { - runtimeEnvironmentId_idempotencyKey: { - runtimeEnvironmentId: environment.id, - idempotencyKey: options.idempotencyKey, - }, + runtimeEnvironmentId: environment.id, + idempotencyKey: options.idempotencyKey, }, }) : undefined; @@ -152,79 +194,10 @@ export class BatchTriggerV2Service extends BaseService { } } - const idempotencyKeys = body.items.map((i) => i.options?.idempotencyKey).filter(Boolean); - - const cachedRuns = - idempotencyKeys.length > 0 - ? await this._prisma.taskRun.findMany({ - where: { - runtimeEnvironmentId: environment.id, - idempotencyKey: { - in: body.items.map((i) => i.options?.idempotencyKey).filter(Boolean), - }, - }, - select: { - friendlyId: true, - idempotencyKey: true, - idempotencyKeyExpiresAt: true, - }, - }) - : []; - - if (cachedRuns.length) { - logger.debug("[BatchTriggerV2][call] Found cached runs", { - cachedRuns, - batchId, - }); - } - - // Now we need to create an array of all the run IDs, in order - // If we have a cached run, that isn't expired, we should use that run ID - // If we have a cached run, that is expired, we should generate a new run ID and save that cached run ID to a set of expired run IDs - // If we don't have a cached run, we should generate a new run ID - const expiredRunIds = new Set(); - let cachedRunCount = 0; - - const runs = body.items.map((item) => { - const cachedRun = cachedRuns.find( - (r) => r.idempotencyKey === item.options?.idempotencyKey - ); - - if (cachedRun) { - if ( - cachedRun.idempotencyKeyExpiresAt && - cachedRun.idempotencyKeyExpiresAt < new Date() - ) { - expiredRunIds.add(cachedRun.friendlyId); - - return { - id: generateFriendlyId("run"), - isCached: false, - idempotencyKey: item.options?.idempotencyKey ?? undefined, - taskIdentifier: item.task, - }; - } - - cachedRunCount++; - - return { - id: cachedRun.friendlyId, - isCached: true, - idempotencyKey: item.options?.idempotencyKey ?? undefined, - taskIdentifier: item.task, - }; - } - - return { - id: generateFriendlyId("run"), - isCached: false, - idempotencyKey: item.options?.idempotencyKey ?? undefined, - taskIdentifier: item.task, - }; - }); + const runs = await this.#prepareRunData(environment, body); // Calculate how many new runs we need to create - const newRunCount = body.items.length - cachedRunCount; + const newRunCount = runs.filter((r) => !r.isCached).length; if (newRunCount === 0) { logger.debug("[BatchTriggerV2][call] All runs are cached", { @@ -241,7 +214,7 @@ export class BatchTriggerV2Service extends BaseService { runCount: body.items.length, runIds: runs.map((r) => r.id), status: "COMPLETED", - batchVersion: "v2", + batchVersion: "v3", oneTimeUseToken: options.oneTimeUseToken, }, }); @@ -273,20 +246,6 @@ export class BatchTriggerV2Service extends BaseService { ); } - // Expire the cached runs that are no longer valid - if (expiredRunIds.size) { - logger.debug("Expiring cached runs", { - expiredRunIds: Array.from(expiredRunIds), - batchId, - }); - - // TODO: is there a limit to the number of items we can update in a single query? - await this._prisma.taskRun.updateMany({ - where: { friendlyId: { in: Array.from(expiredRunIds) } }, - data: { idempotencyKey: null }, - }); - } - // Upload to object store const payloadPacket = await this.#handlePayloadPacket( body.items, @@ -350,14 +309,90 @@ export class BatchTriggerV2Service extends BaseService { } } + async #prepareRunData( + environment: AuthenticatedEnvironment, + body: BatchTriggerTaskV2RequestBody + ): Promise> { + // batchTriggerAndWait cannot have cached runs because that does not work in run engine v1 and is not available in the client + if (body?.dependentAttempt) { + return body.items.map((item) => ({ + id: generateFriendlyId("run"), + isCached: false, + idempotencyKey: undefined, + taskIdentifier: item.task, + })); + } + + const idempotencyKeys = body.items.map((i) => i.options?.idempotencyKey).filter(Boolean); + + const cachedRuns = + idempotencyKeys.length > 0 + ? await this._prisma.taskRun.findMany({ + where: { + runtimeEnvironmentId: environment.id, + idempotencyKey: { + in: body.items.map((i) => i.options?.idempotencyKey).filter(Boolean), + }, + }, + select: { + friendlyId: true, + idempotencyKey: true, + idempotencyKeyExpiresAt: true, + }, + }) + : []; + + // Now we need to create an array of all the run IDs, in order + // If we have a cached run, that isn't expired, we should use that run ID + // If we have a cached run, that is expired, we should generate a new run ID and save that cached run ID to a set of expired run IDs + // If we don't have a cached run, we should generate a new run ID + const expiredRunIds = new Set(); + + const runs = body.items.map((item) => { + const cachedRun = cachedRuns.find((r) => r.idempotencyKey === item.options?.idempotencyKey); + + if (cachedRun) { + if (cachedRun.idempotencyKeyExpiresAt && cachedRun.idempotencyKeyExpiresAt < new Date()) { + expiredRunIds.add(cachedRun.friendlyId); + + return { + id: generateFriendlyId("run"), + isCached: false, + idempotencyKey: item.options?.idempotencyKey ?? undefined, + taskIdentifier: item.task, + }; + } + + return { + id: cachedRun.friendlyId, + isCached: true, + idempotencyKey: item.options?.idempotencyKey ?? undefined, + taskIdentifier: item.task, + }; + } + + return { + id: generateFriendlyId("run"), + isCached: false, + idempotencyKey: item.options?.idempotencyKey ?? undefined, + taskIdentifier: item.task, + }; + }); + + // Expire the cached runs that are no longer valid + if (expiredRunIds.size) { + await this._prisma.taskRun.updateMany({ + where: { friendlyId: { in: Array.from(expiredRunIds) } }, + data: { idempotencyKey: null }, + }); + } + + return runs; + } + async #createAndProcessBatchTaskRun( batchId: string, - runs: Array<{ - id: string; - isCached: boolean; - idempotencyKey: string | undefined; - taskIdentifier: string; - }>, + runs: Array, payloadPacket: IOPacket, newRunCount: number, environment: AuthenticatedEnvironment, @@ -365,7 +400,7 @@ export class BatchTriggerV2Service extends BaseService { options: BatchTriggerTaskServiceOptions = {}, dependentAttempt?: TaskRunAttempt ) { - if (newRunCount <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) { + if (runs.length <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) { const batch = await this._prisma.batchTaskRun.create({ data: { friendlyId: batchId, @@ -373,12 +408,12 @@ export class BatchTriggerV2Service extends BaseService { idempotencyKey: options.idempotencyKey, idempotencyKeyExpiresAt: options.idempotencyKeyExpiresAt, dependentTaskAttemptId: dependentAttempt?.id, - runCount: newRunCount, + runCount: runs.length, runIds: runs.map((r) => r.id), payload: payloadPacket.data, payloadType: payloadPacket.dataType, options, - batchVersion: "v2", + batchVersion: "v3", oneTimeUseToken: options.oneTimeUseToken, }, }); @@ -392,56 +427,34 @@ export class BatchTriggerV2Service extends BaseService { options ); - switch (result.status) { - case "COMPLETE": { - logger.debug("[BatchTriggerV2][call] Batch inline processing complete", { - batchId: batch.friendlyId, - currentIndex: 0, - }); - - return batch; - } - case "INCOMPLETE": { - logger.debug("[BatchTriggerV2][call] Batch inline processing incomplete", { - batchId: batch.friendlyId, - currentIndex: result.workingIndex, - }); + if (result.error) { + logger.error("[BatchTriggerV2][call] Batch inline processing error", { + batchId: batch.friendlyId, + currentIndex: result.workingIndex, + error: result.error, + }); - // If processing inline does not finish for some reason, enqueue processing the rest of the batch - await this.#enqueueBatchTaskRun({ - batchId: batch.id, - processingId: "0", - range: { - start: result.workingIndex, - count: PROCESSING_BATCH_SIZE, - }, - attemptCount: 0, - strategy: "sequential", - }); + await this.#enqueueBatchTaskRun({ + batchId: batch.id, + processingId: "0", + range: { + start: result.workingIndex, + count: PROCESSING_BATCH_SIZE, + }, + attemptCount: 0, + strategy: "sequential", + }); - return batch; - } - case "ERROR": { - logger.error("[BatchTriggerV2][call] Batch inline processing error", { - batchId: batch.friendlyId, - currentIndex: result.workingIndex, - error: result.error, - }); + return batch; + } - await this.#enqueueBatchTaskRun({ - batchId: batch.id, - processingId: "0", - range: { - start: result.workingIndex, - count: PROCESSING_BATCH_SIZE, - }, - attemptCount: 0, - strategy: "sequential", - }); + // Update the batch to be sealed + await this._prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { sealed: true, sealedAt: new Date() }, + }); - return batch; - } - } + return batch; } else { return await $transaction(this._prisma, async (tx) => { const batch = await tx.batchTaskRun.create({ @@ -456,20 +469,23 @@ export class BatchTriggerV2Service extends BaseService { payload: payloadPacket.data, payloadType: payloadPacket.dataType, options, - batchVersion: "v2", + batchVersion: "v3", oneTimeUseToken: options.oneTimeUseToken, }, }); switch (this._batchProcessingStrategy) { case "sequential": { - await this.#enqueueBatchTaskRun({ - batchId: batch.id, - processingId: batchId, - range: { start: 0, count: PROCESSING_BATCH_SIZE }, - attemptCount: 0, - strategy: this._batchProcessingStrategy, - }); + await this.#enqueueBatchTaskRun( + { + batchId: batch.id, + processingId: batchId, + range: { start: 0, count: PROCESSING_BATCH_SIZE }, + attemptCount: 0, + strategy: this._batchProcessingStrategy, + }, + tx + ); break; } @@ -481,6 +497,13 @@ export class BatchTriggerV2Service extends BaseService { count: PROCESSING_BATCH_SIZE, })); + await tx.batchTaskRun.update({ + where: { id: batch.id }, + data: { + processingJobsExpectedCount: ranges.length, + }, + }); + await Promise.all( ranges.map((range, index) => this.#enqueueBatchTaskRun( @@ -620,26 +643,51 @@ export class BatchTriggerV2Service extends BaseService { $options ); - switch (result.status) { - case "COMPLETE": { - logger.debug("[BatchTriggerV2][processBatchTaskRun] Batch processing complete", { - options, - batchId: batch.friendlyId, - attemptCount: $attemptCount, - }); + if (result.error) { + logger.error("[BatchTriggerV2][processBatchTaskRun] Batch processing error", { + batchId: batch.friendlyId, + currentIndex: result.workingIndex, + error: result.error, + attemptCount: $attemptCount, + }); - return; - } - case "INCOMPLETE": { - logger.debug("[BatchTriggerV2][processBatchTaskRun] Batch processing incomplete", { - batchId: batch.friendlyId, - currentIndex: result.workingIndex, - attemptCount: $attemptCount, - }); + // if the strategy is sequential, we will requeue processing with a count of the PROCESSING_BATCH_SIZE + // if the strategy is parallel, we will requeue processing with a range starting at the workingIndex and a count that is the remainder of this "slice" of the batch + await this.#enqueueBatchTaskRun({ + batchId: batch.id, + processingId: options.processingId, + range: { + start: result.workingIndex, + count: + options.strategy === "sequential" + ? options.range.count + : options.range.count - result.workingIndex - options.range.start, + }, + attemptCount: $attemptCount, + strategy: options.strategy, + }); - // Only enqueue the next batch task run if the strategy is sequential - // if the strategy is parallel, we will already have enqueued the next batch task run - if (options.strategy === "sequential") { + return; + } + + switch (options.strategy) { + case "sequential": { + // We can tell if we are done by checking if the result.workingIndex is equal or greater than the runCount + if (result.workingIndex >= batch.runCount) { + // Update the batch to be sealed + await this._prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { sealed: true, sealedAt: new Date() }, + }); + + logger.debug("[BatchTriggerV2][processBatchTaskRun] Batch processing complete", { + batchId: batch.friendlyId, + runCount: batch.runCount, + currentIndex: result.workingIndex, + attemptCount: $attemptCount, + }); + } else { + // Requeue the next batch of processing await this.#enqueueBatchTaskRun({ batchId: batch.id, processingId: options.processingId, @@ -652,46 +700,37 @@ export class BatchTriggerV2Service extends BaseService { }); } - return; + break; } - case "ERROR": { - logger.error("[BatchTriggerV2][processBatchTaskRun] Batch processing error", { - batchId: batch.friendlyId, - currentIndex: result.workingIndex, - error: result.error, - attemptCount: $attemptCount, - }); - - // if the strategy is sequential, we will requeue processing with a count of the PROCESSING_BATCH_SIZE - // if the strategy is parallel, we will requeue processing with a range starting at the workingIndex and a count that is the remainder of this "slice" of the batch - if (options.strategy === "sequential") { - await this.#enqueueBatchTaskRun({ - batchId: batch.id, - processingId: options.processingId, - range: { - start: result.workingIndex, - count: options.range.count, // This will be the same as the original count + case "parallel": { + // We need to increment the processingJobsCount and check if we are done + const { processingJobsCount, processingJobsExpectedCount } = + await this._prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { + processingJobsCount: { + increment: 1, + }, }, - attemptCount: $attemptCount, - strategy: options.strategy, - }); - } else { - await this.#enqueueBatchTaskRun({ - batchId: batch.id, - processingId: options.processingId, - range: { - start: result.workingIndex, - // This will be the remainder of the slice - // for example if the original range was 0-50 and the workingIndex is 25, the new range will be 25-25 - // if the original range was 51-100 and the workingIndex is 75, the new range will be 75-25 - count: options.range.count - result.workingIndex - options.range.start, + select: { + processingJobsExpectedCount: true, + processingJobsCount: true, }, + }); + + if (processingJobsCount >= processingJobsExpectedCount) { + // Update the batch to be sealed + await this._prisma.batchTaskRun.update({ + where: { id: batch.id }, + data: { sealed: true, sealedAt: new Date() }, + }); + + logger.debug("[BatchTriggerV2][processBatchTaskRun] Batch processing complete", { + batchId: batch.friendlyId, + currentIndex: result.workingIndex, attemptCount: $attemptCount, - strategy: options.strategy, }); } - - return; } } } @@ -703,11 +742,7 @@ export class BatchTriggerV2Service extends BaseService { batchSize: number, items: BatchTriggerTaskV2RequestBody["items"], options?: BatchTriggerTaskServiceOptions - ): Promise< - | { status: "COMPLETE" } - | { status: "INCOMPLETE"; workingIndex: number } - | { status: "ERROR"; error: string; workingIndex: number } - > { + ): Promise<{ workingIndex: number; error?: Error }> { // Grab the next PROCESSING_BATCH_SIZE runIds const runIds = batch.runIds.slice(currentIndex, currentIndex + batchSize); @@ -739,19 +774,13 @@ export class BatchTriggerV2Service extends BaseService { }); return { - status: "ERROR", - error: error instanceof Error ? error.message : String(error), + error: error instanceof Error ? error : new Error(String(error)), workingIndex, }; } } - // if there are more items to process, requeue the batch - if (workingIndex < batch.runCount) { - return { status: "INCOMPLETE", workingIndex }; - } - - return { status: "COMPLETE" }; + return { workingIndex }; } async #processBatchTaskRunItem( @@ -769,7 +798,7 @@ export class BatchTriggerV2Service extends BaseService { const triggerTaskService = new TriggerTaskService(); - const run = await triggerTaskService.call( + const result = await triggerTaskService.call( task.item.task, environment, { @@ -790,17 +819,45 @@ export class BatchTriggerV2Service extends BaseService { } ); - if (!run) { + if (!result) { throw new Error(`Failed to trigger run ${task.runId} for batch ${batch.friendlyId}`); } - await this._prisma.batchTaskRunItem.create({ - data: { - batchTaskRunId: batch.id, - taskRunId: run.id, - status: batchTaskRunItemStatusForRunStatus(run.status), - }, - }); + if (!result.isCached) { + try { + await $transaction(this._prisma, async (tx) => { + // [batchTaskRunId, taskRunId] is a unique index + await tx.batchTaskRunItem.create({ + data: { + batchTaskRunId: batch.id, + taskRunId: result.run.id, + status: batchTaskRunItemStatusForRunStatus(result.run.status), + }, + }); + + await tx.batchTaskRun.update({ + where: { id: batch.id }, + data: { expectedCount: { increment: 1 } }, + }); + }); + } catch (error) { + if (isUniqueConstraintError(error, ["batchTaskRunId", "taskRunId"])) { + // This means there is already a batchTaskRunItem for this batch and taskRun + logger.debug( + "[BatchTriggerV2][processBatchTaskRunItem] BatchTaskRunItem already exists", + { + batchId: batch.friendlyId, + runId: task.runId, + currentIndex, + } + ); + + return; + } + + throw error; + } + } } async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { @@ -842,3 +899,66 @@ export class BatchTriggerV2Service extends BaseService { }); } } + +export async function completeBatchTaskRunItemV3( + itemId: string, + batchTaskRunId: string, + tx: PrismaClientOrTransaction, + scheduleResumeOnComplete = false +) { + await $transaction(tx, async (tx) => { + // Update the item to complete + const updated = await tx.batchTaskRunItem.updateMany({ + where: { + id: itemId, + status: "PENDING", + }, + data: { + status: "COMPLETED", + }, + }); + + if (updated.count === 0) { + return; + } + + const updatedBatchRun = await tx.batchTaskRun.update({ + where: { + id: batchTaskRunId, + }, + data: { + completedCount: { + increment: 1, + }, + }, + select: { + sealed: true, + status: true, + completedCount: true, + expectedCount: true, + dependentTaskAttemptId: true, + }, + }); + + if ( + updatedBatchRun.status === "PENDING" && + updatedBatchRun.completedCount === updatedBatchRun.expectedCount && + updatedBatchRun.sealed + ) { + await tx.batchTaskRun.update({ + where: { + id: batchTaskRunId, + }, + data: { + status: "COMPLETED", + completedAt: new Date(), + }, + }); + + // We only need to resume the batch if it has a dependent task attempt ID + if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) { + await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx); + } + } + }); +} diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index d1b80e6e75..39c62f8858 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -94,7 +94,12 @@ export class CreateCheckpointService extends BaseService { } //sleep to test slow checkpoints - // await new Promise((resolve) => setTimeout(resolve, 60_000)); + // Sleep a random value between 4 and 30 seconds + // await new Promise((resolve) => { + // const waitSeconds = Math.floor(Math.random() * 26) + 4; + // logger.log(`Sleep for ${waitSeconds} seconds`); + // setTimeout(resolve, waitSeconds * 1000); + // }); const checkpoint = await this._prisma.checkpoint.create({ data: { @@ -246,6 +251,7 @@ export class CreateCheckpointService extends BaseService { const batchRun = await this._prisma.batchTaskRun.findFirst({ select: { id: true, + batchVersion: true, }, where: { friendlyId: reason.batchFriendlyId, @@ -276,7 +282,11 @@ export class CreateCheckpointService extends BaseService { true ); - await ResumeBatchRunService.enqueue(batchRun.id, this._prisma); + await ResumeBatchRunService.enqueue( + batchRun.id, + batchRun.batchVersion === "v3", + this._prisma + ); return { success: true, diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index f4b5df5f78..fd34cec64a 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -1,8 +1,12 @@ import { FlushedRunMetadata, sanitizeError, TaskRunError } from "@trigger.dev/core/v3"; import { type Prisma, type TaskRun } from "@trigger.dev/database"; +import { findQueueInEnvironment } from "~/models/taskQueue.server"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; +import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; import { marqs } from "~/v3/marqs/index.server"; import { generateFriendlyId } from "../friendlyIdentifiers"; +import { socketIo } from "../handleSocketIo.server"; import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, @@ -11,13 +15,10 @@ import { } from "../taskStatus"; import { PerformTaskRunAlertsService } from "./alerts/performTaskRunAlerts.server"; import { BaseService } from "./baseService.server"; -import { ResumeDependentParentsService } from "./resumeDependentParents.server"; +import { completeBatchTaskRunItemV3 } from "./batchTriggerV3.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; -import { socketIo } from "../handleSocketIo.server"; import { ResumeBatchRunService } from "./resumeBatchRun.server"; -import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; -import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; -import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server"; +import { ResumeDependentParentsService } from "./resumeDependentParents.server"; type BaseInput = { id: string; @@ -196,6 +197,7 @@ export class FinalizeTaskRunService extends BaseService { select: { id: true, dependentTaskAttemptId: true, + batchVersion: true, }, }, }, @@ -216,23 +218,29 @@ export class FinalizeTaskRunService extends BaseService { for (const item of batchItems) { // Don't do anything if this is a batchTriggerAndWait in a deployed task + // As that is being handled in resumeDependentParents and resumeTaskRunDependencies if (environment.type !== "DEVELOPMENT" && item.batchTaskRun.dependentTaskAttemptId) { continue; } - // Update the item to complete - await this._prisma.batchTaskRunItem.update({ - where: { - id: item.id, - }, - data: { - status: "COMPLETED", - }, - }); + if (item.batchTaskRun.batchVersion === "v3") { + await completeBatchTaskRunItemV3(item.id, item.batchTaskRunId, this._prisma); + } else { + // THIS IS DEPRECATED and only happens with batchVersion != v3 + // Update the item to complete + await this._prisma.batchTaskRunItem.update({ + where: { + id: item.id, + }, + data: { + status: "COMPLETED", + }, + }); - // This won't resume because this batch does not have a dependent task attempt ID - // or is in development, but this service will mark the batch as completed - await ResumeBatchRunService.enqueue(item.batchTaskRunId, this._prisma); + // This won't resume because this batch does not have a dependent task attempt ID + // or is in development, but this service will mark the batch as completed + await ResumeBatchRunService.enqueue(item.batchTaskRunId, false, this._prisma); + } } } diff --git a/apps/webapp/app/v3/services/replayTaskRun.server.ts b/apps/webapp/app/v3/services/replayTaskRun.server.ts index cb97c07995..26058e1e6a 100644 --- a/apps/webapp/app/v3/services/replayTaskRun.server.ts +++ b/apps/webapp/app/v3/services/replayTaskRun.server.ts @@ -87,7 +87,7 @@ export class ReplayTaskRunService extends BaseService { }); const triggerTaskService = new TriggerTaskService(); - return await triggerTaskService.call( + const result = await triggerTaskService.call( existingTaskRun.taskIdentifier, authenticatedEnvironment, { @@ -113,6 +113,8 @@ export class ReplayTaskRunService extends BaseService { }, } ); + + return result?.run; } catch (error) { if (error instanceof OutOfEntitlementError) { return; diff --git a/apps/webapp/app/v3/services/resumeBatchRun.server.ts b/apps/webapp/app/v3/services/resumeBatchRun.server.ts index c1d8d6132c..81fac088a6 100644 --- a/apps/webapp/app/v3/services/resumeBatchRun.server.ts +++ b/apps/webapp/app/v3/services/resumeBatchRun.server.ts @@ -3,9 +3,15 @@ import { workerQueue } from "~/services/worker.server"; import { marqs } from "~/v3/marqs/index.server"; import { BaseService } from "./baseService.server"; import { logger } from "~/services/logger.server"; +import { BatchTaskRun } from "@trigger.dev/database"; const finishedBatchRunStatuses = ["COMPLETED", "FAILED", "CANCELED"]; +type RetrieveBatchRunResult = NonNullable>>; + +// {"batchRunId":"cm6l2qfs400d0dyiczcwiuwrp","dependentTaskAttempt":{"status":"PAUSED","id":"cm6l2qcqf00cydyicryir6xlu","taskRun":{"id":"cm6l2qaw200cudyicktkfh4k9","queue":"task/batch-trigger-sequentially","taskIdentifier":"batch-trigger-sequentially","concurrencyKey":null}},"checkpointEventId":"cm6l2qg7400dgdyicy6qx9s8u","timestamp":"2025-01-31T18:04:52.869Z","name":"webapp","message":"ResumeBatchRunService: Attempt is paused and has a checkpoint event","level":"debug","skipForwarding":true} +// {"batchRunId":"cm6l2qfs400d0dyiczcwiuwrp","dependentTaskAttempt":{"status":"PAUSED","id":"cm6l2qcqf00cydyicryir6xlu","taskRun":{"id":"cm6l2qaw200cudyicktkfh4k9","queue":"task/batch-trigger-sequentially","taskIdentifier":"batch-trigger-sequentially","concurrencyKey":null}},"checkpointEventId":"cm6l2qg7400dgdyicy6qx9s8u","hasCheckpointEvent":true,"timestamp":"2025-01-31T18:04:52.871Z","name":"webapp","message":"ResumeBatchRunService: with checkpoint was already completed","level":"debug","skipForwarding":true} + export class ResumeBatchRunService extends BaseService { public async call(batchRunId: string) { const batchRun = await this._prisma.batchTaskRun.findFirst({ @@ -39,6 +45,40 @@ export class ResumeBatchRunService extends BaseService { return "ERROR"; } + if (batchRun.batchVersion === "v3") { + return await this.#handleV3BatchRun(batchRun); + } else { + return await this.#handleLegacyBatchRun(batchRun); + } + } + + async #handleV3BatchRun(batchRun: RetrieveBatchRunResult) { + // V3 batch runs should already be complete by the time this is called + if (batchRun.status !== "COMPLETED") { + logger.debug("ResumeBatchRunService: Batch run is already completed", { + batchRunId: batchRun.id, + batchRun: { + id: batchRun.id, + status: batchRun.status, + }, + }); + + return "ERROR"; + } + + // Even though we are in v3, we still need to check if the batch run has a dependent attempt + if (!batchRun.dependentTaskAttemptId) { + logger.debug("ResumeBatchRunService: Batch run doesn't have a dependent attempt", { + batchRunId: batchRun.id, + }); + + return "ERROR"; + } + + return await this.#handleDependentTaskAttempt(batchRun, batchRun.dependentTaskAttemptId); + } + + async #handleLegacyBatchRun(batchRun: RetrieveBatchRunResult) { if (batchRun.status === "COMPLETED") { logger.debug("ResumeBatchRunService: Batch run is already completed", { batchRunId: batchRun.id, @@ -95,9 +135,16 @@ export class ResumeBatchRunService extends BaseService { return "COMPLETED"; } + return await this.#handleDependentTaskAttempt(batchRun, batchRun.dependentTaskAttemptId); + } + + async #handleDependentTaskAttempt( + batchRun: RetrieveBatchRunResult, + dependentTaskAttemptId: string + ) { const dependentTaskAttempt = await this._prisma.taskRunAttempt.findFirst({ where: { - id: batchRun.dependentTaskAttemptId, + id: dependentTaskAttemptId, }, select: { status: true, @@ -134,7 +181,7 @@ export class ResumeBatchRunService extends BaseService { }); // We need to update the batchRun status so we don't resume it again - const wasUpdated = await this.#setBatchToCompletedOnce(batchRun.id); + const wasUpdated = await this.#setBatchToResumedOnce(batchRun); if (wasUpdated) { logger.debug("ResumeBatchRunService: Resuming dependent run with checkpoint", { @@ -192,7 +239,7 @@ export class ResumeBatchRunService extends BaseService { } // We need to update the batchRun status so we don't resume it again - const wasUpdated = await this.#setBatchToCompletedOnce(batchRun.id); + const wasUpdated = await this.#setBatchToResumedOnce(batchRun); if (wasUpdated) { logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", { @@ -227,10 +274,31 @@ export class ResumeBatchRunService extends BaseService { } } - async #setBatchToCompletedOnce(batchRunId: string) { + async #setBatchToResumedOnce(batchRun: BatchTaskRun) { + // v3 batches don't use the status for deciding whether a batch has been resumed + if (batchRun.batchVersion === "v3") { + const result = await this._prisma.batchTaskRun.updateMany({ + where: { + id: batchRun.id, + resumedAt: null, + }, + data: { + resumedAt: new Date(), + }, + }); + + // Check if any records were updated + if (result.count > 0) { + // The status was changed, so we return true + return true; + } else { + return false; + } + } + const result = await this._prisma.batchTaskRun.updateMany({ where: { - id: batchRunId, + id: batchRun.id, status: { not: "COMPLETED", // Ensure the status is not already "COMPLETED" }, @@ -249,7 +317,12 @@ export class ResumeBatchRunService extends BaseService { } } - static async enqueue(batchRunId: string, tx: PrismaClientOrTransaction, runAt?: Date) { + static async enqueue( + batchRunId: string, + skipJobKey: boolean, + tx: PrismaClientOrTransaction, + runAt?: Date + ) { return await workerQueue.enqueue( "v3.resumeBatchRun", { @@ -258,8 +331,30 @@ export class ResumeBatchRunService extends BaseService { { tx, runAt, - jobKey: `resumeBatchRun-${batchRunId}`, + jobKey: skipJobKey ? undefined : `resumeBatchRun-${batchRunId}`, } ); } } + +async function retrieveBatchRun(id: string, prisma: PrismaClientOrTransaction) { + return await prisma.batchTaskRun.findFirst({ + where: { + id, + }, + include: { + runtimeEnvironment: { + include: { + project: true, + organization: true, + }, + }, + items: { + select: { + status: true, + taskRunAttemptId: true, + }, + }, + }, + }); +} diff --git a/apps/webapp/app/v3/services/resumeDependentParents.server.ts b/apps/webapp/app/v3/services/resumeDependentParents.server.ts index 4155b7b3c0..d019f1db1a 100644 --- a/apps/webapp/app/v3/services/resumeDependentParents.server.ts +++ b/apps/webapp/app/v3/services/resumeDependentParents.server.ts @@ -5,6 +5,7 @@ import { BaseService } from "./baseService.server"; import { ResumeBatchRunService } from "./resumeBatchRun.server"; import { ResumeTaskDependencyService } from "./resumeTaskDependency.server"; import { $transaction } from "~/db.server"; +import { completeBatchTaskRunItemV3 } from "./batchTriggerV3.server"; type Output = | { @@ -45,6 +46,7 @@ const taskRunDependencySelect = { dependentBatchRun: { select: { id: true, + batchVersion: true, }, }, }, @@ -242,22 +244,48 @@ export class ResumeDependentParentsService extends BaseService { } ); - await $transaction(this._prisma, async (tx) => { - await tx.batchTaskRunItem.update({ + if (dependency.dependentBatchRun!.batchVersion === "v3") { + const batchTaskRunItem = await this._prisma.batchTaskRunItem.findFirst({ where: { - batchTaskRunId_taskRunId: { - batchTaskRunId: dependency.dependentBatchRun!.id, - taskRunId: dependency.taskRunId, - }, - }, - data: { - status: "COMPLETED", - taskRunAttemptId: lastAttempt.id, + batchTaskRunId: dependency.dependentBatchRun!.id, + taskRunId: dependency.taskRunId, }, }); - await ResumeBatchRunService.enqueue(dependency.dependentBatchRun!.id, tx); - }); + if (batchTaskRunItem) { + await completeBatchTaskRunItemV3( + batchTaskRunItem.id, + batchTaskRunItem.batchTaskRunId, + this._prisma, + true + ); + } else { + logger.debug( + "ResumeDependentParentsService.batchRunDependency() v3: batchTaskRunItem not found", + { + dependency, + lastAttempt, + } + ); + } + } else { + await $transaction(this._prisma, async (tx) => { + await tx.batchTaskRunItem.update({ + where: { + batchTaskRunId_taskRunId: { + batchTaskRunId: dependency.dependentBatchRun!.id, + taskRunId: dependency.taskRunId, + }, + }, + data: { + status: "COMPLETED", + taskRunAttemptId: lastAttempt.id, + }, + }); + + await ResumeBatchRunService.enqueue(dependency.dependentBatchRun!.id, false, tx); + }); + } return { success: true, diff --git a/apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts b/apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts index 5f721ea437..59f7964e48 100644 --- a/apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts +++ b/apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts @@ -1,18 +1,28 @@ -import { BatchTaskRunItem, TaskRunAttempt, TaskRunDependency } from "@trigger.dev/database"; +import { + BatchTaskRun, + BatchTaskRunItem, + TaskRunAttempt, + TaskRunDependency, +} from "@trigger.dev/database"; import { $transaction, PrismaClientOrTransaction } from "~/db.server"; import { workerQueue } from "~/services/worker.server"; import { BaseService } from "./baseService.server"; import { ResumeBatchRunService } from "./resumeBatchRun.server"; import { ResumeTaskDependencyService } from "./resumeTaskDependency.server"; +import { completeBatchTaskRunItemV3 } from "./batchTriggerV3.server"; export class ResumeTaskRunDependenciesService extends BaseService { public async call(attemptId: string) { - const taskAttempt = await this._prisma.taskRunAttempt.findUnique({ + const taskAttempt = await this._prisma.taskRunAttempt.findFirst({ where: { id: attemptId }, include: { taskRun: { include: { - batchItems: true, + batchItems: { + include: { + batchTaskRun: true, + }, + }, dependency: { include: { dependentAttempt: true, @@ -42,7 +52,7 @@ export class ResumeTaskRunDependenciesService extends BaseService { if (batchItems.length) { for (const batchItem of batchItems) { - await this.#resumeBatchItem(batchItem, taskAttempt); + await this.#resumeBatchItem(batchItem, batchItem.batchTaskRun, taskAttempt); } return; } @@ -53,20 +63,28 @@ export class ResumeTaskRunDependenciesService extends BaseService { } } - async #resumeBatchItem(batchItem: BatchTaskRunItem, taskAttempt: TaskRunAttempt) { - await $transaction(this._prisma, async (tx) => { - await tx.batchTaskRunItem.update({ - where: { - id: batchItem.id, - }, - data: { - status: "COMPLETED", - taskRunAttemptId: taskAttempt.id, - }, - }); + async #resumeBatchItem( + batchItem: BatchTaskRunItem, + batchTaskRun: BatchTaskRun, + taskAttempt: TaskRunAttempt + ) { + if (batchTaskRun.batchVersion === "v3") { + await completeBatchTaskRunItemV3(batchItem.id, batchTaskRun.id, this._prisma, true); + } else { + await $transaction(this._prisma, async (tx) => { + await tx.batchTaskRunItem.update({ + where: { + id: batchItem.id, + }, + data: { + status: "COMPLETED", + taskRunAttemptId: taskAttempt.id, + }, + }); - await ResumeBatchRunService.enqueue(batchItem.batchTaskRunId, tx); - }); + await ResumeBatchRunService.enqueue(batchItem.batchTaskRunId, false, tx); + }); + } } async #resumeDependency(dependency: TaskRunDependency, taskAttempt: TaskRunAttempt) { diff --git a/apps/webapp/app/v3/services/testTask.server.ts b/apps/webapp/app/v3/services/testTask.server.ts index cf137cf32b..94270f6d06 100644 --- a/apps/webapp/app/v3/services/testTask.server.ts +++ b/apps/webapp/app/v3/services/testTask.server.ts @@ -15,13 +15,19 @@ export class TestTaskService extends BaseService { switch (data.triggerSource) { case "STANDARD": - return await triggerTaskService.call(data.taskIdentifier, authenticatedEnvironment, { - payload: data.payload, - options: { - test: true, - metadata: data.metadata, - }, - }); + const result = await triggerTaskService.call( + data.taskIdentifier, + authenticatedEnvironment, + { + payload: data.payload, + options: { + test: true, + metadata: data.metadata, + }, + } + ); + + return result?.run; case "SCHEDULED": { const payload = { scheduleId: "sched_1234", @@ -34,7 +40,7 @@ export class TestTaskService extends BaseService { }; const payloadPacket = await stringifyIO(payload); - return await triggerTaskService.call( + const result = await triggerTaskService.call( data.taskIdentifier, authenticatedEnvironment, { @@ -43,6 +49,8 @@ export class TestTaskService extends BaseService { }, { customIcon: "scheduled" } ); + + return result?.run; } default: throw new Error("Invalid trigger source"); diff --git a/apps/webapp/app/v3/services/triggerScheduledTask.server.ts b/apps/webapp/app/v3/services/triggerScheduledTask.server.ts index abc61bf33b..552043115e 100644 --- a/apps/webapp/app/v3/services/triggerScheduledTask.server.ts +++ b/apps/webapp/app/v3/services/triggerScheduledTask.server.ts @@ -131,14 +131,14 @@ export class TriggerScheduledTaskService extends BaseService { payloadPacket, }); - const run = await triggerTask.call( + const result = await triggerTask.call( instance.taskSchedule.taskIdentifier, instance.environment, { payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } }, { customIcon: "scheduled" } ); - if (!run) { + if (!result) { logger.error("Failed to trigger task", { instanceId, scheduleId: instance.taskSchedule.friendlyId, @@ -147,7 +147,7 @@ export class TriggerScheduledTaskService extends BaseService { } else { await this._prisma.taskRun.update({ where: { - id: run.id, + id: result.run.id, }, data: { scheduleId: instance.taskSchedule.id, diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 5c0008169a..264a81827e 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -26,7 +26,7 @@ import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server"; import { clampMaxDuration } from "../utils/maxDuration"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; -import { Prisma } from "@trigger.dev/database"; +import { Prisma, TaskRun } from "@trigger.dev/database"; import { sanitizeQueueName } from "~/models/taskQueue.server"; export type TriggerTaskServiceOptions = { @@ -49,15 +49,30 @@ export class OutOfEntitlementError extends Error { } } +export type TriggerTaskServiceResult = { + run: TaskRun; + isCached: boolean; +}; + +const MAX_ATTEMPTS = 2; + export class TriggerTaskService extends BaseService { public async call( taskId: string, environment: AuthenticatedEnvironment, body: TriggerTaskRequestBody, - options: TriggerTaskServiceOptions = {} - ) { + options: TriggerTaskServiceOptions = {}, + attempt: number = 0 + ): Promise { return await this.traceWithEnv("call()", environment, async (span) => { span.setAttribute("taskId", taskId); + span.setAttribute("attempt", attempt); + + if (attempt > MAX_ATTEMPTS) { + throw new ServiceValidationError( + `Failed to trigger ${taskId} after ${MAX_ATTEMPTS} attempts.` + ); + } // TODO: Add idempotency key expiring here const idempotencyKey = options.idempotencyKey ?? body.options?.idempotencyKey; @@ -74,13 +89,11 @@ export class TriggerTaskService extends BaseService { : body.options?.ttl ?? (environment.type === "DEVELOPMENT" ? "10m" : undefined); const existingRun = idempotencyKey - ? await this._prisma.taskRun.findUnique({ + ? await this._prisma.taskRun.findFirst({ where: { - runtimeEnvironmentId_taskIdentifier_idempotencyKey: { - runtimeEnvironmentId: environment.id, - idempotencyKey, - taskIdentifier: taskId, - }, + runtimeEnvironmentId: environment.id, + idempotencyKey, + taskIdentifier: taskId, }, }) : undefined; @@ -103,7 +116,7 @@ export class TriggerTaskService extends BaseService { } else { span.setAttribute("runId", existingRun.friendlyId); - return existingRun; + return { run: existingRun, isCached: true }; } } @@ -572,7 +585,7 @@ export class TriggerTaskService extends BaseService { ); } - return run; + return { run, isCached: false }; } ); } catch (error) { @@ -608,6 +621,23 @@ export class TriggerTaskService extends BaseService { throw new Error( `Failed to trigger ${taskId} as the queue could not be created do to a unique constraint error, please try again.` ); + } else if ( + Array.isArray(target) && + target.length == 3 && + typeof target[0] === "string" && + typeof target[1] === "string" && + typeof target[2] === "string" && + target[0] == "runtimeEnvironmentId" && + target[1] == "taskIdentifier" && + target[2] == "idempotencyKey" + ) { + logger.debug("TriggerTask: Idempotency key violation, retrying...", { + taskId, + environmentId: environment.id, + idempotencyKey, + }); + // We need to retry the task run creation as the idempotency key has been used + return await this.call(taskId, environment, body, options, attempt + 1); } else { throw new ServiceValidationError( `Cannot trigger ${taskId} as it has already been triggered with the same idempotency key.` diff --git a/internal-packages/database/prisma/migrations/20250131120251_add_batch_trigger_v3_columns/migration.sql b/internal-packages/database/prisma/migrations/20250131120251_add_batch_trigger_v3_columns/migration.sql new file mode 100644 index 0000000000..3f50745158 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250131120251_add_batch_trigger_v3_columns/migration.sql @@ -0,0 +1,9 @@ +-- AlterTable +ALTER TABLE "BatchTaskRun" ADD COLUMN "completedAt" TIMESTAMP(3), +ADD COLUMN "completedCount" INTEGER NOT NULL DEFAULT 0, +ADD COLUMN "expectedCount" INTEGER NOT NULL DEFAULT 0, +ADD COLUMN "sealed" BOOLEAN NOT NULL DEFAULT false, +ADD COLUMN "sealedAt" TIMESTAMP(3); + +-- AlterTable +ALTER TABLE "BatchTaskRunItem" ADD COLUMN "completedAt" TIMESTAMP(3); diff --git a/internal-packages/database/prisma/migrations/20250131145633_add_processing_columns_to_batch_task_run/migration.sql b/internal-packages/database/prisma/migrations/20250131145633_add_processing_columns_to_batch_task_run/migration.sql new file mode 100644 index 0000000000..23ea2d5058 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250131145633_add_processing_columns_to_batch_task_run/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "BatchTaskRun" ADD COLUMN "processingJobsCount" INTEGER NOT NULL DEFAULT 0, +ADD COLUMN "processingJobsExpectedCount" INTEGER NOT NULL DEFAULT 0; diff --git a/internal-packages/database/prisma/migrations/20250201072700_add_resumed_at_to_batch_task_run/migration.sql b/internal-packages/database/prisma/migrations/20250201072700_add_resumed_at_to_batch_task_run/migration.sql new file mode 100644 index 0000000000..42969a77e5 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20250201072700_add_resumed_at_to_batch_task_run/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "BatchTaskRun" ADD COLUMN "resumedAt" TIMESTAMP(3); diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index d8b53632ec..ef36eb7818 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -2166,6 +2166,21 @@ model BatchTaskRun { options Json? batchVersion String @default("v1") + // This is for v3 batches + /// sealed is set to true once no more items can be added to the batch + sealed Boolean @default(false) + sealedAt DateTime? + /// this is the expected number of items in the batch + expectedCount Int @default(0) + /// this is the completed number of items in the batch. once this reaches expectedCount, and the batch is sealed, the batch is considered completed + completedCount Int @default(0) + completedAt DateTime? + resumedAt DateTime? + + /// this is used to be able to "seal" this BatchTaskRun when all of the runs have been triggered asynchronously, and using the "parallel" processing strategy + processingJobsCount Int @default(0) + processingJobsExpectedCount Int @default(0) + /// optional token that can be used to authenticate the task run oneTimeUseToken String? @@ -2202,8 +2217,9 @@ model BatchTaskRunItem { taskRunAttempt TaskRunAttempt? @relation(fields: [taskRunAttemptId], references: [id], onDelete: SetNull, onUpdate: Cascade) taskRunAttemptId String? - createdAt DateTime @default(now()) - updatedAt DateTime @updatedAt + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + completedAt DateTime? @@unique([batchTaskRunId, taskRunId]) @@index([taskRunAttemptId], map: "idx_batchtaskrunitem_taskrunattempt") diff --git a/internal-packages/database/src/transaction.ts b/internal-packages/database/src/transaction.ts index 017e222c65..81eaa12bc3 100644 --- a/internal-packages/database/src/transaction.ts +++ b/internal-packages/database/src/transaction.ts @@ -56,3 +56,34 @@ export async function $transaction( throw error; } } + +export function isUniqueConstraintError( + error: unknown, + columns: T +): boolean { + if (!isPrismaKnownError(error)) { + return false; + } + + if (error.code !== "P2002") { + return false; + } + + const target = error.meta?.target; + + if (!Array.isArray(target)) { + return false; + } + + if (target.length !== columns.length) { + return false; + } + + for (let i = 0; i < columns.length; i++) { + if (target[i] !== columns[i]) { + return false; + } + } + + return true; +} diff --git a/references/v3-catalog/src/trigger/batch.ts b/references/v3-catalog/src/trigger/batch.ts index 056589e76a..dd564d971a 100644 --- a/references/v3-catalog/src/trigger/batch.ts +++ b/references/v3-catalog/src/trigger/batch.ts @@ -300,7 +300,7 @@ export const batchV2TestTask = task({ }, run: async ({ triggerSequentially, - largeBatchSize = 21, + largeBatchSize = 20, }: { triggerSequentially?: boolean; largeBatchSize?: number; @@ -688,6 +688,42 @@ export const batchV2TestTask = task({ }, }); +export const batchTriggerSequentiallyTask = task({ + id: "batch-trigger-sequentially", + retry: { + maxAttempts: 1, + }, + run: async ({ + count = 20, + wait = false, + triggerSequentially = true, + }: { + count: number; + wait: boolean; + triggerSequentially: boolean; + }) => { + if (wait) { + return await batchV2TestChild.batchTriggerAndWait( + Array.from({ length: count }, (_, i) => ({ + payload: { foo: `bar${i}` }, + })), + { + triggerSequentially, + } + ); + } else { + return await batchV2TestChild.batchTrigger( + Array.from({ length: count }, (_, i) => ({ + payload: { foo: `bar${i}` }, + })), + { + triggerSequentially, + } + ); + } + }, +}); + export const batchV2TestChild = task({ id: "batch-v2-test-child", queue: { @@ -756,3 +792,41 @@ export const batchAutoIdempotencyKeyChild = task({ return payload; }, }); + +export const batchTriggerIdempotencyKeyTest = task({ + id: "batch-trigger-idempotency-key-test", + retry: { + maxAttempts: 1, + }, + run: async () => { + // Trigger a batch of 2 items with the same idempotency key + await batchV2TestChild.batchTrigger( + [ + { payload: { foo: "bar" }, options: { idempotencyKey: "test-idempotency-key" } }, + { payload: { foo: "baz" }, options: { idempotencyKey: "test-idempotency-key" } }, + ], + { + triggerSequentially: true, + } + ); + + // Now trigger a batch of 21 items, with just the last one having an idempotency key + await batchV2TestChild.batchTrigger( + Array.from({ length: 20 }, (_, i) => ({ + payload: { foo: `bar${i}` }, + options: {}, + })).concat([ + { + payload: { foo: "baz" }, + options: { idempotencyKey: "test-idempotency-key-2" }, + }, + ]), + { + triggerSequentially: true, + } + ); + + // Now while that batch is being processed in the background, lets trigger the same task with that idempotency key + await batchV2TestChild.trigger({ foo: "baz" }, { idempotencyKey: "test-idempotency-key-2" }); + }, +}); diff --git a/references/v3-catalog/src/trigger/checkpoints.ts b/references/v3-catalog/src/trigger/checkpoints.ts index 89964dc61c..58dc4a9c4d 100644 --- a/references/v3-catalog/src/trigger/checkpoints.ts +++ b/references/v3-catalog/src/trigger/checkpoints.ts @@ -1,17 +1,24 @@ -import { logger, queue, task, wait } from "@trigger.dev/sdk/v3"; +import { logger, queue, schedules, task, wait } from "@trigger.dev/sdk/v3"; type Payload = { count?: number; }; +export const checkpointBatchResumerTester = schedules.task({ + id: "checkpoint-batch-resume-tester", + run: async () => { + await checkpointBatchResumer.triggerAndWait({ count: 1 }); + }, +}); + /** Test that checkpoints and resuming works if the checkpoint isn't created before the resume */ export const checkpointBatchResumer = task({ id: "checkpoint-batch-resume", run: async ({ count = 1 }: Payload) => { await noop.batchTriggerAndWait(Array.from({ length: count }, (_, i) => ({}))); logger.info(`Successfully 1/1 resumed after ${count} runs`); - await noop.batchTriggerAndWait(Array.from({ length: count }, (_, i) => ({}))); - logger.info(`Successfully 2/2 resumed after ${count} runs`); + // await noop.batchTriggerAndWait(Array.from({ length: count }, (_, i) => ({}))); + // logger.info(`Successfully 2/2 resumed after ${count} runs`); }, }); diff --git a/references/v3-catalog/src/trigger/queues.ts b/references/v3-catalog/src/trigger/queues.ts index f2f1781322..e84ce25a53 100644 --- a/references/v3-catalog/src/trigger/queues.ts +++ b/references/v3-catalog/src/trigger/queues.ts @@ -1,60 +1,58 @@ -import { logger, runs, task, wait } from "@trigger.dev/sdk/v3"; +import { logger, runs, task, wait, auth } from "@trigger.dev/sdk/v3"; export const queuesController = task({ id: "queues/controller", - run: async ({ - numberOfQueues = 20, - length = 20, - waitSeconds = 3, - }: { - numberOfQueues?: number; - length?: number; - waitSeconds?: number; - }) => { + run: async ( + { + numberOfQueues = 20, + length = 20, + waitSeconds = 3, + }: { + numberOfQueues?: number; + length?: number; + waitSeconds?: number; + }, + { ctx } + ) => { + const publicToken = await auth.createPublicToken({ + scopes: { + read: { + runs: true, + }, + }, + }); + + logger.debug("Public token", { publicToken }); + await Promise.all([ queuesTest.trigger( { waitSeconds }, { - queue: { - name: "controller-3", - concurrencyLimit: 9, - }, + idempotencyKey: ctx.run.id, } ), queuesTest.trigger( { waitSeconds }, { - queue: { - name: "controller-3", - concurrencyLimit: 9, - }, + idempotencyKey: ctx.run.id, } ), queuesTest.trigger( { waitSeconds }, { - queue: { - name: "controller-3", - concurrencyLimit: 9, - }, + idempotencyKey: ctx.run.id, } ), queuesTest.trigger( { waitSeconds }, { - queue: { - name: "controller-3", - concurrencyLimit: 9, - }, + idempotencyKey: ctx.run.id, } ), queuesTest.trigger( { waitSeconds }, { - queue: { - name: "controller-3", - concurrencyLimit: 9, - }, + idempotencyKey: ctx.run.id, } ), ]);