Skip to content

re2: dev runs work without worker groups, fixed some type issues #1756

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/coordinator/src/checkpointer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ export class Checkpointer {
return result.checkpoint;
} finally {
if (opts.shouldHeartbeat) {
// @ts-ignore - Some kind of node incompatible type issue
clearInterval(interval);
}
removeCurrentAbortController();
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/utils/taskEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
TaskEventStyle,
unflattenAttributes,
} from "@trigger.dev/core/v3";
import { Prisma, TaskEvent } from "@trigger.dev/database";
import { Prisma, TaskEvent, TaskEventKind } from "@trigger.dev/database";
import { createTreeFromFlatItems, flattenTree } from "~/components/primitives/TreeView/TreeView";
import type {
PreparedEvent,
Expand Down Expand Up @@ -76,7 +76,7 @@ export function prepareTrace(events: TaskEvent[]): TraceSummary | undefined {
level: event.level,
events: event.events,
environmentType: event.environmentType,
isDebug: event.isDebug,
isDebug: event.kind === TaskEventKind.LOG,
},
} satisfies SpanSummary;

Expand Down
37 changes: 7 additions & 30 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {
omit,
unflattenAttributes,
} from "@trigger.dev/core/v3";
import { Prisma, TaskEvent, TaskEventStatus, type TaskEventKind } from "@trigger.dev/database";
import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus } from "@trigger.dev/database";
import { createHash } from "node:crypto";
import { EventEmitter } from "node:stream";
import { Gauge } from "prom-client";
Expand Down Expand Up @@ -126,10 +126,10 @@ export type QueriedEvent = Prisma.TaskEventGetPayload<{
isError: true;
isPartial: true;
isCancelled: true;
isDebug: true;
level: true;
events: true;
environmentType: true;
kind: true;
};
}>;

Expand Down Expand Up @@ -186,26 +186,6 @@ export type UpdateEventOptions = {
events?: SpanEvents;
};

type TaskEventSummary = Pick<
TaskEvent,
| "id"
| "spanId"
| "parentId"
| "runId"
| "idempotencyKey"
| "message"
| "style"
| "startTime"
| "duration"
| "isError"
| "isPartial"
| "isCancelled"
| "level"
| "events"
| "environmentType"
| "isDebug"
>;

export class EventRepository {
private readonly _flushScheduler: DynamicFlushScheduler<CreatableEvent>;
private _randomIdGenerator = new RandomIdGenerator();
Expand Down Expand Up @@ -512,7 +492,7 @@ export class EventRepository {
isError: event.isError,
isPartial: ancestorCancelled ? false : event.isPartial,
isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled,
isDebug: event.isDebug,
isDebug: event.kind === TaskEventKind.LOG,
startTime: getDateFromNanoseconds(event.startTime),
level: event.level,
events: event.events,
Expand Down Expand Up @@ -569,7 +549,7 @@ export class EventRepository {
isError: true,
isPartial: true,
isCancelled: true,
isDebug: true,
kind: true,
level: true,
events: true,
environmentType: true,
Expand Down Expand Up @@ -865,10 +845,8 @@ export class EventRepository {
...options.attributes.metadata,
};

const isDebug = options.attributes.isDebug;

const style = {
[SemanticInternalAttributes.STYLE_ICON]: isDebug ? "warn" : "play",
[SemanticInternalAttributes.STYLE_ICON]: options.attributes.isDebug ? "warn" : "play",
};

if (!options.attributes.runId) {
Expand All @@ -883,12 +861,11 @@ export class EventRepository {
message: message,
serviceName: "api server",
serviceNamespace: "trigger.dev",
level: isDebug ? "WARN" : "TRACE",
kind: options.kind,
level: options.attributes.isDebug ? "WARN" : "TRACE",
kind: options.attributes.isDebug ? TaskEventKind.LOG : options.kind,
status: "OK",
startTime,
isPartial: false,
isDebug,
duration, // convert to nanoseconds
environmentId: options.environment.id,
environmentType: options.environment.type,
Expand Down
39 changes: 23 additions & 16 deletions apps/webapp/app/v3/services/triggerTaskV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,21 +312,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
event.setAttribute("runId", runFriendlyId);
span.setAttribute("runId", runFriendlyId);

const workerGroupService = new WorkerGroupService({
prisma: this._prisma,
engine: this._engine,
});
const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
projectId: environment.projectId,
});

if (!workerGroup) {
logger.error("Default worker group not found", {
projectId: environment.projectId,
});

return;
}
const masterQueue = await this.#getMasterQueueForEnvironment(environment);

const taskRun = await this._engine.trigger(
{
Expand All @@ -351,7 +337,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
concurrencyKey: body.options?.concurrencyKey,
queueName,
queue: body.options?.queue,
masterQueue: workerGroup.masterQueue,
masterQueue: masterQueue,
isTest: body.options?.test ?? false,
delayUntil,
queuedAt: delayUntil ? undefined : new Date(),
Expand Down Expand Up @@ -441,6 +427,27 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
});
}

async #getMasterQueueForEnvironment(environment: AuthenticatedEnvironment) {
if (environment.type === "DEVELOPMENT") {
return;
}

const workerGroupService = new WorkerGroupService({
prisma: this._prisma,
engine: this._engine,
});

const workerGroup = await workerGroupService.getDefaultWorkerGroupForProject({
projectId: environment.projectId,
});

if (!workerGroup) {
throw new ServiceValidationError("No worker group found");
}

return workerGroup.masterQueue;
}

async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) {
if (queueName) {
return queueName;
Expand Down
6 changes: 3 additions & 3 deletions apps/webapp/app/v3/taskEventStore.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export type TraceEvent = Pick<
| "level"
| "events"
| "environmentType"
| "isDebug"
| "kind"
>;

export type TaskEventStoreTable = "taskEvent" | "taskEventPartitioned";
Expand Down Expand Up @@ -138,7 +138,7 @@ export class TaskEventStore {
level,
events,
"environmentType",
"isDebug"
"kind"
FROM "TaskEventPartitioned"
WHERE
"traceId" = ${traceId}
Expand Down Expand Up @@ -168,7 +168,7 @@ export class TaskEventStore {
level,
events,
"environmentType",
"isDebug"
"kind"
FROM "TaskEvent"
WHERE "traceId" = ${traceId}
ORDER BY "startTime" ASC
Expand Down
1 change: 1 addition & 0 deletions internal-packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -2381,6 +2381,7 @@ model TaskEvent {
isError Boolean @default(false)
isPartial Boolean @default(false)
isCancelled Boolean @default(false)
/// deprecated: don't use this, moving this to properties, this now uses TaskEventKind.LOG
isDebug Boolean @default(false)

serviceName String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ describe("RunEngine batchTriggerAndWait", () => {
batchId: batch.id,
environmentId: authenticatedEnvironment.id,
projectId: authenticatedEnvironment.projectId,
organizationId: authenticatedEnvironment.organizationId,
});

const afterBlockedByBatch = await engine.getRunExecutionData({ runId: parentRun.id });
Expand Down
4 changes: 2 additions & 2 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ export type TriggerParams = {
sdkVersion?: string;
cliVersion?: string;
concurrencyKey?: string;
masterQueue: string;
masterQueue?: string;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify that all consumers of masterQueue handle undefined values

Making masterQueue optional could cause runtime errors if any code assumes this field always has a value. Ensure all code that consumes this field includes appropriate null/undefined checks.


🏁 Script executed:

#!/bin/bash
# Description: Find all uses of masterQueue from TriggerParams to verify they handle undefined values

# Look for usages of TriggerParams.masterQueue
rg -A 3 -B 3 "\bmasterQueue\b" --glob "*.ts" | grep -v "masterQueue\?:"

Length of output: 63436


Action: Audit and add safeguards for masterQueue usage

The field masterQueue defined as optional in
internal-packages/run-engine/src/engine/types.ts
is referenced in multiple locations without explicit checks for undefined. For example:

  • In internal-packages/run-engine/src/run-queue/keyProducer.ts, a template literal (${this._prefix}*${masterQueue}) concatenates its value directly. If masterQueue is undefined, it will produce an unexpected string (e.g., "prefix*undefined").
  • In internal-packages/run-engine/src/engine/index.ts, masterQueue is used to build arrays and is passed to methods without validating its presence.
  • Similar patterns appear in other consumers (e.g., files in apps/webapp/app/v3/services/triggerTaskV2.server.ts and worker group token service), where no explicit undefined guard is evident.

Although the tests haven’t reported runtime errors so far, these unchecked usages could lead to subtle bugs when masterQueue is not supplied. Please review each consumer:

• Ensure that methods consuming masterQueue either perform an explicit undefined check or utilize a sensible default value.
• Update the affected code (e.g., in keyProducer and engine/index) to guard against undefined values.

queueName: string;
queue?: QueueOptions;
isTest: boolean;
delayUntil?: Date;
queuedAt?: Date;
maxAttempts?: number;
taskEventStore: string;
taskEventStore?: string;
priorityMs?: number;
ttl?: string;
tags: { id: string; name: string }[];
Expand Down
4 changes: 4 additions & 0 deletions internal-packages/testcontainers/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ async function verifyRedisConnection(container: StartedRedisContainer) {
},
});

redis.on("error", (error) => {
// swallow the error
});

try {
await redis.ping();
} finally {
Expand Down
Loading