diff --git a/apps/webapp/app/models/taskRunTag.server.ts b/apps/webapp/app/models/taskRunTag.server.ts index f676b99a0c..19014078b3 100644 --- a/apps/webapp/app/models/taskRunTag.server.ts +++ b/apps/webapp/app/models/taskRunTag.server.ts @@ -1,11 +1,15 @@ import { prisma } from "~/db.server"; import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; +import { PrismaClientOrTransaction } from "@trigger.dev/database"; export const MAX_TAGS_PER_RUN = 10; -export async function createTag({ tag, projectId }: { tag: string; projectId: string }) { +export async function createTag( + { tag, projectId }: { tag: string; projectId: string }, + prismaClient: PrismaClientOrTransaction = prisma +) { if (tag.trim().length === 0) return; - return prisma.taskRunTag.upsert({ + return prismaClient.taskRunTag.upsert({ where: { projectId_name: { projectId: projectId, @@ -21,6 +25,48 @@ export async function createTag({ tag, projectId }: { tag: string; projectId: st }); } +export type TagRecord = { + id: string; + name: string; +}; + +export async function createTags( + { + tags, + projectId, + }: { + tags: string | string[] | undefined; + projectId: string; + }, + prismaClient: PrismaClientOrTransaction = prisma +): Promise { + if (!tags) { + return []; + } + + const tagsArray = typeof tags === "string" ? [tags] : tags; + + if (tagsArray.length === 0) { + return []; + } + + const tagRecords: TagRecord[] = []; + for (const tag of tagsArray) { + const tagRecord = await createTag( + { + tag, + projectId, + }, + prismaClient + ); + if (tagRecord) { + tagRecords.push({ id: tagRecord.id, name: tagRecord.name }); + } + } + + return tagRecords; +} + export async function getTagsForRunId({ friendlyId, environmentId, diff --git a/apps/webapp/app/routes/api.v2.tasks.batch.ts b/apps/webapp/app/routes/api.v2.tasks.batch.ts index e2c13e1aa9..62732ff7ad 100644 --- a/apps/webapp/app/routes/api.v2.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v2.tasks.batch.ts @@ -10,9 +10,9 @@ import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy } from "~/v3/services/batchTriggerV3.server"; -import { BatchTriggerV4Service } from "~/v3/services/batchTriggerV4.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; +import { RunEngineBatchTriggerService } from "~/runEngine/services/batchTrigger.server"; const { action, loader } = createActionApiRoute( { @@ -74,7 +74,7 @@ const { action, loader } = createActionApiRoute( ? { traceparent, tracestate } : undefined; - const service = new BatchTriggerV4Service(batchProcessingStrategy ?? undefined); + const service = new RunEngineBatchTriggerService(batchProcessingStrategy ?? undefined); try { const batch = await service.call(authentication.environment, body, { diff --git a/apps/webapp/app/v3/services/batchTriggerV4.server.ts b/apps/webapp/app/runEngine/services/batchTrigger.server.ts similarity index 98% rename from apps/webapp/app/v3/services/batchTriggerV4.server.ts rename to apps/webapp/app/runEngine/services/batchTrigger.server.ts index 1434a8f915..74f16395e4 100644 --- a/apps/webapp/app/v3/services/batchTriggerV4.server.ts +++ b/apps/webapp/app/runEngine/services/batchTrigger.server.ts @@ -15,10 +15,10 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { getEntitlement } from "~/services/platform.v3.server"; import { workerQueue } from "~/services/worker.server"; -import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../r2.server"; -import { startActiveSpan } from "../tracer.server"; -import { ServiceValidationError, WithRunEngine } from "./baseService.server"; -import { OutOfEntitlementError, TriggerTaskService } from "./triggerTask.server"; +import { downloadPacketFromObjectStore, uploadPacketToObjectStore } from "../../v3/r2.server"; +import { startActiveSpan } from "../../v3/tracer.server"; +import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; +import { OutOfEntitlementError, TriggerTaskService } from "../../v3/services/triggerTask.server"; const PROCESSING_BATCH_SIZE = 50; const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20; @@ -49,7 +49,7 @@ export type BatchTriggerTaskServiceOptions = { /** * Larger batches, used in Run Engine v2 */ -export class BatchTriggerV4Service extends WithRunEngine { +export class RunEngineBatchTriggerService extends WithRunEngine { private _batchProcessingStrategy: BatchProcessingStrategy; constructor( @@ -643,7 +643,7 @@ export class BatchTriggerV4Service extends WithRunEngine { } async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) { - await workerQueue.enqueue("v3.processBatchTaskRunV3", options, { + await workerQueue.enqueue("runengine.processBatchTaskRun", options, { tx, jobKey: `BatchTriggerV3Service.process:${options.batchId}:${options.processingId}`, }); diff --git a/apps/webapp/app/v3/services/triggerTaskV2.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts similarity index 92% rename from apps/webapp/app/v3/services/triggerTaskV2.server.ts rename to apps/webapp/app/runEngine/services/triggerTask.server.ts index d7bc8b9f6a..a010b6632a 100644 --- a/apps/webapp/app/v3/services/triggerTaskV2.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -1,4 +1,4 @@ -import { RunEngine, RunDuplicateIdempotencyKeyError } from "@internal/run-engine"; +import { RunDuplicateIdempotencyKeyError, RunEngine } from "@internal/run-engine"; import { IOPacket, packetRequiresOffloading, @@ -14,9 +14,9 @@ import { sanitizeQueueName, stringifyDuration, } from "@trigger.dev/core/v3/isomorphic"; -import { Prisma, TaskRun } from "@trigger.dev/database"; +import { Prisma } from "@trigger.dev/database"; import { env } from "~/env.server"; -import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; +import { createTags, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; import { logger } from "~/services/logger.server"; @@ -24,24 +24,23 @@ import { getEntitlement } from "~/services/platform.v3.server"; import { parseDelay } from "~/utils/delays"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; import { handleMetadataPacket } from "~/utils/packets"; -import { eventRepository } from "../eventRepository.server"; -import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server"; -import { uploadPacketToObjectStore } from "../r2.server"; -import { isFinalRunStatus } from "../taskStatus"; -import { startActiveSpan } from "../tracer.server"; -import { clampMaxDuration } from "../utils/maxDuration"; -import { ServiceValidationError, WithRunEngine } from "./baseService.server"; +import { eventRepository } from "../../v3/eventRepository.server"; +import { findCurrentWorkerFromEnvironment } from "../../v3/models/workerDeployment.server"; +import { uploadPacketToObjectStore } from "../../v3/r2.server"; +import { getTaskEventStore } from "../../v3/taskEventStore.server"; +import { isFinalRunStatus } from "../../v3/taskStatus"; +import { startActiveSpan } from "../../v3/tracer.server"; +import { clampMaxDuration } from "../../v3/utils/maxDuration"; +import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server"; import { MAX_ATTEMPTS, OutOfEntitlementError, TriggerTaskServiceOptions, TriggerTaskServiceResult, -} from "./triggerTask.server"; -import { WorkerGroupService } from "./worker/workerGroupService.server"; -import { getTaskEventStore } from "../taskEventStore.server"; +} from "../../v3/services/triggerTask.server"; +import { WorkerGroupService } from "../../v3/services/worker/workerGroupService.server"; -/** @deprecated Use TriggerTaskService in `triggerTask.server.ts` instead. */ -export class TriggerTaskServiceV2 extends WithRunEngine { +export class RunEngineTriggerTaskService extends WithRunEngine { public async call({ taskId, environment, @@ -299,20 +298,13 @@ export class TriggerTaskServiceV2 extends WithRunEngine { span.setAttribute("queueName", queueName); //upsert tags - let tags: { id: string; name: string }[] = []; - const bodyTags = - typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags; - if (bodyTags && bodyTags.length > 0) { - for (const tag of bodyTags) { - const tagRecord = await createTag({ - tag, - projectId: environment.projectId, - }); - if (tagRecord) { - tags.push(tagRecord); - } - } - } + const tags = await createTags( + { + tags: body.options?.tags, + projectId: environment.projectId, + }, + this._prisma + ); const depth = parentRun ? parentRun.depth + 1 : 0; @@ -372,6 +364,10 @@ export class TriggerTaskServiceV2 extends WithRunEngine { machine: body.options?.machine, priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined, releaseConcurrency: body.options?.releaseConcurrency, + queueTimestamp: + parentRun && body.options?.resumeParentOnCompletion + ? parentRun.queueTimestamp ?? undefined + : undefined, }, this._prisma ); diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 217d427230..75a7ded41d 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -29,9 +29,9 @@ import { reportInvocationUsage } from "./platform.v3.server"; import { logger } from "./logger.server"; import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server"; import { - BatchProcessingOptions as BatchProcessingOptionsV4, - BatchTriggerV4Service, -} from "~/v3/services/batchTriggerV4.server"; + BatchProcessingOptions as RunEngineBatchProcessingOptions, + RunEngineBatchTriggerService, +} from "~/runEngine/services/batchTrigger.server"; const workerCatalog = { scheduleEmail: DeliverEmailSchema, @@ -99,7 +99,7 @@ const workerCatalog = { }), "v3.cancelDevSessionRuns": CancelDevSessionRunsServiceOptions, "v3.processBatchTaskRun": BatchProcessingOptions, - "v3.processBatchTaskRunV3": BatchProcessingOptionsV4, + "runengine.processBatchTaskRun": RunEngineBatchProcessingOptions, }; let workerQueue: ZodWorker; @@ -341,11 +341,11 @@ function getWorkerQueue() { await service.processBatchTaskRun(payload); }, }, - "v3.processBatchTaskRunV3": { + "runengine.processBatchTaskRun": { priority: 0, maxAttempts: 5, handler: async (payload, job) => { - const service = new BatchTriggerV4Service(payload.strategy); + const service = new RunEngineBatchTriggerService(payload.strategy); await service.processBatchTaskRun(payload); }, diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index b7aef6450f..52560ec3d5 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -4,7 +4,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { determineEngineVersion } from "../engineVersion.server"; import { WithRunEngine } from "./baseService.server"; import { TriggerTaskServiceV1 } from "./triggerTaskV1.server"; -import { TriggerTaskServiceV2 } from "./triggerTaskV2.server"; +import { RunEngineTriggerTaskService } from "~/runEngine/services/triggerTask.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -78,7 +78,7 @@ export class TriggerTaskService extends WithRunEngine { body: TriggerTaskRequestBody, options: TriggerTaskServiceOptions = {} ): Promise { - const service = new TriggerTaskServiceV2({ + const service = new RunEngineTriggerTaskService({ prisma: this._prisma, engine: this._engine, }); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 80fa8f4607..dcd71db84d 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -334,6 +334,7 @@ export class RunEngine { maxAttempts, taskEventStore, priorityMs, + queueTimestamp, ttl, tags, parentTaskRunId, @@ -414,6 +415,7 @@ export class RunEngine { maxAttempts, taskEventStore, priorityMs, + queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(), ttl, tags: tags.length === 0 @@ -520,7 +522,6 @@ export class RunEngine { await this.enqueueSystem.enqueueRun({ run: taskRun, env: environment, - timestamp: Date.now() - taskRun.priorityMs, workerId, runnerId, tx: prisma, diff --git a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts index de06fca524..71f54b988e 100644 --- a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts @@ -161,7 +161,6 @@ export class CheckpointSystem { const newSnapshot = await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, - timestamp: run.createdAt.getTime() - run.priorityMs, snapshot: { status: "QUEUED", description: diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index c954a8d7e1..6c7e410bac 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -98,7 +98,6 @@ export class DelayedRunSystem { await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, - timestamp: run.createdAt.getTime() - run.priorityMs, batchId: run.batchId ?? undefined, }); diff --git a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts index 0ed309792e..a702e516f3 100644 --- a/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/enqueueSystem.ts @@ -25,7 +25,6 @@ export class EnqueueSystem { public async enqueueRun({ run, env, - timestamp, tx, snapshot, previousSnapshotId, @@ -37,7 +36,6 @@ export class EnqueueSystem { }: { run: TaskRun; env: MinimalAuthenticatedEnvironment; - timestamp: number; tx?: PrismaClientOrTransaction; snapshot?: { status?: Extract; @@ -81,6 +79,8 @@ export class EnqueueSystem { masterQueues.push(run.secondaryMasterQueue); } + const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs; + await this.$.runQueue.enqueueMessage({ env, masterQueues, diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index 991d8b9aa0..8be87cca7e 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -97,9 +97,6 @@ export class PendingVersionSystem { await this.enqueueSystem.enqueueRun({ run: updatedRun, env: backgroundWorker.runtimeEnvironment, - //add to the queue using the original run created time - //this should ensure they're in the correct order in the queue - timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs, tx, }); }); diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index ee5d79895d..8d4fe32eab 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -540,7 +540,6 @@ export class WaitpointSystem { await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, - timestamp: run.createdAt.getTime() - run.priorityMs, snapshot: { status: "QUEUED_EXECUTING", description: "Run can continue, but is waiting for concurrency", @@ -564,7 +563,6 @@ export class WaitpointSystem { await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, - timestamp: run.createdAt.getTime() - run.priorityMs, snapshot: { description: "Run was QUEUED, because all waitpoints are completed", }, diff --git a/internal-packages/run-engine/src/engine/tests/priority.test.ts b/internal-packages/run-engine/src/engine/tests/priority.test.ts index 779238d0cf..2467449585 100644 --- a/internal-packages/run-engine/src/engine/tests/priority.test.ts +++ b/internal-packages/run-engine/src/engine/tests/priority.test.ts @@ -10,130 +10,232 @@ import { setupAuthenticatedEnvironment, setupBackgroundWorker } from "./setup.js vi.setConfig({ testTimeout: 60_000 }); describe("RunEngine priority", () => { - containerTest("Two runs execute in the correct order", async ({ prisma, redisOptions }) => { - //create environment - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - - const engine = new RunEngine({ - prisma, - worker: { - redis: redisOptions, - workers: 1, - tasksPerWorker: 10, - pollIntervalMs: 100, - }, - queue: { - redis: redisOptions, - }, - runLock: { - redis: redisOptions, - }, - machines: { - defaultMachine: "small-1x", + containerTest( + "runs execute in priority order based on priorityMs", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, machines: { - "small-1x": { - name: "small-1x" as const, - cpu: 0.5, - memory: 0.5, - centsPerMs: 0.0001, + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, }, + baseCostInCents: 0.0005, }, - baseCostInCents: 0.0005, - }, - tracer: trace.getTracer("test", "0.0.0"), - }); - - try { - const taskIdentifier = "test-task"; - - //create background worker - const backgroundWorker = await setupBackgroundWorker( - engine, - authenticatedEnvironment, - taskIdentifier - ); - - //the order should be 4,3,1,0,2 - // 0 1 2 3 4 - const priorities = [undefined, 500, -1200, 1000, 4000]; - - //trigger the runs - const runs = await triggerRuns({ - engine, - environment: authenticatedEnvironment, - taskIdentifier, - prisma, - priorities, + tracer: trace.getTracer("test", "0.0.0"), }); - expect(runs.length).toBe(priorities.length); - //check the queue length - const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); - expect(queueLength).toBe(priorities.length); + try { + const taskIdentifier = "test-task"; - //dequeue (expect 4 items because of the negative priority) - const dequeue = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 20, - }); - expect(dequeue.length).toBe(4); - expect(dequeue[0].run.friendlyId).toBe(runs[4].friendlyId); - expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId); - expect(dequeue[2].run.friendlyId).toBe(runs[1].friendlyId); - expect(dequeue[3].run.friendlyId).toBe(runs[0].friendlyId); - - //wait 2 seconds (because of the negative priority) - await setTimeout(2_000); - const dequeue2 = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: "main", - maxRunCount: 20, + //create background worker + const backgroundWorker = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier + ); + + //the order should be 4,3,1,0,2 + // 0 1 2 3 4 + const priorities = [undefined, 500, -1200, 1000, 4000]; + + //trigger the runs + const runs = await triggerRuns({ + engine, + environment: authenticatedEnvironment, + taskIdentifier, + prisma, + runs: priorities.map((priority, index) => ({ + number: index, + priorityMs: priority, + })), + }); + expect(runs.length).toBe(priorities.length); + + //check the queue length + const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); + expect(queueLength).toBe(priorities.length); + + //dequeue (expect 4 items because of the negative priority) + const dequeue = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 20, + }); + expect(dequeue.length).toBe(4); + expect(dequeue[0].run.friendlyId).toBe(runs[4].friendlyId); + expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId); + expect(dequeue[2].run.friendlyId).toBe(runs[1].friendlyId); + expect(dequeue[3].run.friendlyId).toBe(runs[0].friendlyId); + + //wait 2 seconds (because of the negative priority) + await setTimeout(2_000); + const dequeue2 = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 20, + }); + expect(dequeue2.length).toBe(1); + expect(dequeue2[0].run.friendlyId).toBe(runs[2].friendlyId); + } finally { + engine.quit(); + } + } + ); + + containerTest( + "runs execute in order of their queueTimestamp", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), }); - expect(dequeue2.length).toBe(1); - expect(dequeue2[0].run.friendlyId).toBe(runs[2].friendlyId); - } finally { - engine.quit(); + + try { + const taskIdentifier = "test-task"; + + //create background worker + const backgroundWorker = await setupBackgroundWorker( + engine, + authenticatedEnvironment, + taskIdentifier + ); + + //the order should be 2, 3, 1, 4, 0 + const queueTimestamps = [ + undefined, + new Date(3000), + new Date(1000), + new Date(2000), + new Date(4000), + ]; + + //trigger the runs + const runs = await triggerRuns({ + engine, + environment: authenticatedEnvironment, + taskIdentifier, + prisma, + runs: queueTimestamps.map((queueTimestamp, index) => ({ + number: index, + queueTimestamp, + })), + }); + expect(runs.length).toBe(queueTimestamps.length); + + //check the queue length + const queueLength = await engine.runQueue.lengthOfEnvQueue(authenticatedEnvironment); + expect(queueLength).toBe(queueTimestamps.length); + + //dequeue (expect 4 items because of the negative priority) + const dequeue = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: "main", + maxRunCount: 20, + }); + expect(dequeue.length).toBe(queueTimestamps.length); + expect(dequeue[0].run.friendlyId).toBe(runs[2].friendlyId); + expect(dequeue[1].run.friendlyId).toBe(runs[3].friendlyId); + expect(dequeue[2].run.friendlyId).toBe(runs[1].friendlyId); + expect(dequeue[3].run.friendlyId).toBe(runs[4].friendlyId); + expect(dequeue[4].run.friendlyId).toBe(runs[0].friendlyId); + } finally { + engine.quit(); + } } - }); + ); }); async function triggerRuns({ engine, environment, taskIdentifier, - priorities, + runs, prisma, }: { engine: RunEngine; environment: MinimalAuthenticatedEnvironment; taskIdentifier: string; prisma: PrismaClientOrTransaction; - priorities: (number | undefined)[]; + runs: { + number: number; + priorityMs?: number; + queueTimestamp?: Date; + }[]; }) { - const runs = []; - for (let i = 0; i < priorities.length; i++) { - runs[i] = await engine.trigger( - { - number: i, - friendlyId: generateFriendlyId("run"), - environment, - taskIdentifier, - payload: "{}", - payloadType: "application/json", - context: {}, - traceContext: {}, - traceId: "t12345", - spanId: "s12345", - masterQueue: "main", - queue: `task/${taskIdentifier}`, - isTest: false, - tags: [], - priorityMs: priorities[i], - }, - prisma + const triggeredRuns = []; + for (const run of runs) { + triggeredRuns.push( + await engine.trigger( + { + number: run.number, + friendlyId: generateFriendlyId("run"), + environment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queue: `task/${taskIdentifier}`, + isTest: false, + tags: [], + priorityMs: run.priorityMs, + queueTimestamp: run.queueTimestamp, + }, + prisma + ) ); } - return runs; + return triggeredRuns; } diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index c4ad4942c3..abf520573f 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -87,6 +87,7 @@ export type TriggerParams = { maxAttempts?: number; taskEventStore?: string; priorityMs?: number; + queueTimestamp?: Date; ttl?: string; tags: { id: string; name: string }[]; parentTaskRunId?: string; diff --git a/references/hello-world/src/trigger/tags.ts b/references/hello-world/src/trigger/tags.ts new file mode 100644 index 0000000000..c7b97c97cf --- /dev/null +++ b/references/hello-world/src/trigger/tags.ts @@ -0,0 +1,22 @@ +import { logger, task, wait } from "@trigger.dev/sdk"; + +export const tagsTester = task({ + id: "tags-tester", + run: async (payload: any, { ctx }) => { + await tagsChildTask.trigger( + { + tags: ["tag1", "tag2"], + }, + { + tags: ["user:user1", "org:org1"], + } + ); + }, +}); + +export const tagsChildTask = task({ + id: "tags-child", + run: async (payload: any, { ctx }) => { + logger.log("Hello, world from the child", { payload }); + }, +}); diff --git a/references/test-tasks/src/trigger/helpers.ts b/references/test-tasks/src/trigger/helpers.ts index 8fcffb3f29..ed47c198a1 100644 --- a/references/test-tasks/src/trigger/helpers.ts +++ b/references/test-tasks/src/trigger/helpers.ts @@ -76,7 +76,7 @@ export const retryTask = task({ throw new Error("Error"); } }, - handleError: async (payload, error, { ctx }) => { + handleError: async ({ ctx, payload, error }) => { if (!payload.throwError) { return { skipRetrying: true,