Skip to content

Various fixes for run engine v1 #1643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions apps/webapp/app/v3/failedTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.ser
import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
import * as semver from "semver";

const includeAttempts = {
attempts: {
orderBy: {
createdAt: "desc",
const FailedTaskRunRetryGetPayload = {
select: {
id: true,
attempts: {
orderBy: {
createdAt: "desc",
},
take: 1,
},
take: 1,
lockedById: true, // task
lockedToVersionId: true, // worker
},
lockedBy: true, // task
lockedToVersion: true, // worker
} satisfies Prisma.TaskRunInclude;
} as const;

type TaskRunWithAttempts = Prisma.TaskRunGetPayload<{
include: typeof includeAttempts;
}>;
type TaskRunWithAttempts = Prisma.TaskRunGetPayload<typeof FailedTaskRunRetryGetPayload>;

export class FailedTaskRunService extends BaseService {
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
Expand Down Expand Up @@ -92,7 +93,7 @@ export class FailedTaskRunRetryHelper extends BaseService {
where: {
id: runId,
},
include: includeAttempts,
...FailedTaskRunRetryGetPayload,
});

if (!taskRun) {
Expand Down
89 changes: 60 additions & 29 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import {
MachinePreset,
ProdTaskRunExecution,
ProdTaskRunExecutionPayload,
QueueOptions,
TaskRunError,
TaskRunErrorCodes,
TaskRunExecution,
Expand All @@ -29,13 +28,13 @@ import {
BackgroundWorker,
BackgroundWorkerTask,
Prisma,
TaskQueue,
TaskRunStatus,
} from "@trigger.dev/database";
import { z } from "zod";
import { $replica, prisma } from "~/db.server";
import { env } from "~/env.server";
import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { singleton } from "~/utils/singleton";
Expand Down Expand Up @@ -67,7 +66,6 @@ import {
import { tracer } from "../tracer.server";
import { getMaxDuration } from "../utils/maxDuration";
import { MessagePayload } from "./types";
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";

const WithTraceContext = z.object({
traceparent: z.string().optional(),
Expand Down Expand Up @@ -323,6 +321,14 @@ export class SharedQueueConsumer {
ROOT_CONTEXT
);

logger.debug("SharedQueueConsumer starting new trace", {
reasonStats: this._reasonStats,
actionStats: this._actionStats,
outcomeStats: this._outcomeStats,
iterationCount: this._iterationsCount,
consumerId: this._id,
});

// Get the span trace context
this._currentSpanContext = trace.setSpan(ROOT_CONTEXT, this._currentSpan);

Expand Down Expand Up @@ -351,6 +357,10 @@ export class SharedQueueConsumer {
try {
const result = await this.#doWorkInternal();

if (result.reason !== "no_message_dequeued") {
logger.debug("SharedQueueConsumer doWorkInternal result", { result });
}

this._reasonStats[result.reason] = (this._reasonStats[result.reason] ?? 0) + 1;
this._outcomeStats[result.outcome] = (this._outcomeStats[result.outcome] ?? 0) + 1;

Expand All @@ -371,6 +381,9 @@ export class SharedQueueConsumer {
if (result.error) {
span.recordException(result.error);
span.setStatus({ code: SpanStatusCode.ERROR });
this._currentSpan?.recordException(result.error);
this._currentSpan?.setStatus({ code: SpanStatusCode.ERROR });
this._endSpanInNextIteration = true;
}

if (typeof result.interval === "number") {
Expand Down Expand Up @@ -755,7 +768,7 @@ export class SharedQueueConsumer {
);

if (!queue) {
logger.debug("SharedQueueConsumer queue not found, so nacking message", {
logger.debug("SharedQueueConsumer queue not found, so acking message", {
queueMessage: message,
taskRunQueue: lockedTaskRun.queue,
runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId,
Expand Down Expand Up @@ -876,33 +889,49 @@ export class SharedQueueConsumer {
machinePresetFromRun(lockedTaskRun) ??
machinePresetFromConfig(lockedTaskRun.lockedBy?.machineConfig ?? {});

await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => {
await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", {
backgroundWorkerId: worker.friendlyId,
data: {
type: "SCHEDULE_ATTEMPT",
image: imageReference,
version: deployment.version,
machine,
nextAttemptNumber,
// identifiers
id: "placeholder", // TODO: Remove this completely in a future release
envId: lockedTaskRun.runtimeEnvironment.id,
envType: lockedTaskRun.runtimeEnvironment.type,
orgId: lockedTaskRun.runtimeEnvironment.organizationId,
projectId: lockedTaskRun.runtimeEnvironment.projectId,
runId: lockedTaskRun.id,
},
return await this.#startActiveSpan("scheduleAttemptOnProvider", async (span) => {
span.setAttributes({
run_id: lockedTaskRun.id,
});
});

return {
action: "noop",
reason: "scheduled_attempt",
attrs: {
next_attempt_number: nextAttemptNumber,
},
};
if (await this._providerSender.validateCanSendMessage()) {
await this._providerSender.send("BACKGROUND_WORKER_MESSAGE", {
backgroundWorkerId: worker.friendlyId,
data: {
type: "SCHEDULE_ATTEMPT",
image: imageReference,
version: deployment.version,
machine,
nextAttemptNumber,
// identifiers
id: "placeholder", // TODO: Remove this completely in a future release
envId: lockedTaskRun.runtimeEnvironment.id,
envType: lockedTaskRun.runtimeEnvironment.type,
orgId: lockedTaskRun.runtimeEnvironment.organizationId,
projectId: lockedTaskRun.runtimeEnvironment.projectId,
runId: lockedTaskRun.id,
},
});

return {
action: "noop",
reason: "scheduled_attempt",
attrs: {
next_attempt_number: nextAttemptNumber,
},
};
} else {
return {
action: "nack_and_do_more_work",
reason: "provider_not_connected",
attrs: {
run_id: lockedTaskRun.id,
},
interval: this._options.nextTickInterval,
retryInMs: 5_000,
};
}
});
}
} catch (e) {
// We now need to unlock the task run and delete the task run attempt
Expand All @@ -929,6 +958,8 @@ export class SharedQueueConsumer {
action: "nack_and_do_more_work",
reason: "failed_to_schedule_attempt",
error: e instanceof Error ? e : String(e),
interval: this._options.nextTickInterval,
retryInMs: 5_000,
};
}
}
Expand Down
25 changes: 23 additions & 2 deletions apps/webapp/app/v3/requeueTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class RequeueTaskRunService extends BaseService {
id: true,
friendlyId: true,
status: true,
lockedAt: true,
runtimeEnvironment: {
select: {
type: true,
Expand All @@ -42,9 +43,29 @@ export class RequeueTaskRunService extends BaseService {

switch (taskRun.status) {
case "PENDING": {
logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun });
if (taskRun.lockedAt) {
logger.debug(
"[RequeueTaskRunService] Failing task run because the heartbeat failed and it's PENDING but locked",
{ taskRun }
);

const service = new FailedTaskRunService();

await service.call(taskRun.friendlyId, {
ok: false,
id: taskRun.friendlyId,
retry: undefined,
error: {
type: "INTERNAL_ERROR",
code: TaskRunErrorCodes.TASK_RUN_HEARTBEAT_TIMEOUT,
message: "Did not receive a heartbeat from the worker in time",
},
});
} else {
logger.debug("[RequeueTaskRunService] Nacking task run", { taskRun });

await marqs?.nackMessage(taskRun.id);
await marqs?.nackMessage(taskRun.id);
}

break;
}
Expand Down
7 changes: 5 additions & 2 deletions apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,12 @@ async function getAuthenticatedEnvironmentFromRun(
friendlyId: string,
prismaClient?: PrismaClientOrTransaction
) {
const taskRun = await (prismaClient ?? prisma).taskRun.findUnique({
const isFriendlyId = friendlyId.startsWith("run_");

const taskRun = await (prismaClient ?? prisma).taskRun.findFirst({
where: {
friendlyId,
id: !isFriendlyId ? friendlyId : undefined,
friendlyId: isFriendlyId ? friendlyId : undefined,
},
include: {
runtimeEnvironment: {
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/resumeTaskDependency.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { logger } from "~/services/logger.server";

export class ResumeTaskDependencyService extends BaseService {
public async call(dependencyId: string, sourceTaskAttemptId: string) {
const dependency = await this._prisma.taskRunDependency.findUnique({
const dependency = await this._prisma.taskRunDependency.findFirst({
where: { id: dependencyId },
include: {
taskRun: {
Expand Down
8 changes: 8 additions & 0 deletions apps/webapp/app/v3/sharedSocketConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ export class SharedSocketConnection {
}
});
},
// Reports whether at least one socket on the namespace is currently connected.
canSendMessage() {
  const namespaceSockets = Array.from(opts.namespace.sockets.values());

  // An empty namespace and a namespace of only disconnected sockets both yield false.
  return namespaceSockets.length > 0 && namespaceSockets.some((candidate) => candidate.connected);
},
});

logger.debug("Starting SharedQueueConsumer pool", {
Expand Down
43 changes: 43 additions & 0 deletions packages/core/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,17 @@ export class Logger {
// Get the current context from trace if it exists
const currentSpan = trace.getSpan(context.active());

const structuredError = extractStructuredErrorFromArgs(...args);
const structuredMessage = extractStructuredMessageFromArgs(...args);

const structuredLog = {
...structureArgs(safeJsonClone(args) as Record<string, unknown>[], this.#filteredKeys),
...this.#additionalFields(),
...(structuredError ? { error: structuredError } : {}),
timestamp: new Date(),
name: this.#name,
message,
...(structuredMessage ? { $message: structuredMessage } : {}),
level,
traceId:
currentSpan && currentSpan.isRecording() ? currentSpan?.spanContext().traceId : undefined,
Expand All @@ -118,6 +123,44 @@ export class Logger {
}
}

/**
 * Pulls a serializable `{ message, stack, name }` summary of an error out of log arguments.
 *
 * An argument that is itself an `Error` takes priority. Otherwise the first argument whose
 * `error` property holds an `Error` instance is used.
 *
 * @param args - The extra values passed to a log call.
 * @returns The structured error, or `undefined` when no `Error` is found.
 */
function extractStructuredErrorFromArgs(...args: Array<Record<string, unknown> | undefined>) {
  const toStructuredError = (error: Error) => ({
    message: error.message,
    stack: error.stack,
    name: error.name,
  });

  for (const arg of args) {
    if (arg instanceof Error) {
      return toStructuredError(arg);
    }
  }

  // Test each `error` property with `instanceof` rather than truthiness, so a non-Error
  // `error` value (e.g. a string) on an earlier argument does not hide a later real Error.
  for (const arg of args) {
    const nested: unknown = arg?.error;

    if (nested instanceof Error) {
      return toStructuredError(nested);
    }
  }

  return;
}

/**
 * Looks through the log arguments for a truthy `message` property.
 *
 * @param args - The extra values passed to a log call.
 * @returns The `message` value of the first argument that has a truthy one, otherwise `undefined`.
 */
function extractStructuredMessageFromArgs(...args: Array<Record<string, unknown> | undefined>) {
  for (const candidate of args) {
    const value = candidate?.message;

    // Falsy values (missing key, empty string, 0, ...) are skipped.
    if (value) {
      return value;
    }
  }

  return;
}

function createReplacer(replacer?: (key: string, value: unknown) => unknown) {
return (key: string, value: unknown) => {
if (typeof value === "bigint") {
Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/v3/zodMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,25 @@ type ZodMessageSenderCallback<TMessageCatalog extends ZodMessageCatalogSchema> =
/**
 * Options accepted by the `ZodMessageSender` constructor.
 */
export type ZodMessageSenderOptions<TMessageCatalog extends ZodMessageCatalogSchema> = {
schema: TMessageCatalog;
sender: ZodMessageSenderCallback<TMessageCatalog>;
// Optional readiness check (sync or async); `validateCanSendMessage` resolves to `true` when it is omitted.
canSendMessage?: () => Promise<boolean> | boolean;
};

export class ZodMessageSender<TMessageCatalog extends ZodMessageCatalogSchema> {
#schema: TMessageCatalog;
#sender: ZodMessageSenderCallback<TMessageCatalog>;
#canSendMessage?: ZodMessageSenderOptions<TMessageCatalog>["canSendMessage"];

/** Captures the schema, the sender callback and the optional readiness check from `options`. */
constructor(options: ZodMessageSenderOptions<TMessageCatalog>) {
  const { schema, sender, canSendMessage } = options;

  this.#schema = schema;
  this.#sender = sender;
  this.#canSendMessage = canSendMessage;
}

/**
 * Asks the optional `canSendMessage` callback whether a message can be sent right now.
 *
 * @returns `true` when no callback was configured, otherwise the callback's (awaited) result.
 */
public async validateCanSendMessage(): Promise<boolean> {
  // Invoke through `this` so the callback keeps the same receiver it always had.
  return this.#canSendMessage ? await this.#canSendMessage() : true;
}

public async send<K extends keyof TMessageCatalog>(
Expand Down
Loading