Skip to content

Use redis worker for run heartbeats and alerts #1669

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions .github/workflows/pr_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,3 @@ jobs:
with:
package: cli-v3
secrets: inherit

preview-release:
name: Preview Release
needs: [typecheck, units, e2e]
if: github.repository == 'triggerdotdev/trigger.dev'
runs-on: ubuntu-latest
steps:
- name: ⬇️ Checkout repo
uses: actions/checkout@v4
with:
fetch-depth: 0

- name: ⎔ Setup pnpm
uses: pnpm/action-setup@v4
with:
version: 8.15.5

- name: ⎔ Setup node
uses: buildjet/setup-node@v4
with:
node-version: 20.11.1
cache: "pnpm"

- name: 📥 Download deps
run: pnpm install --frozen-lockfile

- name: 🏗️ Build
run: pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev"

- name: ⚡ Publish preview release
run: npx pkg-pr-new publish --no-template $(ls -d ./packages/*)
76 changes: 76 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,82 @@ const EnvironmentSchema = z.object({
BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"),
BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"),

LEGACY_RUN_ENGINE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),

LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
LEGACY_RUN_ENGINE_WORKER_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
LEGACY_RUN_ENGINE_WORKER_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
LEGACY_RUN_ENGINE_WORKER_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED: z
.string()
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),

COMMON_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),

COMMON_WORKER_REDIS_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_HOST),
COMMON_WORKER_REDIS_READER_HOST: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_READER_HOST),
COMMON_WORKER_REDIS_READER_PORT: z.coerce
.number()
.optional()
.transform(
(v) =>
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
),
COMMON_WORKER_REDIS_PORT: z.coerce
.number()
.optional()
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
COMMON_WORKER_REDIS_USERNAME: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_USERNAME),
COMMON_WORKER_REDIS_PASSWORD: z
.string()
.optional()
.transform((v) => v ?? process.env.REDIS_PASSWORD),
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
20 changes: 1 addition & 19 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ import { $replica, prisma } from "~/db.server";
import { env } from "~/env.server";
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server";
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
Expand Down Expand Up @@ -157,9 +155,6 @@ const workerCatalog = {
"v3.performTaskRunAlerts": z.object({
runId: z.string(),
}),
"v3.performTaskAttemptAlerts": z.object({
attemptId: z.string(),
}),
"v3.deliverAlert": z.object({
alertId: z.string(),
}),
Expand Down Expand Up @@ -610,15 +605,6 @@ function getWorkerQueue() {
return await service.call(payload.runId);
},
},
"v3.performTaskAttemptAlerts": {
priority: 0,
maxAttempts: 3,
handler: async (payload, job) => {
const service = new PerformTaskAttemptAlertsService();

return await service.call(payload.attemptId);
},
},
"v3.deliverAlert": {
priority: 0,
maxAttempts: 8,
Expand Down Expand Up @@ -658,11 +644,7 @@ function getWorkerQueue() {
"v3.requeueTaskRun": {
priority: 0,
maxAttempts: 3,
handler: async (payload, job) => {
const service = new RequeueTaskRunService();

await service.call(payload.runId);
},
handler: async (payload, job) => {}, // This is now handled by redisWorker
},
"v3.retryAttempt": {
priority: 0,
Expand Down
93 changes: 93 additions & 0 deletions apps/webapp/app/v3/commonWorker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { Worker as RedisWorker } from "@internal/redis-worker";
import { Logger } from "@trigger.dev/core/logger";
import { z } from "zod";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";

function initializeWorker() {
const redisOptions = {
keyPrefix: "common:worker:",
host: env.COMMON_WORKER_REDIS_HOST,
port: env.COMMON_WORKER_REDIS_PORT,
username: env.COMMON_WORKER_REDIS_USERNAME,
password: env.COMMON_WORKER_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.COMMON_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
};

logger.debug(`👨‍🏭 Initializing common worker at host ${env.COMMON_WORKER_REDIS_HOST}`);

const worker = new RedisWorker({
name: "common-worker",
redisOptions,
catalog: {
"v3.performTaskRunAlerts": {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 3,
},
},
"v3.performDeploymentAlerts": {
schema: z.object({
deploymentId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 3,
},
},
"v3.deliverAlert": {
schema: z.object({
alertId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 3,
},
},
},
concurrency: {
workers: env.COMMON_WORKER_CONCURRENCY_WORKERS,
tasksPerWorker: env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER,
limit: env.COMMON_WORKER_CONCURRENCY_LIMIT,
},
pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
logger: new Logger("CommonWorker", "debug"),
jobs: {
"v3.deliverAlert": async ({ payload }) => {
const service = new DeliverAlertService();

return await service.call(payload.alertId);
},
"v3.performDeploymentAlerts": async ({ payload }) => {
const service = new PerformDeploymentAlertsService();

return await service.call(payload.deploymentId);
},
"v3.performTaskRunAlerts": async ({ payload }) => {
const service = new PerformTaskRunAlertsService();
return await service.call(payload.runId);
},
},
});

if (env.COMMON_WORKER_ENABLED === "true") {
logger.debug(
`👨‍🏭 Starting common worker at host ${env.COMMON_WORKER_REDIS_HOST}, pollInterval = ${env.COMMON_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.COMMON_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.COMMON_WORKER_CONCURRENCY_LIMIT}`
);

worker.start();
}

return worker;
}

export const commonWorker = singleton("commonWorker", initializeWorker);
66 changes: 66 additions & 0 deletions apps/webapp/app/v3/legacyRunEngineWorker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { Worker as RedisWorker } from "@internal/redis-worker";
import { Logger } from "@trigger.dev/core/logger";
import { z } from "zod";
import { env } from "~/env.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";

function initializeWorker() {
const redisOptions = {
keyPrefix: "legacy-run-engine:worker:",
host: env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST,
port: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT,
username: env.LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME,
password: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
};

logger.debug(
`👨‍🏭 Initializing legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}`
);

const worker = new RedisWorker({
name: "legacy-run-engine-worker",
redisOptions,
catalog: {
runHeartbeat: {
schema: z.object({
runId: z.string(),
}),
visibilityTimeoutMs: 60_000,
retry: {
maxAttempts: 3,
},
},
},
concurrency: {
workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
tasksPerWorker: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
limit: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT,
},
pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
logger: new Logger("LegacyRunEngineWorker", "debug"),
jobs: {
runHeartbeat: async ({ payload }) => {
const service = new TaskRunHeartbeatFailedService();

await service.call(payload.runId);
},
Comment on lines +47 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for task failures.

The job handler should include proper error handling and logging.

       runHeartbeat: async ({ payload }) => {
+        try {
           const service = new TaskRunHeartbeatFailedService();
           await service.call(payload.runId);
+        } catch (error) {
+          logger.error("Failed to process run heartbeat", {
+            error,
+            runId: payload.runId,
+          });
+          throw error; // Re-throw to trigger retry
+        }
       },
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
runHeartbeat: async ({ payload }) => {
const service = new TaskRunHeartbeatFailedService();
await service.call(payload.runId);
},
runHeartbeat: async ({ payload }) => {
try {
const service = new TaskRunHeartbeatFailedService();
await service.call(payload.runId);
} catch (error) {
logger.error("Failed to process run heartbeat", {
error,
runId: payload.runId,
});
throw error; // Re-throw to trigger retry
}
},

},
});

if (env.LEGACY_RUN_ENGINE_WORKER_ENABLED === "true") {
logger.debug(
`👨‍🏭 Starting legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}, pollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT}`
);

worker.start();
}

return worker;
}

export const legacyRunEngineWorker = singleton("legacyRunEngineWorker", initializeWorker);
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import {
MessageQueueSubscriber,
VisibilityTimeoutStrategy,
} from "./types";
import { V3VisibilityTimeout } from "./v3VisibilityTimeout.server";
import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server";

const KEY_PREFIX = "marqs:";

Expand Down Expand Up @@ -1611,7 +1611,7 @@ function getMarQSClient() {
name: "marqs",
tracer: trace.getTracer("marqs"),
keysProducer,
visibilityTimeoutStrategy: new V3VisibilityTimeout(),
visibilityTimeoutStrategy: new V3LegacyRunEngineWorkerVisibilityTimeout(),
queuePriorityStrategy: new FairDequeuingStrategy({
tracer: tracer,
redis,
Expand Down
24 changes: 20 additions & 4 deletions apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,28 @@
import { RequeueTaskRunService } from "../requeueTaskRun.server";
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
import { TaskRunHeartbeatFailedService } from "../taskRunHeartbeatFailed.server";
import { VisibilityTimeoutStrategy } from "./types";

export class V3VisibilityTimeout implements VisibilityTimeoutStrategy {
export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy {
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
await RequeueTaskRunService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
}

async cancelHeartbeat(messageId: string): Promise<void> {
await RequeueTaskRunService.dequeue(messageId);
await TaskRunHeartbeatFailedService.dequeue(messageId);
}
}

export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy {
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
await legacyRunEngineWorker.enqueue({
id: `heartbeat:${messageId}`,
job: "runHeartbeat",
payload: { runId: messageId },
availableAt: new Date(Date.now() + timeoutInMs),
});
}

async cancelHeartbeat(messageId: string): Promise<void> {
await legacyRunEngineWorker.ack(`heartbeat:${messageId}`);
}
}
Loading
Loading