From 56d69f9bd044a9f476310c7141e9ea534a60fb00 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 5 Feb 2025 16:13:40 +0000 Subject: [PATCH 1/6] Move the task run heartbeats to RedisWorker --- apps/webapp/app/env.server.ts | 36 +++ apps/webapp/app/services/worker.server.ts | 7 +- .../app/v3/legacyRunEngineWorker.server.ts | 68 +++++ apps/webapp/app/v3/marqs/index.server.ts | 4 +- .../v3/marqs/v3VisibilityTimeout.server.ts | 24 +- ...er.ts => taskRunHeartbeatFailed.server.ts} | 2 +- apps/webapp/package.json | 1 + apps/webapp/tsconfig.json | 4 +- internal-packages/redis-worker/package.json | 1 - internal-packages/redis-worker/src/index.ts | 2 + .../redis-worker/src/queue.test.ts | 170 ++++++----- internal-packages/redis-worker/src/queue.ts | 18 +- .../redis-worker/src/telemetry.ts | 31 ++ .../redis-worker/src/worker.test.ts | 7 - internal-packages/redis-worker/src/worker.ts | 277 ++++++++++++------ internal-packages/redis-worker/tsconfig.json | 2 +- pnpm-lock.yaml | 6 +- references/v3-catalog/src/trigger/simple.ts | 2 + 18 files changed, 467 insertions(+), 195 deletions(-) create mode 100644 apps/webapp/app/v3/legacyRunEngineWorker.server.ts rename apps/webapp/app/v3/{requeueTaskRun.server.ts => taskRunHeartbeatFailed.server.ts} (98%) create mode 100644 internal-packages/redis-worker/src/index.ts create mode 100644 internal-packages/redis-worker/src/telemetry.ts diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 3105c36d04..425df271de 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -368,6 +368,42 @@ const EnvironmentSchema = z.object({ BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"), BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"), + + LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), + LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1), + LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), + + LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + LEGACY_RUN_ENGINE_WORKER_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + LEGACY_RUN_ENGINE_WORKER_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + LEGACY_RUN_ENGINE_WORKER_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED: z + .string() + .default(process.env.REDIS_TLS_DISABLED ?? "false"), + LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), }); export type Environment = z.infer; diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 6c86b37f47..51f02ecc42 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -6,7 +6,6 @@ import { $replica, prisma } from "~/db.server"; import { env } from "~/env.server"; import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server"; import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server"; -import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server"; import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server"; import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server"; import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server"; @@ -658,11 +657,7 @@ function getWorkerQueue() { "v3.requeueTaskRun": { priority: 0, maxAttempts: 3, - handler: async (payload, job) => { - const service = new RequeueTaskRunService(); - - await service.call(payload.runId); - }, + handler: async (payload, job) => {}, // This is now handled by redisWorker }, "v3.retryAttempt": { priority: 0, diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts new file mode 100644 index 0000000000..d8bf68f728 --- /dev/null +++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts @@ -0,0 +1,68 @@ +import { Worker as RedisWorker } from "@internal/redis-worker"; +import { Logger } from "@trigger.dev/core/logger"; +import { z } from "zod"; +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { singleton } from "~/utils/singleton"; +import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server"; +import { tracer } from "./tracer.server"; + +const workerCatalog = { + runHeartbeat: { + schema: z.object({ + runId: z.string(), + }), + visibilityTimeoutMs: 10000, + }, +}; + +function initializeWorker() { + if (env.WORKER_ENABLED !== "true") { + logger.debug("RedisWorker not initialized because WORKER_ENABLED is not set to true"); + return; + } + + if (!env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST || !env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT) { + logger.debug( + "RedisWorker not initialized because LEGACY_RUN_ENGINE_WORKER_REDIS_HOST or LEGACY_RUN_ENGINE_WORKER_REDIS_PORT is not set" + ); + return; + } + + const redisOptions = { + keyPrefix: "legacy-run-engine:worker:", + host: env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST, + port: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT, + username: env.LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME, + password: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }; + + logger.debug( + `👨‍🏭 Initializing legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}` + ); + + const worker = new RedisWorker({ + name: "legacy-run-engine-worker", + redisOptions, + catalog: workerCatalog, + concurrency: { + workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS, + tasksPerWorker: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER, + }, + pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL, + logger: new Logger("LegacyRunEngineWorker", "debug"), + jobs: { + runHeartbeat: async ({ payload }) => { + const service = new TaskRunHeartbeatFailedService(); + + await service.call(payload.runId); + }, + }, + }); + + return worker; +} + +export const legacyRunEngineWorker = singleton("legacyRunEngineWorker", initializeWorker); diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index a6d60bab8e..9ec891b7e3 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -30,7 +30,7 @@ import { MessageQueueSubscriber, VisibilityTimeoutStrategy, } from "./types"; -import { V3VisibilityTimeout } from "./v3VisibilityTimeout.server"; +import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server"; const KEY_PREFIX = "marqs:"; @@ -1611,7 +1611,7 @@ function getMarQSClient() { name: "marqs", tracer: trace.getTracer("marqs"), keysProducer, - visibilityTimeoutStrategy: new V3VisibilityTimeout(), + visibilityTimeoutStrategy: new V3LegacyRunEngineWorkerVisibilityTimeout(), queuePriorityStrategy: new FairDequeuingStrategy({ tracer: tracer, redis, diff --git a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts index 88494d0159..92b1d4b1cd 100644 --- a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts +++ b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts @@ -1,12 +1,28 @@ -import { RequeueTaskRunService } from "../requeueTaskRun.server"; +import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server"; +import { TaskRunHeartbeatFailedService } from "../taskRunHeartbeatFailed.server"; import { VisibilityTimeoutStrategy } from "./types"; -export class V3VisibilityTimeout implements VisibilityTimeoutStrategy { +export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy { async heartbeat(messageId: string, timeoutInMs: number): Promise { - await RequeueTaskRunService.enqueue(messageId, new Date(Date.now() + timeoutInMs)); + await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs)); } async cancelHeartbeat(messageId: string): Promise { - await RequeueTaskRunService.dequeue(messageId); + await TaskRunHeartbeatFailedService.dequeue(messageId); + } +} + +export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy { + async heartbeat(messageId: string, timeoutInMs: number): Promise { + await legacyRunEngineWorker?.enqueue({ + id: `heartbeat:${messageId}`, + job: "runHeartbeat", + payload: { runId: messageId }, + availableAt: new Date(Date.now() + timeoutInMs), + }); + } + + async cancelHeartbeat(messageId: string): Promise { + await legacyRunEngineWorker?.ack(`heartbeat:${messageId}`); } } diff --git a/apps/webapp/app/v3/requeueTaskRun.server.ts b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts similarity index 98% rename from apps/webapp/app/v3/requeueTaskRun.server.ts rename to apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts index 5e7732ed35..d722a6f80b 100644 --- a/apps/webapp/app/v3/requeueTaskRun.server.ts +++ b/apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts @@ -9,7 +9,7 @@ import { workerQueue } from "~/services/worker.server"; import { socketIo } from "./handleSocketIo.server"; import { TaskRunErrorCodes } from "@trigger.dev/core/v3"; -export class RequeueTaskRunService extends BaseService { +export class TaskRunHeartbeatFailedService extends BaseService { public async call(runId: string) { const taskRun = await this._prisma.taskRun.findFirst({ where: { diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 96970164c1..e6f11259f9 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -51,6 +51,7 @@ "@headlessui/react": "^1.7.8", "@heroicons/react": "^2.0.12", "@internal/zod-worker": "workspace:*", + "@internal/redis-worker": "workspace:*", "@internationalized/date": "^3.5.1", "@lezer/highlight": "^1.1.6", "@opentelemetry/api": "1.9.0", diff --git a/apps/webapp/tsconfig.json b/apps/webapp/tsconfig.json index af3d25eb48..af02ef016b 100644 --- a/apps/webapp/tsconfig.json +++ b/apps/webapp/tsconfig.json @@ -34,7 +34,9 @@ "emails": ["../../internal-packages/emails/src/index"], "emails/*": ["../../internal-packages/emails/src/*"], "@internal/zod-worker": ["../../internal-packages/zod-worker/src/index"], - "@internal/zod-worker/*": ["../../internal-packages/zod-worker/src/*"] + "@internal/zod-worker/*": ["../../internal-packages/zod-worker/src/*"], + "@internal/redis-worker": ["../../internal-packages/redis-worker/src/index"], + "@internal/redis-worker/*": ["../../internal-packages/redis-worker/src/*"] }, "noEmit": true } diff --git a/internal-packages/redis-worker/package.json b/internal-packages/redis-worker/package.json index 33e7bbea42..bf44ab71cb 100644 --- a/internal-packages/redis-worker/package.json +++ b/internal-packages/redis-worker/package.json @@ -11,7 +11,6 @@ "ioredis": "^5.3.2", "lodash.omit": "^4.5.0", "nanoid": "^5.0.7", - "typescript": "^5.5.4", "zod": "3.23.8" }, "devDependencies": { diff --git a/internal-packages/redis-worker/src/index.ts b/internal-packages/redis-worker/src/index.ts new file mode 100644 index 0000000000..a5893efc83 --- /dev/null +++ b/internal-packages/redis-worker/src/index.ts @@ -0,0 +1,2 @@ +export * from "./queue"; +export * from "./worker"; diff --git a/internal-packages/redis-worker/src/queue.test.ts b/internal-packages/redis-worker/src/queue.test.ts index 075961eba5..023a9564a8 100644 --- a/internal-packages/redis-worker/src/queue.test.ts +++ b/internal-packages/redis-worker/src/queue.test.ts @@ -30,13 +30,16 @@ describe("SimpleQueue", () => { expect(await queue.size()).toBe(2); const [first] = await queue.dequeue(1); - expect(first).toEqual({ - id: "1", - job: "test", - item: { value: 1 }, - visibilityTimeoutMs: 2000, - attempt: 0, - }); + expect(first).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); expect(await queue.size()).toBe(1); expect(await queue.size({ includeFuture: true })).toBe(2); @@ -44,13 +47,16 @@ describe("SimpleQueue", () => { expect(await queue.size({ includeFuture: true })).toBe(1); const [second] = await queue.dequeue(1); - expect(second).toEqual({ - id: "2", - job: "test", - item: { value: 2 }, - visibilityTimeoutMs: 2000, - attempt: 0, - }); + expect(second).toEqual( + expect.objectContaining({ + id: "2", + job: "test", + item: { value: 2 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); await queue.ack(second.id); expect(await queue.size({ includeFuture: true })).toBe(0); @@ -81,13 +87,16 @@ describe("SimpleQueue", () => { await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 }); const [hitOne] = await queue.dequeue(1); - expect(hitOne).toEqual({ - id: "1", - job: "test", - item: { value: 1 }, - visibilityTimeoutMs: 2000, - attempt: 0, - }); + expect(hitOne).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); const missTwo = await queue.dequeue(1); expect(missTwo).toEqual([]); @@ -128,13 +137,16 @@ describe("SimpleQueue", () => { await new Promise((resolve) => setTimeout(resolve, 50)); const [first] = await queue.dequeue(); - expect(first).toEqual({ - id: "1", - job: "test", - item: { value: 1 }, - visibilityTimeoutMs: 2000, - attempt: 0, - }); + expect(first).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); } finally { await queue.close(); } @@ -160,13 +172,16 @@ describe("SimpleQueue", () => { await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 1_000 }); const [first] = await queue.dequeue(); - expect(first).toEqual({ - id: "1", - job: "test", - item: { value: 1 }, - visibilityTimeoutMs: 1_000, - attempt: 0, - }); + expect(first).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 1_000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); const missImmediate = await queue.dequeue(1); expect(missImmediate).toEqual([]); @@ -174,13 +189,16 @@ describe("SimpleQueue", () => { await new Promise((resolve) => setTimeout(resolve, 1_000)); const [second] = await queue.dequeue(); - expect(second).toEqual({ - id: "1", - job: "test", - item: { value: 1 }, - visibilityTimeoutMs: 1_000, - attempt: 0, - }); + expect(second).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 1_000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); } finally { await queue.close(); } @@ -211,20 +229,26 @@ describe("SimpleQueue", () => { const dequeued = await queue.dequeue(2); expect(dequeued).toHaveLength(2); - expect(dequeued[0]).toEqual({ - id: "1", - job: "test", - item: { value: 1 }, - visibilityTimeoutMs: 2000, - attempt: 0, - }); - expect(dequeued[1]).toEqual({ - id: "2", - job: "test", - item: { value: 2 }, - visibilityTimeoutMs: 2000, - attempt: 0, - }); + expect(dequeued[0]).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); + expect(dequeued[1]).toEqual( + expect.objectContaining({ + id: "2", + job: "test", + item: { value: 2 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); expect(await queue.size()).toBe(1); expect(await queue.size({ includeFuture: true })).toBe(3); @@ -235,13 +259,16 @@ describe("SimpleQueue", () => { expect(await queue.size({ includeFuture: true })).toBe(1); const [last] = await queue.dequeue(1); - expect(last).toEqual({ - id: "3", - job: "test", - item: { value: 3 }, - visibilityTimeoutMs: 2000, - attempt: 0, - }); + expect(last).toEqual( + expect.objectContaining({ + id: "3", + job: "test", + item: { value: 3 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); await queue.ack(last.id); expect(await queue.size({ includeFuture: true })).toBe(0); @@ -288,13 +315,16 @@ describe("SimpleQueue", () => { // Dequeue the redriven item const [redrivenItem] = await queue.dequeue(1); - expect(redrivenItem).toEqual({ - id: "1", - job: "test", - item: { value: 1 }, - visibilityTimeoutMs: 2000, - attempt: 0, - }); + expect(redrivenItem).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); // Acknowledge the item await queue.ack(redrivenItem.id); diff --git a/internal-packages/redis-worker/src/queue.ts b/internal-packages/redis-worker/src/queue.ts index 04c08a30d2..8a290d4c28 100644 --- a/internal-packages/redis-worker/src/queue.ts +++ b/internal-packages/redis-worker/src/queue.ts @@ -33,7 +33,7 @@ export class SimpleQueue { this.name = name; this.redis = new Redis({ ...redisOptions, - keyPrefix: `{queue:${name}:}`, + keyPrefix: `${redisOptions.keyPrefix ?? ""}{queue:${name}:}`, retryStrategy(times) { const delay = Math.min(times * 50, 1000); return delay; @@ -114,6 +114,7 @@ export class SimpleQueue { item: MessageCatalogValue>; visibilityTimeoutMs: number; attempt: number; + timestamp: Date; }> > { const now = Date.now(); @@ -127,13 +128,15 @@ export class SimpleQueue { const dequeuedItems = []; - for (const [id, serializedItem] of results) { - const parsedItem = JSON.parse(serializedItem); + for (const [id, serializedItem, score] of results) { + const parsedItem = JSON.parse(serializedItem) as any; if (typeof parsedItem.job !== "string") { this.logger.error(`Invalid item in queue`, { queue: this.name, id, item: parsedItem }); continue; } + const timestamp = new Date(Number(score)); + const schema = this.schema[parsedItem.job]; if (!schema) { @@ -142,6 +145,7 @@ export class SimpleQueue { id, item: parsedItem, job: parsedItem.job, + timestamp, }); continue; } @@ -155,6 +159,7 @@ export class SimpleQueue { item: parsedItem, errors: validatedItem.error, attempt: parsedItem.attempt, + timestamp, }); continue; } @@ -170,6 +175,7 @@ export class SimpleQueue { item: validatedItem.data, visibilityTimeoutMs, attempt: parsedItem.attempt ?? 0, + timestamp, }); } @@ -336,7 +342,7 @@ export class SimpleQueue { local invisibleUntil = now + visibilityTimeoutMs redis.call('ZADD', queue, invisibleUntil, id) - table.insert(dequeued, {id, serializedItem}) + table.insert(dequeued, {id, serializedItem, score}) end end @@ -435,8 +441,8 @@ declare module "ioredis" { //args now: number, count: number, - callback?: Callback> - ): Result, Context>; + callback?: Callback> + ): Result, Context>; ackItem( queue: string, diff --git a/internal-packages/redis-worker/src/telemetry.ts b/internal-packages/redis-worker/src/telemetry.ts new file mode 100644 index 0000000000..d52c437204 --- /dev/null +++ b/internal-packages/redis-worker/src/telemetry.ts @@ -0,0 +1,31 @@ +import { SpanOptions, SpanStatusCode, Span, Tracer } from "@opentelemetry/api"; + +export async function startSpan( + tracer: Tracer, + name: string, + fn: (span: Span) => Promise, + options?: SpanOptions +): Promise { + return tracer.startActiveSpan(name, options ?? {}, async (span) => { + try { + return await fn(span); + } catch (error) { + if (error instanceof Error) { + span.recordException(error); + } else if (typeof error === "string") { + span.recordException(new Error(error)); + } else { + span.recordException(new Error(String(error))); + } + + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + + throw error; + } finally { + span.end(); + } + }); +} diff --git a/internal-packages/redis-worker/src/worker.test.ts b/internal-packages/redis-worker/src/worker.test.ts index a55a653887..de2e78a7b0 100644 --- a/internal-packages/redis-worker/src/worker.test.ts +++ b/internal-packages/redis-worker/src/worker.test.ts @@ -274,11 +274,4 @@ describe("Worker", () => { } } ); - - //todo test that throwing an error doesn't screw up the other items - //todo process more items when finished - - //todo add a Dead Letter Queue when items are failed, with the error - //todo add a function on the worker to redrive them - //todo add an API endpoint to redrive with an ID }); diff --git a/internal-packages/redis-worker/src/worker.ts b/internal-packages/redis-worker/src/worker.ts index 601f5e1708..7084b67a42 100644 --- a/internal-packages/redis-worker/src/worker.ts +++ b/internal-packages/redis-worker/src/worker.ts @@ -1,3 +1,4 @@ +import { SpanKind, trace, Tracer } from "@opentelemetry/api"; import { Logger } from "@trigger.dev/core/logger"; import { type RetryOptions } from "@trigger.dev/core/v3/schemas"; import { calculateNextRetryDelay } from "@trigger.dev/core/v3"; @@ -8,12 +9,13 @@ import { z } from "zod"; import { SimpleQueue } from "./queue.js"; import Redis from "ioredis"; +import { startSpan } from "./telemetry.js"; -type WorkerCatalog = { +export type WorkerCatalog = { [key: string]: { schema: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion; visibilityTimeoutMs: number; - retry: RetryOptions; + retry?: RetryOptions; }; }; @@ -28,6 +30,11 @@ type JobHandler = (param attempt: number; }) => Promise; +export type WorkerConcurrencyOptions = { + workers?: number; + tasksPerWorker?: number; +}; + type WorkerOptions = { name: string; redisOptions: RedisOptions; @@ -35,16 +42,26 @@ type WorkerOptions = { jobs: { [K in keyof TCatalog]: JobHandler; }; - concurrency?: { - workers?: number; - tasksPerWorker?: number; - }; + concurrency?: WorkerConcurrencyOptions; pollIntervalMs?: number; logger?: Logger; + tracer?: Tracer; +}; + +// This results in attempt 12 being a delay of 1 hour +const defaultRetrySettings = { + maxAttempts: 12, + factor: 2, + //one second + minTimeoutInMs: 1_000, + //one hour + maxTimeoutInMs: 3_600_000, + randomize: true, }; class Worker { private subscriber: Redis; + private tracer: Tracer; queue: SimpleQueue>; private jobs: WorkerOptions["jobs"]; @@ -55,6 +72,7 @@ class Worker { constructor(private options: WorkerOptions) { this.logger = options.logger ?? new Logger("Worker", "debug"); + this.tracer = options.tracer ?? trace.getTracer(options.name); const schema: QueueCatalogFromWorkerCatalog = Object.fromEntries( Object.entries(this.options.catalog).map(([key, value]) => [key, value.schema]) @@ -83,24 +101,68 @@ class Worker { this.setupSubscriber(); } + /** + * Enqueues a job for processing. + * @param options - The enqueue options. + * @param options.id - Optional unique identifier for the job. If not provided, one will be generated. It prevents duplication. + * @param options.job - The job type from the worker catalog. + * @param options.payload - The job payload that matches the schema defined in the catalog. + * @param options.visibilityTimeoutMs - Optional visibility timeout in milliseconds. Defaults to value from catalog. + * @param options.availableAt - Optional date when the job should become available for processing. Defaults to now. + * @returns A promise that resolves when the job is enqueued. + */ enqueue({ id, job, payload, visibilityTimeoutMs, + availableAt, }: { id?: string; job: K; payload: z.infer; visibilityTimeoutMs?: number; + availableAt?: Date; }) { - const timeout = visibilityTimeoutMs ?? this.options.catalog[job].visibilityTimeoutMs; - return this.queue.enqueue({ - id, - job, - item: payload, - visibilityTimeoutMs: timeout, - }); + return startSpan( + this.tracer, + "enqueue", + async (span) => { + const timeout = visibilityTimeoutMs ?? this.options.catalog[job].visibilityTimeoutMs; + + span.setAttribute("job_visibility_timeout_ms", timeout); + + return this.queue.enqueue({ + id, + job, + item: payload, + visibilityTimeoutMs: timeout, + availableAt, + }); + }, + { + kind: SpanKind.PRODUCER, + attributes: { + job_type: job as string, + job_id: id, + }, + } + ); + } + + ack(id: string) { + return startSpan( + this.tracer, + "ack", + () => { + return this.queue.ack(id); + }, + { + attributes: { + job_id: id, + }, + } + ); } private createWorker(tasksPerWorker: number) { @@ -153,89 +215,120 @@ class Worker { return; } - await Promise.all( - items.map(async ({ id, job, item, visibilityTimeoutMs, attempt }) => { - const catalogItem = this.options.catalog[job as any]; - const handler = this.jobs[job as any]; - if (!handler) { - this.logger.error(`No handler found for job type: ${job as string}`); - return; - } - - try { - await handler({ id, payload: item, visibilityTimeoutMs, attempt }); - - //succeeded, acking the item - await this.queue.ack(id); - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - this.logger.error(`Error processing item, it threw an error:`, { - name: this.options.name, - id, - job, - item, - visibilityTimeoutMs, - error, - errorMessage, - }); - // Requeue the failed item with a delay - try { - attempt = attempt + 1; - - const retryDelay = calculateNextRetryDelay(catalogItem.retry, attempt); - - if (!retryDelay) { - this.logger.error( - `Failed item ${id} has reached max attempts, moving to the DLQ.`, - { - name: this.options.name, - id, - job, - item, - visibilityTimeoutMs, - attempt, - errorMessage, - } - ); - - await this.queue.moveToDeadLetterQueue(id, errorMessage); + await startSpan( + this.tracer, + "processItems", + async (span) => { + await Promise.all( + items.map(async ({ id, job, item, visibilityTimeoutMs, attempt, timestamp }) => { + const catalogItem = this.options.catalog[job as any]; + const handler = this.jobs[job as any]; + if (!handler) { + this.logger.error(`No handler found for job type: ${job as string}`); return; } - const retryDate = new Date(Date.now() + retryDelay); - this.logger.info(`Requeued failed item ${id} with delay`, { - name: this.options.name, - id, - job, - item, - retryDate, - retryDelay, - visibilityTimeoutMs, - attempt, - }); - await this.queue.enqueue({ - id, - job, - item, - availableAt: retryDate, - attempt, - visibilityTimeoutMs, - }); - } catch (requeueError) { - this.logger.error( - `Failed to requeue item, threw error. Will automatically get rescheduled after the visilibity timeout.`, - { + try { + await startSpan( + this.tracer, + "processItem", + async () => { + await handler({ id, payload: item, visibilityTimeoutMs, attempt }); + //succeeded, acking the item + await this.queue.ack(id); + }, + { + attributes: { + job_id: id, + job_type: job as string, + attempt, + job_timestamp: timestamp.getTime(), + job_age_in_ms: Date.now() - timestamp.getTime(), + }, + } + ); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`Error processing item, it threw an error:`, { name: this.options.name, id, job, item, visibilityTimeoutMs, - error: requeueError, + error, + errorMessage, + }); + // Requeue the failed item with a delay + try { + attempt = attempt + 1; + + const retrySettings = { + ...defaultRetrySettings, + ...catalogItem.retry, + }; + + const retryDelay = calculateNextRetryDelay(retrySettings, attempt); + + if (!retryDelay) { + this.logger.error( + `Failed item ${id} has reached max attempts, moving to the DLQ.`, + { + name: this.options.name, + id, + job, + item, + visibilityTimeoutMs, + attempt, + errorMessage, + } + ); + + await this.queue.moveToDeadLetterQueue(id, errorMessage); + return; + } + + const retryDate = new Date(Date.now() + retryDelay); + this.logger.info(`Requeued failed item ${id} with delay`, { + name: this.options.name, + id, + job, + item, + retryDate, + retryDelay, + visibilityTimeoutMs, + attempt, + }); + await this.queue.enqueue({ + id, + job, + item, + availableAt: retryDate, + attempt, + visibilityTimeoutMs, + }); + } catch (requeueError) { + this.logger.error( + `Failed to requeue item, threw error. Will automatically get rescheduled after the visilibity timeout.`, + { + name: this.options.name, + id, + job, + item, + visibilityTimeoutMs, + error: requeueError, + } + ); } - ); - } - } - }) + } + }) + ); + }, + { + attributes: { + worker_id: worker.threadId, + item_count: items.length, + }, + } ); } catch (error) { this.logger.error("Error dequeuing items:", { name: this.options.name, error }); @@ -262,7 +355,7 @@ class Worker { private async handleRedriveMessage(channel: string, message: string) { try { - const { id } = JSON.parse(message); + const { id } = JSON.parse(message) as any; if (typeof id !== "string") { throw new Error("Invalid message format: id must be a string"); } @@ -283,9 +376,7 @@ class Worker { this.isShuttingDown = true; this.logger.log("Shutting down workers..."); - for (const worker of this.workers) { - worker.terminate(); - } + await Promise.all(this.workers.map((worker) => worker.terminate())); await this.subscriber.unsubscribe(); await this.subscriber.quit(); @@ -301,8 +392,8 @@ class Worker { } } - public stop() { - this.shutdown(); + public async stop() { + await this.shutdown(); } } diff --git a/internal-packages/redis-worker/tsconfig.json b/internal-packages/redis-worker/tsconfig.json index 766df37eae..ff096d3e9f 100644 --- a/internal-packages/redis-worker/tsconfig.json +++ b/internal-packages/redis-worker/tsconfig.json @@ -1,7 +1,7 @@ { "compilerOptions": { "target": "ES2019", - "lib": ["ES2019", "DOM", "DOM.Iterable"], + "lib": ["ES2019", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], "module": "CommonJS", "moduleResolution": "Node", "moduleDetection": "force", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 322482b8a7..1ae892e11d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -243,6 +243,9 @@ importers: '@heroicons/react': specifier: ^2.0.12 version: 2.0.13(react@18.2.0) + '@internal/redis-worker': + specifier: workspace:* + version: link:../../internal-packages/redis-worker '@internal/zod-worker': specifier: workspace:* version: link:../../internal-packages/zod-worker @@ -948,9 +951,6 @@ importers: nanoid: specifier: ^5.0.7 version: 5.0.7 - typescript: - specifier: ^5.5.4 - version: 5.5.4 zod: specifier: 3.23.8 version: 3.23.8 diff --git a/references/v3-catalog/src/trigger/simple.ts b/references/v3-catalog/src/trigger/simple.ts index 2152622b2f..2c8551fd9d 100644 --- a/references/v3-catalog/src/trigger/simple.ts +++ b/references/v3-catalog/src/trigger/simple.ts @@ -106,6 +106,8 @@ export const immediateReturn = task({ console.info("some"); console.warn("random"); console.error("logs"); + + await new Promise((resolve) => setTimeout(resolve, 20000)); }, }); From ac71200badf88fa99755c6c390060ce38b081db0 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 5 Feb 2025 22:59:16 +0000 Subject: [PATCH 2/6] Move alerts to redis worker, improving redis worker --- apps/webapp/app/env.server.ts | 38 ++ apps/webapp/app/services/worker.server.ts | 13 - apps/webapp/app/v3/commonWorker.server.ts | 93 +++++ .../app/v3/legacyRunEngineWorker.server.ts | 44 +-- .../v3/marqs/v3VisibilityTimeout.server.ts | 4 +- .../v3/services/alerts/deliverAlert.server.ts | 24 +- .../alerts/performDeploymentAlerts.server.ts | 46 +-- .../alerts/performTaskAttemptAlerts.server.ts | 79 ---- .../alerts/performTaskRunAlerts.server.ts | 46 +-- .../createDeployedBackgroundWorker.server.ts | 2 +- .../services/deploymentIndexFailed.server.ts | 2 +- .../app/v3/services/failDeployment.server.ts | 2 +- .../v3/services/finalizeDeployment.server.ts | 2 +- .../app/v3/services/finalizeTaskRun.server.ts | 2 +- .../v3/services/timeoutDeployment.server.ts | 2 +- apps/webapp/remix.config.js | 2 + internal-packages/redis-worker/package.json | 3 +- internal-packages/redis-worker/src/queue.ts | 40 +- .../redis-worker/src/worker.test.ts | 18 +- internal-packages/redis-worker/src/worker.ts | 362 +++++++++--------- pnpm-lock.yaml | 15 + references/v3-catalog/src/trigger/simple.ts | 22 +- 22 files changed, 453 insertions(+), 408 deletions(-) create mode 100644 apps/webapp/app/v3/commonWorker.server.ts delete mode 100644 apps/webapp/app/v3/services/alerts/performTaskAttemptAlerts.server.ts diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 425df271de..9ce3f56b65 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -372,6 +372,8 @@ const EnvironmentSchema = z.object({ LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1), LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), + LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50), + LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100), LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z .string() @@ -404,6 +406,42 @@ const EnvironmentSchema = z.object({ .string() .default(process.env.REDIS_TLS_DISABLED ?? "false"), LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + + COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), + COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), + COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), + COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50), + COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100), + + COMMON_WORKER_REDIS_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_HOST), + COMMON_WORKER_REDIS_READER_HOST: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_READER_HOST), + COMMON_WORKER_REDIS_READER_PORT: z.coerce + .number() + .optional() + .transform( + (v) => + v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined) + ), + COMMON_WORKER_REDIS_PORT: z.coerce + .number() + .optional() + .transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)), + COMMON_WORKER_REDIS_USERNAME: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_USERNAME), + COMMON_WORKER_REDIS_PASSWORD: z + .string() + .optional() + .transform((v) => v ?? process.env.REDIS_PASSWORD), + COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"), + COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), }); export type Environment = z.infer; diff --git a/apps/webapp/app/services/worker.server.ts b/apps/webapp/app/services/worker.server.ts index 51f02ecc42..3c76907aa7 100644 --- a/apps/webapp/app/services/worker.server.ts +++ b/apps/webapp/app/services/worker.server.ts @@ -8,7 +8,6 @@ import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server"; import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server"; import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server"; import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server"; -import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server"; import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server"; import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server"; import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server"; @@ -156,9 +155,6 @@ const workerCatalog = { "v3.performTaskRunAlerts": z.object({ runId: z.string(), }), - "v3.performTaskAttemptAlerts": z.object({ - attemptId: z.string(), - }), "v3.deliverAlert": z.object({ alertId: z.string(), }), @@ -609,15 +605,6 @@ function getWorkerQueue() { return await service.call(payload.runId); }, }, - "v3.performTaskAttemptAlerts": { - priority: 0, - maxAttempts: 3, - handler: async (payload, job) => { - const service = new PerformTaskAttemptAlertsService(); - - return await service.call(payload.attemptId); - }, - }, "v3.deliverAlert": { priority: 0, maxAttempts: 8, diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts new file mode 100644 index 0000000000..4f091f1112 --- /dev/null +++ b/apps/webapp/app/v3/commonWorker.server.ts @@ -0,0 +1,93 @@ +import { Worker as RedisWorker } from "@internal/redis-worker"; +import { Logger } from "@trigger.dev/core/logger"; +import { z } from "zod"; +import { env } from "~/env.server"; +import { logger } from "~/services/logger.server"; +import { singleton } from "~/utils/singleton"; +import { DeliverAlertService } from "./services/alerts/deliverAlert.server"; +import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server"; +import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server"; + +function initializeWorker() { + const redisOptions = { + keyPrefix: "common:worker:", + host: env.COMMON_WORKER_REDIS_HOST, + port: env.COMMON_WORKER_REDIS_PORT, + username: env.COMMON_WORKER_REDIS_USERNAME, + password: env.COMMON_WORKER_REDIS_PASSWORD, + enableAutoPipelining: true, + ...(env.COMMON_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }; + + logger.debug(`👨‍🏭 Initializing common worker at host ${env.COMMON_WORKER_REDIS_HOST}`); + + const worker = new RedisWorker({ + name: "common-worker", + redisOptions, + catalog: { + "v3.performTaskRunAlerts": { + schema: z.object({ + runId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + "v3.performDeploymentAlerts": { + schema: z.object({ + deploymentId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + "v3.deliverAlert": { + schema: z.object({ + alertId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + }, + concurrency: { + workers: env.COMMON_WORKER_CONCURRENCY_WORKERS, + tasksPerWorker: env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER, + limit: env.COMMON_WORKER_CONCURRENCY_LIMIT, + }, + pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL, + immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL, + logger: new Logger("CommonWorker", "debug"), + jobs: { + "v3.deliverAlert": async ({ payload }) => { + const service = new DeliverAlertService(); + + return await service.call(payload.alertId); + }, + "v3.performDeploymentAlerts": async ({ payload }) => { + const service = new PerformDeploymentAlertsService(); + + return await service.call(payload.deploymentId); + }, + "v3.performTaskRunAlerts": async ({ payload }) => { + const service = new PerformTaskRunAlertsService(); + return await service.call(payload.runId); + }, + }, + }); + + if (env.WORKER_ENABLED === "true") { + logger.debug( + `👨‍🏭 Starting common worker at host ${env.COMMON_WORKER_REDIS_HOST}, pollInterval = ${env.COMMON_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.COMMON_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.COMMON_WORKER_CONCURRENCY_LIMIT}` + ); + + worker.start(); + } + + return worker; +} + +export const commonWorker = singleton("commonWorker", initializeWorker); diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts index d8bf68f728..472aa577db 100644 --- a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts +++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts @@ -5,30 +5,8 @@ import { env } from "~/env.server"; import { logger } from "~/services/logger.server"; import { singleton } from "~/utils/singleton"; import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server"; -import { tracer } from "./tracer.server"; - -const workerCatalog = { - runHeartbeat: { - schema: z.object({ - runId: z.string(), - }), - visibilityTimeoutMs: 10000, - }, -}; function initializeWorker() { - if (env.WORKER_ENABLED !== "true") { - logger.debug("RedisWorker not initialized because WORKER_ENABLED is not set to true"); - return; - } - - if (!env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST || !env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT) { - logger.debug( - "RedisWorker not initialized because LEGACY_RUN_ENGINE_WORKER_REDIS_HOST or LEGACY_RUN_ENGINE_WORKER_REDIS_PORT is not set" - ); - return; - } - const redisOptions = { keyPrefix: "legacy-run-engine:worker:", host: env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST, @@ -46,12 +24,24 @@ function initializeWorker() { const worker = new RedisWorker({ name: "legacy-run-engine-worker", redisOptions, - catalog: workerCatalog, + catalog: { + runHeartbeat: { + schema: z.object({ + runId: z.string(), + }), + visibilityTimeoutMs: 60_000, + retry: { + maxAttempts: 3, + }, + }, + }, concurrency: { workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS, tasksPerWorker: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER, + limit: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT, }, pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL, + immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL, logger: new Logger("LegacyRunEngineWorker", "debug"), jobs: { runHeartbeat: async ({ payload }) => { @@ -62,6 +52,14 @@ function initializeWorker() { }, }); + if (env.WORKER_ENABLED === "true") { + logger.debug( + `👨‍🏭 Starting legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}, pollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT}` + ); + + worker.start(); + } + return worker; } diff --git a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts index 92b1d4b1cd..7611ee2bce 100644 --- a/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts +++ b/apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts @@ -14,7 +14,7 @@ export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy { export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy { async heartbeat(messageId: string, timeoutInMs: number): Promise { - await legacyRunEngineWorker?.enqueue({ + await legacyRunEngineWorker.enqueue({ id: `heartbeat:${messageId}`, job: "runHeartbeat", payload: { runId: messageId }, @@ -23,6 +23,6 @@ export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeo } async cancelHeartbeat(messageId: string): Promise { - await legacyRunEngineWorker?.ack(`heartbeat:${messageId}`); + await legacyRunEngineWorker.ack(`heartbeat:${messageId}`); } } diff --git a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts index a84388c583..282a839bff 100644 --- a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts +++ b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts @@ -28,6 +28,7 @@ import { decryptSecret } from "~/services/secrets/secretStore.server"; import { workerQueue } from "~/services/worker.server"; import { BaseService } from "../baseService.server"; import { FINAL_ATTEMPT_STATUSES } from "~/v3/taskStatus"; +import { commonWorker } from "~/v3/commonWorker.server"; type FoundAlert = Prisma.Result< typeof prisma.projectAlert, @@ -1092,22 +1093,13 @@ export class DeliverAlertService extends BaseService { return text; } - static async enqueue( - alertId: string, - tx: PrismaClientOrTransaction, - options?: { runAt?: Date; queueName?: string } - ) { - return await workerQueue.enqueue( - "v3.deliverAlert", - { - alertId, - }, - { - tx, - runAt: options?.runAt, - jobKey: `deliverAlert:${alertId}`, - } - ); + static async enqueue(alertId: string, runAt?: Date) { + return await commonWorker.enqueue({ + id: `alert:${alertId}`, + job: "v3.deliverAlert", + payload: { alertId }, + availableAt: runAt, + }); } } diff --git a/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts b/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts index cd4d20052d..4bbe7b50cf 100644 --- a/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts +++ b/apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts @@ -4,6 +4,7 @@ import { workerQueue } from "~/services/worker.server"; import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; import { BaseService } from "../baseService.server"; import { DeliverAlertService } from "./deliverAlert.server"; +import { commonWorker } from "~/v3/commonWorker.server"; export class PerformDeploymentAlertsService extends BaseService { public async call(deploymentId: string) { @@ -45,34 +46,27 @@ export class PerformDeploymentAlertsService extends BaseService { deployment: WorkerDeployment, alertType: ProjectAlertType ) { - await $transaction(this._prisma, "create and send deploy alert", async (tx) => { - const alert = await this._prisma.projectAlert.create({ - data: { - friendlyId: generateFriendlyId("alert"), - channelId: alertChannel.id, - projectId: deployment.projectId, - environmentId: deployment.environmentId, - status: "PENDING", - type: alertType, - workerDeploymentId: deployment.id, - }, - }); - - await DeliverAlertService.enqueue(alert.id, tx); + const alert = await this._prisma.projectAlert.create({ + data: { + friendlyId: generateFriendlyId("alert"), + channelId: alertChannel.id, + projectId: deployment.projectId, + environmentId: deployment.environmentId, + status: "PENDING", + type: alertType, + workerDeploymentId: deployment.id, + }, }); + + await DeliverAlertService.enqueue(alert.id); } - static async enqueue(deploymentId: string, tx: PrismaClientOrTransaction, runAt?: Date) { - return await workerQueue.enqueue( - "v3.performDeploymentAlerts", - { - deploymentId, - }, - { - tx, - runAt, - jobKey: `performDeploymentAlerts:${deploymentId}`, - } - ); + static async enqueue(deploymentId: string, runAt?: Date) { + return await commonWorker.enqueue({ + id: `performDeploymentAlerts:${deploymentId}`, + job: "v3.performDeploymentAlerts", + payload: { deploymentId }, + availableAt: runAt, + }); } } diff --git a/apps/webapp/app/v3/services/alerts/performTaskAttemptAlerts.server.ts b/apps/webapp/app/v3/services/alerts/performTaskAttemptAlerts.server.ts deleted file mode 100644 index dab6539aeb..0000000000 --- a/apps/webapp/app/v3/services/alerts/performTaskAttemptAlerts.server.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { Prisma, ProjectAlertChannel } from "@trigger.dev/database"; -import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server"; -import { workerQueue } from "~/services/worker.server"; -import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; -import { BaseService } from "../baseService.server"; -import { DeliverAlertService } from "./deliverAlert.server"; - -type FoundTaskAttempt = Prisma.Result< - typeof prisma.taskRunAttempt, - { include: { taskRun: true; backgroundWorkerTask: true; runtimeEnvironment: true } }, - "findUniqueOrThrow" ->; - -export class PerformTaskAttemptAlertsService extends BaseService { - public async call(attemptId: string) { - const taskAttempt = await this._prisma.taskRunAttempt.findFirst({ - where: { id: attemptId }, - include: { - taskRun: true, - backgroundWorkerTask: true, - runtimeEnvironment: true, - }, - }); - - if (!taskAttempt) { - return; - } - - // Find all the alert channels - const alertChannels = await this._prisma.projectAlertChannel.findMany({ - where: { - projectId: taskAttempt.taskRun.projectId, - alertTypes: { - has: "TASK_RUN_ATTEMPT", - }, - environmentTypes: { - has: taskAttempt.runtimeEnvironment.type, - }, - enabled: true, - }, - }); - - for (const alertChannel of alertChannels) { - await this.#createAndSendAlert(alertChannel, taskAttempt); - } - } - - async #createAndSendAlert(alertChannel: ProjectAlertChannel, taskAttempt: FoundTaskAttempt) { - await $transaction(this._prisma, "create and send attempt alert", async (tx) => { - const alert = await this._prisma.projectAlert.create({ - data: { - friendlyId: generateFriendlyId("alert"), - channelId: alertChannel.id, - projectId: taskAttempt.taskRun.projectId, - environmentId: taskAttempt.runtimeEnvironmentId, - status: "PENDING", - type: "TASK_RUN_ATTEMPT", - taskRunAttemptId: taskAttempt.id, - }, - }); - - await DeliverAlertService.enqueue(alert.id, tx); - }); - } - - static async enqueue(attemptId: string, tx: PrismaClientOrTransaction, runAt?: Date) { - return await workerQueue.enqueue( - "v3.performTaskAttemptAlerts", - { - attemptId, - }, - { - tx, - runAt, - jobKey: `performTaskAttemptAlerts:${attemptId}`, - } - ); - } -} diff --git a/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts b/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts index 1d3b6d62f9..0706bd0192 100644 --- a/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts +++ b/apps/webapp/app/v3/services/alerts/performTaskRunAlerts.server.ts @@ -4,6 +4,7 @@ import { workerQueue } from "~/services/worker.server"; import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; import { BaseService } from "../baseService.server"; import { DeliverAlertService } from "./deliverAlert.server"; +import { commonWorker } from "~/v3/commonWorker.server"; type FoundRun = Prisma.Result< typeof prisma.taskRun, @@ -45,34 +46,27 @@ export class PerformTaskRunAlertsService extends BaseService { } async #createAndSendAlert(alertChannel: ProjectAlertChannel, run: FoundRun) { - await $transaction(this._prisma, "create and send run alert", async (tx) => { - const alert = await this._prisma.projectAlert.create({ - data: { - friendlyId: generateFriendlyId("alert"), - channelId: alertChannel.id, - projectId: run.projectId, - environmentId: run.runtimeEnvironmentId, - status: "PENDING", - type: "TASK_RUN", - taskRunId: run.id, - }, - }); - - await DeliverAlertService.enqueue(alert.id, tx); + const alert = await this._prisma.projectAlert.create({ + data: { + friendlyId: generateFriendlyId("alert"), + channelId: alertChannel.id, + projectId: run.projectId, + environmentId: run.runtimeEnvironmentId, + status: "PENDING", + type: "TASK_RUN", + taskRunId: run.id, + }, }); + + await DeliverAlertService.enqueue(alert.id); } - static async enqueue(runId: string, tx: PrismaClientOrTransaction, runAt?: Date) { - return await workerQueue.enqueue( - "v3.performTaskRunAlerts", - { - runId, - }, - { - tx, - runAt, - jobKey: `performTaskRunAlerts:${runId}`, - } - ); + static async enqueue(runId: string, runAt?: Date) { + return await commonWorker.enqueue({ + id: `performTaskRunAlerts:${runId}`, + job: "v3.performTaskRunAlerts", + payload: { runId }, + availableAt: runAt, + }); } } diff --git a/apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts index 11396acaa5..c46f5b0266 100644 --- a/apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts @@ -148,7 +148,7 @@ export class CreateDeployedBackgroundWorkerService extends BaseService { } await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma); - await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma); + await PerformDeploymentAlertsService.enqueue(deployment.id); await TimeoutDeploymentService.dequeue(deployment.id, this._prisma); return backgroundWorker; diff --git a/apps/webapp/app/v3/services/deploymentIndexFailed.server.ts b/apps/webapp/app/v3/services/deploymentIndexFailed.server.ts index 44b5f55eeb..fffab0af03 100644 --- a/apps/webapp/app/v3/services/deploymentIndexFailed.server.ts +++ b/apps/webapp/app/v3/services/deploymentIndexFailed.server.ts @@ -66,7 +66,7 @@ export class DeploymentIndexFailed extends BaseService { }, }); - await PerformDeploymentAlertsService.enqueue(failedDeployment.id, this._prisma); + await PerformDeploymentAlertsService.enqueue(failedDeployment.id); return failedDeployment; } diff --git a/apps/webapp/app/v3/services/failDeployment.server.ts b/apps/webapp/app/v3/services/failDeployment.server.ts index a5d7fc9e86..dbb9194050 100644 --- a/apps/webapp/app/v3/services/failDeployment.server.ts +++ b/apps/webapp/app/v3/services/failDeployment.server.ts @@ -49,7 +49,7 @@ export class FailDeploymentService extends BaseService { }, }); - await PerformDeploymentAlertsService.enqueue(failedDeployment.id, this._prisma); + await PerformDeploymentAlertsService.enqueue(failedDeployment.id); return failedDeployment; } diff --git a/apps/webapp/app/v3/services/finalizeDeployment.server.ts b/apps/webapp/app/v3/services/finalizeDeployment.server.ts index 58ad8b03dd..042afa4a66 100644 --- a/apps/webapp/app/v3/services/finalizeDeployment.server.ts +++ b/apps/webapp/app/v3/services/finalizeDeployment.server.ts @@ -124,7 +124,7 @@ export class FinalizeDeploymentService extends BaseService { } await ExecuteTasksWaitingForDeployService.enqueue(deployment.worker.id, this._prisma); - await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma); + await PerformDeploymentAlertsService.enqueue(deployment.id); return finalizedDeployment; } diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index fd34cec64a..81df8461ec 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -129,7 +129,7 @@ export class FinalizeTaskRunService extends BaseService { //enqueue alert if (isFailedRunStatus(run.status)) { - await PerformTaskRunAlertsService.enqueue(run.id, this._prisma); + await PerformTaskRunAlertsService.enqueue(run.id); } if (isFatalRunStatus(run.status)) { diff --git a/apps/webapp/app/v3/services/timeoutDeployment.server.ts b/apps/webapp/app/v3/services/timeoutDeployment.server.ts index f86f513b18..1bb9493a89 100644 --- a/apps/webapp/app/v3/services/timeoutDeployment.server.ts +++ b/apps/webapp/app/v3/services/timeoutDeployment.server.ts @@ -39,7 +39,7 @@ export class TimeoutDeploymentService extends BaseService { }, }); - await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma); + await PerformDeploymentAlertsService.enqueue(deployment.id); } static async enqueue( diff --git a/apps/webapp/remix.config.js b/apps/webapp/remix.config.js index d2417a3eb5..ffa62d14af 100644 --- a/apps/webapp/remix.config.js +++ b/apps/webapp/remix.config.js @@ -11,6 +11,8 @@ module.exports = { /^remix-utils.*/, "marked", "axios", + "p-limit", + "yocto-queue", "@trigger.dev/core", "@trigger.dev/sdk", "@trigger.dev/platform", diff --git a/internal-packages/redis-worker/package.json b/internal-packages/redis-worker/package.json index bf44ab71cb..95922af921 100644 --- a/internal-packages/redis-worker/package.json +++ b/internal-packages/redis-worker/package.json @@ -11,6 +11,7 @@ "ioredis": "^5.3.2", "lodash.omit": "^4.5.0", "nanoid": "^5.0.7", + "p-limit": "^6.2.0", "zod": "3.23.8" }, "devDependencies": { @@ -20,6 +21,6 @@ }, "scripts": { "typecheck": "tsc --noEmit", - "test": "vitest" + "test": "vitest --no-file-parallelism" } } \ No newline at end of file diff --git a/internal-packages/redis-worker/src/queue.ts b/internal-packages/redis-worker/src/queue.ts index 8a290d4c28..94309aaaa7 100644 --- a/internal-packages/redis-worker/src/queue.ts +++ b/internal-packages/redis-worker/src/queue.ts @@ -13,6 +13,25 @@ export type MessageCatalogValue< TKey extends MessageCatalogKey, > = z.infer; +export type AnyMessageCatalog = MessageCatalogSchema; +export type QueueItem = { + id: string; + job: MessageCatalogKey; + item: MessageCatalogValue>; + visibilityTimeoutMs: number; + attempt: number; + timestamp: Date; +}; + +export type AnyQueueItem = { + id: string; + job: string; + item: any; + visibilityTimeoutMs: number; + attempt: number; + timestamp: Date; +}; + export class SimpleQueue { name: string; private redis: Redis; @@ -107,16 +126,7 @@ export class SimpleQueue { throw e; } } - async dequeue(count: number = 1): Promise< - Array<{ - id: string; - job: MessageCatalogKey; - item: MessageCatalogValue>; - visibilityTimeoutMs: number; - attempt: number; - timestamp: Date; - }> - > { + async dequeue(count: number = 1): Promise>> { const now = Date.now(); try { @@ -382,10 +392,13 @@ export class SimpleQueue { local parsedItem = cjson.decode(item) parsedItem.errorMessage = errorMessage + local time = redis.call('TIME') + local now = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000) + redis.call('ZREM', queue, id) redis.call('HDEL', items, id) - redis.call('ZADD', dlq, redis.call('TIME')[1], id) + redis.call('ZADD', dlq, now, id) redis.call('HSET', dlqItems, id, cjson.encode(parsedItem)) return 1 @@ -409,10 +422,13 @@ export class SimpleQueue { local parsedItem = cjson.decode(item) parsedItem.errorMessage = nil + local time = redis.call('TIME') + local now = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000) + redis.call('ZREM', dlq, id) redis.call('HDEL', dlqItems, id) - redis.call('ZADD', queue, redis.call('TIME')[1], id) + redis.call('ZADD', queue, now, id) redis.call('HSET', items, id, cjson.encode(parsedItem)) return 1 diff --git a/internal-packages/redis-worker/src/worker.test.ts b/internal-packages/redis-worker/src/worker.test.ts index de2e78a7b0..3eb760cf9c 100644 --- a/internal-packages/redis-worker/src/worker.test.ts +++ b/internal-packages/redis-worker/src/worker.test.ts @@ -34,7 +34,7 @@ describe("Worker", () => { tasksPerWorker: 3, }, logger: new Logger("test", "log"), - }); + }).start(); try { // Enqueue 10 items @@ -47,10 +47,8 @@ describe("Worker", () => { }); } - worker.start(); - // Wait for items to be processed - await new Promise((resolve) => setTimeout(resolve, 600)); + await new Promise((resolve) => setTimeout(resolve, 2000)); expect(processedItems.length).toBe(10); expect(new Set(processedItems).size).toBe(10); // Ensure all items were processed uniquely @@ -97,7 +95,7 @@ describe("Worker", () => { }, pollIntervalMs: 50, logger: new Logger("test", "error"), - }); + }).start(); try { // Enqueue 10 items @@ -110,8 +108,6 @@ describe("Worker", () => { }); } - worker.start(); - // Wait for items to be processed await new Promise((resolve) => setTimeout(resolve, 500)); @@ -158,7 +154,7 @@ describe("Worker", () => { }, pollIntervalMs: 50, logger: new Logger("test", "error"), - }); + }).start(); try { // Enqueue the item that will permanently fail @@ -175,8 +171,6 @@ describe("Worker", () => { payload: { value: 1 }, }); - worker.start(); - // Wait for items to be processed and retried await new Promise((resolve) => setTimeout(resolve, 1000)); @@ -229,7 +223,7 @@ describe("Worker", () => { }, pollIntervalMs: 50, logger: new Logger("test", "error"), - }); + }).start(); try { // Enqueue the item that will fail 3 times @@ -239,8 +233,6 @@ describe("Worker", () => { payload: { value: 999 }, }); - worker.start(); - // Wait for the item to be processed and moved to DLQ await new Promise((resolve) => setTimeout(resolve, 1000)); diff --git a/internal-packages/redis-worker/src/worker.ts b/internal-packages/redis-worker/src/worker.ts index 7084b67a42..e80b63bc25 100644 --- a/internal-packages/redis-worker/src/worker.ts +++ b/internal-packages/redis-worker/src/worker.ts @@ -1,15 +1,14 @@ import { SpanKind, trace, Tracer } from "@opentelemetry/api"; import { Logger } from "@trigger.dev/core/logger"; -import { type RetryOptions } from "@trigger.dev/core/v3/schemas"; import { calculateNextRetryDelay } from "@trigger.dev/core/v3"; +import { type RetryOptions } from "@trigger.dev/core/v3/schemas"; import { type RedisOptions } from "ioredis"; -import os from "os"; -import { Worker as NodeWorker } from "worker_threads"; import { z } from "zod"; -import { SimpleQueue } from "./queue.js"; - +import { AnyQueueItem, SimpleQueue } from "./queue.js"; import Redis from "ioredis"; +import { nanoid } from "nanoid"; import { startSpan } from "./telemetry.js"; +import pLimit from "p-limit"; export type WorkerCatalog = { [key: string]: { @@ -33,6 +32,7 @@ type JobHandler = (param export type WorkerConcurrencyOptions = { workers?: number; tasksPerWorker?: number; + limit?: number; }; type WorkerOptions = { @@ -44,6 +44,7 @@ type WorkerOptions = { }; concurrency?: WorkerConcurrencyOptions; pollIntervalMs?: number; + immediatePollIntervalMs?: number; logger?: Logger; tracer?: Tracer; }; @@ -60,16 +61,19 @@ const defaultRetrySettings = { }; class Worker { - private subscriber: Redis; + private subscriber: Redis | undefined; private tracer: Tracer; queue: SimpleQueue>; private jobs: WorkerOptions["jobs"]; private logger: Logger; - private workers: NodeWorker[] = []; + private workerLoops: Promise[] = []; private isShuttingDown = false; private concurrency: Required["concurrency"]>>; + // The p-limit limiter to control overall concurrency. + private limiter: ReturnType; + constructor(private options: WorkerOptions) { this.logger = options.logger ?? new Logger("Worker", "debug"); this.tracer = options.tracer ?? trace.getTracer(options.name); @@ -77,7 +81,7 @@ class Worker { const schema: QueueCatalogFromWorkerCatalog = Object.fromEntries( Object.entries(this.options.catalog).map(([key, value]) => [key, value.schema]) ) as QueueCatalogFromWorkerCatalog; - // + this.queue = new SimpleQueue({ name: options.name, redisOptions: options.redisOptions, @@ -87,18 +91,27 @@ class Worker { this.jobs = options.jobs; - const { workers = os.cpus().length, tasksPerWorker = 1 } = options.concurrency ?? {}; - this.concurrency = { workers, tasksPerWorker }; + const { workers = 1, tasksPerWorker = 1, limit = 10 } = options.concurrency ?? {}; + this.concurrency = { workers, tasksPerWorker, limit }; + + // Create a p-limit instance using this limit. + this.limiter = pLimit(this.concurrency.limit); + } + + public start() { + const { workers, tasksPerWorker } = this.concurrency; - // Initialize worker threads + // Launch a number of "worker loops" on the main thread. for (let i = 0; i < workers; i++) { - this.createWorker(tasksPerWorker); + this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker)); } this.setupShutdownHandlers(); - this.subscriber = new Redis(options.redisOptions); + this.subscriber = new Redis(this.options.redisOptions); this.setupSubscriber(); + + return this; } /** @@ -165,184 +178,164 @@ class Worker { ); } - private createWorker(tasksPerWorker: number) { - const worker = new NodeWorker( - ` - const { parentPort } = require('worker_threads'); - - parentPort.on('message', async (message) => { - if (message.type === 'process') { - // Process items here - parentPort.postMessage({ type: 'done' }); - } - }); - `, - { eval: true } - ); + /** + * The main loop that each worker runs. It repeatedly polls for items, + * processes them, and then waits before the next iteration. + */ + private async runWorkerLoop(workerId: string, taskCount: number): Promise { + const pollIntervalMs = this.options.pollIntervalMs ?? 1000; + const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100; - worker.on("message", (message) => { - if (message.type === "done") { - this.processItems(worker, tasksPerWorker); + while (!this.isShuttingDown) { + // Check overall load. If at capacity, wait a bit before trying to dequeue more. + if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) { + await Worker.delay(pollIntervalMs); + continue; } - }); - worker.on("error", (error) => { - this.logger.error("Worker error:", { error }); - }); + try { + const items = await this.queue.dequeue(taskCount); - worker.on("exit", (code) => { - if (code !== 0) { - this.logger.warn(`Worker stopped with exit code ${code}`); - } - if (!this.isShuttingDown) { - this.createWorker(tasksPerWorker); + if (items.length === 0) { + await Worker.delay(pollIntervalMs); + continue; + } + + // Schedule each item using the limiter. + for (const item of items) { + this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch( + (err) => { + this.logger.error("Unhandled error in processItem:", { error: err, workerId, item }); + } + ); + } + } catch (error) { + this.logger.error("Error dequeuing items:", { name: this.options.name, error }); + await Worker.delay(pollIntervalMs); + continue; } - }); - this.workers.push(worker); - this.processItems(worker, tasksPerWorker); + // Wait briefly before immediately polling again since we processed items + await Worker.delay(immediatePollIntervalMs); + } } - private async processItems(worker: NodeWorker, count: number) { - if (this.isShuttingDown) return; - - const pollIntervalMs = this.options.pollIntervalMs ?? 1000; - - try { - const items = await this.queue.dequeue(count); - if (items.length === 0) { - setTimeout(() => this.processItems(worker, count), pollIntervalMs); - return; - } + /** + * Processes a single item. + */ + private async processItem( + { id, job, item, visibilityTimeoutMs, attempt, timestamp }: AnyQueueItem, + batchSize: number, + workerId: string + ): Promise { + const catalogItem = this.options.catalog[job as any]; + const handler = this.jobs[job as any]; + if (!handler) { + this.logger.error(`No handler found for job type: ${job}`); + return; + } - await startSpan( - this.tracer, - "processItems", - async (span) => { - await Promise.all( - items.map(async ({ id, job, item, visibilityTimeoutMs, attempt, timestamp }) => { - const catalogItem = this.options.catalog[job as any]; - const handler = this.jobs[job as any]; - if (!handler) { - this.logger.error(`No handler found for job type: ${job as string}`); - return; - } - - try { - await startSpan( - this.tracer, - "processItem", - async () => { - await handler({ id, payload: item, visibilityTimeoutMs, attempt }); - //succeeded, acking the item - await this.queue.ack(id); - }, - { - attributes: { - job_id: id, - job_type: job as string, - attempt, - job_timestamp: timestamp.getTime(), - job_age_in_ms: Date.now() - timestamp.getTime(), - }, - } - ); - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - this.logger.error(`Error processing item, it threw an error:`, { - name: this.options.name, - id, - job, - item, - visibilityTimeoutMs, - error, - errorMessage, - }); - // Requeue the failed item with a delay - try { - attempt = attempt + 1; - - const retrySettings = { - ...defaultRetrySettings, - ...catalogItem.retry, - }; - - const retryDelay = calculateNextRetryDelay(retrySettings, attempt); - - if (!retryDelay) { - this.logger.error( - `Failed item ${id} has reached max attempts, moving to the DLQ.`, - { - name: this.options.name, - id, - job, - item, - visibilityTimeoutMs, - attempt, - errorMessage, - } - ); - - await this.queue.moveToDeadLetterQueue(id, errorMessage); - return; - } - - const retryDate = new Date(Date.now() + retryDelay); - this.logger.info(`Requeued failed item ${id} with delay`, { - name: this.options.name, - id, - job, - item, - retryDate, - retryDelay, - visibilityTimeoutMs, - attempt, - }); - await this.queue.enqueue({ - id, - job, - item, - availableAt: retryDate, - attempt, - visibilityTimeoutMs, - }); - } catch (requeueError) { - this.logger.error( - `Failed to requeue item, threw error. Will automatically get rescheduled after the visilibity timeout.`, - { - name: this.options.name, - id, - job, - item, - visibilityTimeoutMs, - error: requeueError, - } - ); - } - } - }) - ); + await startSpan( + this.tracer, + "processItem", + async () => { + await handler({ id, payload: item, visibilityTimeoutMs, attempt }); + // On success, acknowledge the item. + await this.queue.ack(id); + }, + { + kind: SpanKind.CONSUMER, + attributes: { + job_id: id, + job_type: job, + attempt, + job_timestamp: timestamp.getTime(), + job_age_in_ms: Date.now() - timestamp.getTime(), + worker_id: workerId, + worker_limit_concurrency: this.limiter.concurrency, + worker_limit_active: this.limiter.activeCount, + worker_limit_pending: this.limiter.pendingCount, + worker_name: this.options.name, + batch_size: batchSize, }, - { - attributes: { - worker_id: worker.threadId, - item_count: items.length, - }, + } + ).catch(async (error) => { + const errorMessage = error instanceof Error ? error.message : String(error); + this.logger.error(`Error processing item:`, { + name: this.options.name, + id, + job, + item, + visibilityTimeoutMs, + error, + errorMessage, + }); + // Attempt requeue logic. + try { + const newAttempt = attempt + 1; + const retrySettings = { + ...defaultRetrySettings, + ...catalogItem.retry, + }; + const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt); + + if (!retryDelay) { + this.logger.error(`Item ${id} reached max attempts. Moving to DLQ.`, { + name: this.options.name, + id, + job, + item, + visibilityTimeoutMs, + attempt: newAttempt, + errorMessage, + }); + await this.queue.moveToDeadLetterQueue(id, errorMessage); + return; } - ); - } catch (error) { - this.logger.error("Error dequeuing items:", { name: this.options.name, error }); - setTimeout(() => this.processItems(worker, count), pollIntervalMs); - return; - } - // Immediately process next batch because there were items in the queue - this.processItems(worker, count); + const retryDate = new Date(Date.now() + retryDelay); + this.logger.info(`Requeuing failed item ${id} with delay`, { + name: this.options.name, + id, + job, + item, + retryDate, + retryDelay, + visibilityTimeoutMs, + attempt: newAttempt, + }); + await this.queue.enqueue({ + id, + job, + item, + availableAt: retryDate, + attempt: newAttempt, + visibilityTimeoutMs, + }); + } catch (requeueError) { + this.logger.error( + `Failed to requeue item ${id}. It will be retried after the visibility timeout.`, + { + name: this.options.name, + id, + job, + item, + visibilityTimeoutMs, + error: requeueError, + } + ); + } + }); + } + + // A simple helper to delay for a given number of milliseconds. + private static delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); } private setupSubscriber() { const channel = `${this.options.name}:redrive`; - this.subscriber.subscribe(channel, (err) => { + this.subscriber?.subscribe(channel, (err) => { if (err) { this.logger.error(`Failed to subscribe to ${channel}`, { error: err }); } else { @@ -350,7 +343,7 @@ class Worker { } }); - this.subscriber.on("message", this.handleRedriveMessage.bind(this)); + this.subscriber?.on("message", this.handleRedriveMessage.bind(this)); } private async handleRedriveMessage(channel: string, message: string) { @@ -374,24 +367,17 @@ class Worker { private async shutdown() { if (this.isShuttingDown) return; this.isShuttingDown = true; - this.logger.log("Shutting down workers..."); + this.logger.log("Shutting down worker loops..."); - await Promise.all(this.workers.map((worker) => worker.terminate())); + // Wait for all worker loops to finish. + await Promise.all(this.workerLoops); - await this.subscriber.unsubscribe(); - await this.subscriber.quit(); + await this.subscriber?.unsubscribe(); + await this.subscriber?.quit(); await this.queue.close(); this.logger.log("All workers and subscribers shut down."); } - public start() { - this.logger.log("Starting workers..."); - this.isShuttingDown = false; - for (const worker of this.workers) { - this.processItems(worker, this.concurrency.tasksPerWorker); - } - } - public async stop() { await this.shutdown(); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1ae892e11d..604337f9da 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -951,6 +951,9 @@ importers: nanoid: specifier: ^5.0.7 version: 5.0.7 + p-limit: + specifier: ^6.2.0 + version: 6.2.0 zod: specifier: 3.23.8 version: 3.23.8 @@ -26272,6 +26275,13 @@ packages: yocto-queue: 1.0.0 dev: true + /p-limit@6.2.0: + resolution: {integrity: sha512-kuUqqHNUqoIWp/c467RI4X6mmyuojY5jGutNU0wVTmEOOfcuwLqyMVoAi9MKi2Ak+5i9+nhmrK4ufZE8069kHA==} + engines: {node: '>=18'} + dependencies: + yocto-queue: 1.1.1 + dev: false + /p-locate@4.1.0: resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==} engines: {node: '>=8'} @@ -32852,6 +32862,11 @@ packages: engines: {node: '>=12.20'} dev: true + /yocto-queue@1.1.1: + resolution: {integrity: sha512-b4JR1PFR10y1mKjhHY9LaGo6tmrgjit7hxVIeAmyMw3jegXR4dhYqLaQF5zMXZxY7tLpMyJeLjr1C4rLmkVe8g==} + engines: {node: '>=12.20'} + dev: false + /youch@3.3.3: resolution: {integrity: sha512-qSFXUk3UZBLfggAW3dJKg0BMblG5biqSF8M34E06o5CSsZtH92u9Hqmj2RzGiHDi64fhe83+4tENFP2DB6t6ZA==} dependencies: diff --git a/references/v3-catalog/src/trigger/simple.ts b/references/v3-catalog/src/trigger/simple.ts index 2c8551fd9d..8dcf96f46e 100644 --- a/references/v3-catalog/src/trigger/simple.ts +++ b/references/v3-catalog/src/trigger/simple.ts @@ -111,12 +111,28 @@ export const immediateReturn = task({ }, }); +export const simulateErrorTester = task({ + id: "simulateErrorTester", + run: async (payload: { message: string }) => { + await simulateError.batchTrigger([ + { payload: { message: payload.message }, options: { maxAttempts: 1 } }, + { payload: { message: payload.message }, options: { maxAttempts: 1 } }, + { payload: { message: payload.message }, options: { maxAttempts: 1 } }, + { payload: { message: payload.message }, options: { maxAttempts: 1 } }, + { payload: { message: payload.message }, options: { maxAttempts: 1 } }, + { payload: { message: payload.message }, options: { maxAttempts: 1 } }, + { payload: { message: payload.message }, options: { maxAttempts: 1 } }, + { payload: { message: payload.message }, options: { maxAttempts: 1 } }, + ]); + }, +}); + export const simulateError = task({ id: "simulateError", + retry: { + maxAttempts: 1, + }, run: async (payload: { message: string }) => { - // Sleep for 1 second - await new Promise((resolve) => setTimeout(resolve, 1000)); - thisFunctionWillThrow(); }, }); From f3cd6cbe4e48aaa468035e9a322e9658f8ce5cc8 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Feb 2025 09:54:00 +0000 Subject: [PATCH 3/6] Fix typecheck errors --- packages/core/src/v3/apiClient/runStream.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index 15de844759..bd73937442 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -509,7 +509,7 @@ const isSafari = () => { */ if (isSafari()) { - // @ts-expect-error + // @ts-ignore-error ReadableStream.prototype.values ??= function ({ preventCancel = false } = {}) { const reader = this.getReader(); return { @@ -541,6 +541,6 @@ if (isSafari()) { }; }; - // @ts-expect-error + // @ts-ignore-error ReadableStream.prototype[Symbol.asyncIterator] ??= ReadableStream.prototype.values; } From b902ab1bb9b0b146ed0e57c78fc4e0c3758560b5 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Feb 2025 10:09:22 +0000 Subject: [PATCH 4/6] Use single threaded tests for redis worker --- internal-packages/redis-worker/vitest.config.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal-packages/redis-worker/vitest.config.ts b/internal-packages/redis-worker/vitest.config.ts index 4afd926425..dfe0df2746 100644 --- a/internal-packages/redis-worker/vitest.config.ts +++ b/internal-packages/redis-worker/vitest.config.ts @@ -4,5 +4,11 @@ export default defineConfig({ test: { include: ["**/*.test.ts"], globals: true, + fileParallelism: false, + poolOptions: { + threads: { + singleThread: true, + }, + }, }, }); From 630ba39c0658505091bf01193dc9b0ade2157124 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Feb 2025 10:21:05 +0000 Subject: [PATCH 5/6] Enable/disable the redis workers independently --- apps/webapp/app/env.server.ts | 2 ++ apps/webapp/app/v3/commonWorker.server.ts | 2 +- apps/webapp/app/v3/legacyRunEngineWorker.server.ts | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 9ce3f56b65..c41b4ea19f 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -369,6 +369,7 @@ const EnvironmentSchema = z.object({ BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"), BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"), + LEGACY_RUN_ENGINE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1), LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), @@ -407,6 +408,7 @@ const EnvironmentSchema = z.object({ .default(process.env.REDIS_TLS_DISABLED ?? "false"), LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"), + COMMON_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"), COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2), COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10), COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000), diff --git a/apps/webapp/app/v3/commonWorker.server.ts b/apps/webapp/app/v3/commonWorker.server.ts index 4f091f1112..5e43d3c90a 100644 --- a/apps/webapp/app/v3/commonWorker.server.ts +++ b/apps/webapp/app/v3/commonWorker.server.ts @@ -79,7 +79,7 @@ function initializeWorker() { }, }); - if (env.WORKER_ENABLED === "true") { + if (env.COMMON_WORKER_ENABLED === "true") { logger.debug( `👨‍🏭 Starting common worker at host ${env.COMMON_WORKER_REDIS_HOST}, pollInterval = ${env.COMMON_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.COMMON_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.COMMON_WORKER_CONCURRENCY_LIMIT}` ); diff --git a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts index 472aa577db..52a978fffc 100644 --- a/apps/webapp/app/v3/legacyRunEngineWorker.server.ts +++ b/apps/webapp/app/v3/legacyRunEngineWorker.server.ts @@ -52,7 +52,7 @@ function initializeWorker() { }, }); - if (env.WORKER_ENABLED === "true") { + if (env.LEGACY_RUN_ENGINE_WORKER_ENABLED === "true") { logger.debug( `👨‍🏭 Starting legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}, pollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT}` ); From a2ca3ee16826f4d12994d87b879e52e4b1d0a77e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 6 Feb 2025 10:21:39 +0000 Subject: [PATCH 6/6] Remove preview release from PR checks --- .github/workflows/pr_checks.yml | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/.github/workflows/pr_checks.yml b/.github/workflows/pr_checks.yml index c8eee52a34..b00475ebfa 100644 --- a/.github/workflows/pr_checks.yml +++ b/.github/workflows/pr_checks.yml @@ -28,34 +28,3 @@ jobs: with: package: cli-v3 secrets: inherit - - preview-release: - name: Preview Release - needs: [typecheck, units, e2e] - if: github.repository == 'triggerdotdev/trigger.dev' - runs-on: ubuntu-latest - steps: - - name: ⬇️ Checkout repo - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - - name: ⎔ Setup pnpm - uses: pnpm/action-setup@v4 - with: - version: 8.15.5 - - - name: ⎔ Setup node - uses: buildjet/setup-node@v4 - with: - node-version: 20.11.1 - cache: "pnpm" - - - name: 📥 Download deps - run: pnpm install --frozen-lockfile - - - name: 🏗️ Build - run: pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev" - - - name: ⚡ Publish preview release - run: npx pkg-pr-new publish --no-template $(ls -d ./packages/*)