Skip to content

Engine v1 improvements #1627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ const EnvironmentSchema = z.object({
SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),
SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS: z.coerce.number().int().default(1000),
SHARED_QUEUE_CONSUMER_RESOLVE_PAYLOADS_BATCH_SIZE: z.coerce.number().int().default(25),

// Development OTEL environment variables
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
Expand Down Expand Up @@ -219,6 +221,10 @@ const EnvironmentSchema = z.object({
.number()
.int()
.default(60 * 1000 * 15),
MARQS_SHARED_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36),
MARQS_DEV_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(12),
MARQS_MAXIMUM_NACK_COUNT: z.coerce.number().int().default(64),

PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),

VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),
Expand Down
63 changes: 63 additions & 0 deletions apps/webapp/app/models/taskQueue.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { QueueOptions } from "@trigger.dev/core/v3/schemas";
import { TaskQueue } from "@trigger.dev/database";
import { prisma } from "~/db.server";

export async function findQueueInEnvironment(
queueName: string,
environmentId: string,
backgroundWorkerTaskId?: string,
backgroundTask?: { queueConfig?: unknown }
): Promise<TaskQueue | undefined> {
const sanitizedQueueName = sanitizeQueueName(queueName);

const queue = await prisma.taskQueue.findFirst({
where: {
runtimeEnvironmentId: environmentId,
name: sanitizedQueueName,
},
});

if (queue) {
return queue;
}

const task = backgroundTask
? backgroundTask
: backgroundWorkerTaskId
? await prisma.backgroundWorkerTask.findFirst({
where: {
id: backgroundWorkerTaskId,
},
})
: undefined;

if (!task) {
return;
}

const queueConfig = QueueOptions.safeParse(task.queueConfig);

if (queueConfig.success) {
const taskQueueName = queueConfig.data.name
? sanitizeQueueName(queueConfig.data.name)
: undefined;

if (taskQueueName && taskQueueName !== sanitizedQueueName) {
const queue = await prisma.taskQueue.findFirst({
where: {
runtimeEnvironmentId: environmentId,
name: taskQueueName,
},
});

if (queue) {
return queue;
}
}
}
}

// Only allow alphanumeric characters, underscores, hyphens, and slashes (and only the first 128 characters)
export function sanitizeQueueName(queueName: string) {
return queueName.replace(/[^a-zA-Z0-9_\-\/]/g, "").substring(0, 128);
}
31 changes: 0 additions & 31 deletions apps/webapp/app/routes/admin.api.v1.marqs.ts

This file was deleted.

3 changes: 2 additions & 1 deletion apps/webapp/app/services/apiAuth.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
isPersonalAccessToken,
} from "./personalAccessToken.server";
import { isPublicJWT, validatePublicJwtKey } from "./realtime/jwtAuth.server";
import { RuntimeEnvironmentForEnvRepo } from "~/v3/environmentVariables/environmentVariablesRepository.server";

const ClaimsSchema = z.object({
scopes: z.array(z.string()).optional(),
Expand Down Expand Up @@ -410,7 +411,7 @@ const JWT_ALGORITHM = "HS256";
const DEFAULT_JWT_EXPIRATION_IN_MS = 1000 * 60 * 60; // 1 hour

export async function generateJWTTokenForEnvironment(
environment: RuntimeEnvironment,
environment: RuntimeEnvironmentForEnvRepo,
payload: Record<string, string>
) {
const jwt = await new SignJWT({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,26 @@ export class EnvironmentVariablesRepository implements Repository {
}
}

export const RuntimeEnvironmentForEnvRepoPayload = {
select: {
id: true,
slug: true,
type: true,
projectId: true,
apiKey: true,
organizationId: true,
},
} as const;

export type RuntimeEnvironmentForEnvRepo = Prisma.RuntimeEnvironmentGetPayload<
typeof RuntimeEnvironmentForEnvRepoPayload
>;

export const environmentVariablesRepository = new EnvironmentVariablesRepository();

export async function resolveVariablesForEnvironment(runtimeEnvironment: RuntimeEnvironment) {
export async function resolveVariablesForEnvironment(
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
) {
const projectSecrets = await environmentVariablesRepository.getEnvironmentVariables(
runtimeEnvironment.projectId,
runtimeEnvironment.id
Expand All @@ -672,7 +689,9 @@ export async function resolveVariablesForEnvironment(runtimeEnvironment: Runtime
return [...overridableTriggerVariables, ...projectSecrets, ...builtInVariables];
}

async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnvironment) {
async function resolveOverridableTriggerVariables(
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
) {
let result: Array<EnvironmentVariable> = [
{
key: "TRIGGER_REALTIME_STREAM_VERSION",
Expand All @@ -683,7 +702,7 @@ async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnv
return result;
}

async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment) {
async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) {
let result: Array<EnvironmentVariable> = [
{
key: "OTEL_EXPORTER_OTLP_ENDPOINT",
Expand Down Expand Up @@ -745,7 +764,7 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment
return [...result, ...commonVariables];
}

async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironment) {
async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) {
let result: Array<EnvironmentVariable> = [
{
key: "TRIGGER_SECRET_KEY",
Expand Down Expand Up @@ -838,7 +857,7 @@ async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmen
}

async function resolveCommonBuiltInVariables(
runtimeEnvironment: RuntimeEnvironment
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
): Promise<Array<EnvironmentVariable>> {
return [];
}
25 changes: 10 additions & 15 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,15 @@ import { prisma } from "~/db.server";
import { createNewSession, disconnectSession } from "~/models/runtimeEnvironment.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server";
import { marqs } from "~/v3/marqs/index.server";
import { resolveVariablesForEnvironment } from "../environmentVariables/environmentVariablesRepository.server";
import { FailedTaskRunService } from "../failedTaskRun.server";
import { CancelDevSessionRunsService } from "../services/cancelDevSessionRuns.server";
import { CompleteAttemptService } from "../services/completeAttempt.server";
import {
SEMINTATTRS_FORCE_RECORDING,
attributesFromAuthenticatedEnv,
tracer,
} from "../tracer.server";
import { DevSubscriber, devPubSub } from "./devPubSub.server";
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
import { getMaxDuration } from "../utils/maxDuration";
import { DevSubscriber, devPubSub } from "./devPubSub.server";
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";

const MessageBody = z.discriminatedUnion("type", [
z.object({
Expand Down Expand Up @@ -436,14 +433,12 @@ export class DevQueueConsumer {
return;
}

const queue = await prisma.taskQueue.findUnique({
where: {
runtimeEnvironmentId_name: {
runtimeEnvironmentId: this.env.id,
name: sanitizeQueueName(lockedTaskRun.queue),
},
},
});
const queue = await findQueueInEnvironment(
lockedTaskRun.queue,
this.env.id,
backgroundTask.id,
backgroundTask
);

if (!queue) {
logger.debug("[DevQueueConsumer] Failed to find queue", {
Expand Down
Loading
Loading