Skip to content

Commit 1ff7b86

Browse files
authored
Add max queue depth limits (#1376)
* Add runs to an env queue, as well as the actual queue * Add queue size limit guard on triggering tasks
1 parent c531a9d commit 1ff7b86

File tree

9 files changed

+137
-4
lines changed

9 files changed

+137
-4
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ const EnvironmentSchema = z.object({
213213
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
214214
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
215215
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(4_096), // 4KB
216+
217+
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
218+
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
216219
});
217220

218221
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ export class MarQS {
139139
return this.redis.zcard(this.keys.queueKey(env, queue, concurrencyKey));
140140
}
141141

142+
public async lengthOfEnvQueue(env: AuthenticatedEnvironment) {
143+
return this.redis.zcard(this.keys.envQueueKey(env));
144+
}
145+
142146
public async oldestMessageInQueue(
143147
env: AuthenticatedEnvironment,
144148
queue: string,
@@ -1074,6 +1078,7 @@ export class MarQS {
10741078
concurrencyKey,
10751079
envConcurrencyKey,
10761080
orgConcurrencyKey,
1081+
this.keys.envQueueKeyFromQueue(message.queue),
10771082
message.queue,
10781083
message.messageId,
10791084
JSON.stringify(message),
@@ -1111,6 +1116,7 @@ export class MarQS {
11111116
currentConcurrencyKey,
11121117
envCurrentConcurrencyKey,
11131118
orgCurrentConcurrencyKey,
1119+
this.keys.envQueueKeyFromQueue(messageQueue),
11141120
messageQueue,
11151121
String(Date.now()),
11161122
String(this.options.defaultEnvConcurrency),
@@ -1187,6 +1193,7 @@ export class MarQS {
11871193
concurrencyKey,
11881194
envConcurrencyKey,
11891195
orgConcurrencyKey,
1196+
this.keys.envQueueKeyFromQueue(messageQueue),
11901197
messageId,
11911198
messageQueue
11921199
);
@@ -1234,6 +1241,7 @@ export class MarQS {
12341241
envConcurrencyKey,
12351242
orgConcurrencyKey,
12361243
visibilityQueue,
1244+
this.keys.envQueueKeyFromQueue(messageQueue),
12371245
messageQueue,
12381246
messageId,
12391247
String(Date.now()),
@@ -1347,14 +1355,15 @@ export class MarQS {
13471355

13481356
#registerCommands() {
13491357
this.redis.defineCommand("enqueueMessage", {
1350-
numberOfKeys: 6,
1358+
numberOfKeys: 7,
13511359
lua: `
13521360
local queue = KEYS[1]
13531361
local parentQueue = KEYS[2]
13541362
local messageKey = KEYS[3]
13551363
local concurrencyKey = KEYS[4]
13561364
local envCurrentConcurrencyKey = KEYS[5]
13571365
local orgCurrentConcurrencyKey = KEYS[6]
1366+
local envQueue = KEYS[7]
13581367
13591368
local queueName = ARGV[1]
13601369
local messageId = ARGV[2]
@@ -1367,6 +1376,9 @@ redis.call('SET', messageKey, messageData)
13671376
-- Add the message to the queue
13681377
redis.call('ZADD', queue, messageScore, messageId)
13691378
1379+
-- Add the message to the env queue
1380+
redis.call('ZADD', envQueue, messageScore, messageId)
1381+
13701382
-- Rebalance the parent queue
13711383
local earliestMessage = redis.call('ZRANGE', queue, 0, 0, 'WITHSCORES')
13721384
if #earliestMessage == 0 then
@@ -1383,7 +1395,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
13831395
});
13841396

13851397
this.redis.defineCommand("dequeueMessage", {
1386-
numberOfKeys: 8,
1398+
numberOfKeys: 9,
13871399
lua: `
13881400
-- Keys: childQueue, parentQueue, concurrencyLimitKey, envConcurrencyLimitKey, orgConcurrencyLimitKey, currentConcurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
13891401
local childQueue = KEYS[1]
@@ -1394,6 +1406,7 @@ local orgConcurrencyLimitKey = KEYS[5]
13941406
local currentConcurrencyKey = KEYS[6]
13951407
local envCurrentConcurrencyKey = KEYS[7]
13961408
local orgCurrentConcurrencyKey = KEYS[8]
1409+
local envQueueKey = KEYS[9]
13971410
13981411
-- Args: childQueueName, currentTime, defaultEnvConcurrencyLimit, defaultOrgConcurrencyLimit
13991412
local childQueueName = ARGV[1]
@@ -1438,6 +1451,7 @@ local messageScore = tonumber(messages[2])
14381451
14391452
-- Move message to timeout queue and update concurrency
14401453
redis.call('ZREM', childQueue, messageId)
1454+
redis.call('ZREM', envQueueKey, messageId)
14411455
redis.call('SADD', currentConcurrencyKey, messageId)
14421456
redis.call('SADD', envCurrentConcurrencyKey, messageId)
14431457
redis.call('SADD', orgCurrentConcurrencyKey, messageId)
@@ -1474,7 +1488,7 @@ redis.call('SET', messageKey, messageData, 'GET')
14741488
});
14751489

14761490
this.redis.defineCommand("acknowledgeMessage", {
1477-
numberOfKeys: 7,
1491+
numberOfKeys: 8,
14781492
lua: `
14791493
-- Keys: parentQueue, messageKey, messageQueue, visibilityQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
14801494
local parentQueue = KEYS[1]
@@ -1484,6 +1498,7 @@ local visibilityQueue = KEYS[4]
14841498
local concurrencyKey = KEYS[5]
14851499
local envCurrentConcurrencyKey = KEYS[6]
14861500
local orgCurrentConcurrencyKey = KEYS[7]
1501+
local envQueueKey = KEYS[8]
14871502
14881503
-- Args: messageId, messageQueueName
14891504
local messageId = ARGV[1]
@@ -1495,6 +1510,9 @@ redis.call('DEL', messageKey)
14951510
-- Remove the message from the queue
14961511
redis.call('ZREM', messageQueue, messageId)
14971512
1513+
-- Remove the message from the env queue
1514+
redis.call('ZREM', envQueueKey, messageId)
1515+
14981516
-- Rebalance the parent queue
14991517
local earliestMessage = redis.call('ZRANGE', messageQueue, 0, 0, 'WITHSCORES')
15001518
if #earliestMessage == 0 then
@@ -1514,7 +1532,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
15141532
});
15151533

15161534
this.redis.defineCommand("nackMessage", {
1517-
numberOfKeys: 7,
1535+
numberOfKeys: 8,
15181536
lua: `
15191537
-- Keys: childQueueKey, parentQueueKey, visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, messageId
15201538
local messageKey = KEYS[1]
@@ -1524,6 +1542,7 @@ local concurrencyKey = KEYS[4]
15241542
local envConcurrencyKey = KEYS[5]
15251543
local orgConcurrencyKey = KEYS[6]
15261544
local visibilityQueue = KEYS[7]
1545+
local envQueueKey = KEYS[8]
15271546
15281547
-- Args: childQueueName, messageId, currentTime, messageScore
15291548
local childQueueName = ARGV[1]
@@ -1547,6 +1566,9 @@ end
15471566
-- Enqueue the message into the queue
15481567
redis.call('ZADD', childQueueKey, messageScore, messageId)
15491568
1569+
-- Enqueue the message into the env queue
1570+
redis.call('ZADD', envQueueKey, messageScore, messageId)
1571+
15501572
-- Rebalance the parent queue
15511573
local earliestMessage = redis.call('ZRANGE', childQueueKey, 0, 0, 'WITHSCORES')
15521574
if #earliestMessage == 0 then
@@ -1729,6 +1751,7 @@ declare module "ioredis" {
17291751
concurrencyKey: string,
17301752
envConcurrencyKey: string,
17311753
orgConcurrencyKey: string,
1754+
envQueue: string,
17321755
queueName: string,
17331756
messageId: string,
17341757
messageData: string,
@@ -1745,6 +1768,7 @@ declare module "ioredis" {
17451768
currentConcurrencyKey: string,
17461769
envCurrentConcurrencyKey: string,
17471770
orgCurrentConcurrencyKey: string,
1771+
envQueueKey: string,
17481772
childQueueName: string,
17491773
currentTime: string,
17501774
defaultEnvConcurrencyLimit: string,
@@ -1766,6 +1790,7 @@ declare module "ioredis" {
17661790
concurrencyKey: string,
17671791
envConcurrencyKey: string,
17681792
orgConcurrencyKey: string,
1793+
envQueueKey: string,
17691794
messageId: string,
17701795
messageQueueName: string,
17711796
callback?: Callback<void>
@@ -1779,6 +1804,7 @@ declare module "ioredis" {
17791804
envConcurrencyKey: string,
17801805
orgConcurrencyKey: string,
17811806
visibilityQueue: string,
1807+
envQueueKey: string,
17821808
childQueueName: string,
17831809
messageId: string,
17841810
currentTime: string,

apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,16 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer {
128128
return [this.envKeySection(env.id), constants.CURRENT_CONCURRENCY_PART].join(":");
129129
}
130130

131+
envQueueKeyFromQueue(queue: string) {
132+
const envId = this.normalizeQueue(queue).split(":")[3];
133+
134+
return `${constants.ENV_PART}:${envId}:${constants.QUEUE_PART}`;
135+
}
136+
137+
envQueueKey(env: AuthenticatedEnvironment): string {
138+
return [constants.ENV_PART, this.shortId(env.id), constants.QUEUE_PART].join(":");
139+
}
140+
131141
messageKey(messageId: string) {
132142
return `${constants.MESSAGE_PART}:${messageId}`;
133143
}

apps/webapp/app/v3/marqs/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export interface MarQSKeyProducer {
2626
envConcurrencyLimitKey(env: AuthenticatedEnvironment): string;
2727
orgConcurrencyLimitKey(env: AuthenticatedEnvironment): string;
2828
queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string): string;
29+
envQueueKey(env: AuthenticatedEnvironment): string;
2930
envSharedQueueKey(env: AuthenticatedEnvironment): string;
3031
sharedQueueKey(): string;
3132
sharedQueueScanPattern(): string;
@@ -44,6 +45,7 @@ export interface MarQSKeyProducer {
4445
envCurrentConcurrencyKeyFromQueue(queue: string): string;
4546
orgCurrentConcurrencyKey(env: AuthenticatedEnvironment): string;
4647
envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string;
48+
envQueueKeyFromQueue(queue: string): string;
4749
messageKey(messageId: string): string;
4850
stripKeyPrefix(key: string): string;
4951
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2+
import { env } from "~/env.server";
3+
import { MarQS } from "./marqs/index.server";
4+
5+
export type QueueSizeGuardResult = {
6+
isWithinLimits: boolean;
7+
maximumSize?: number;
8+
queueSize?: number;
9+
};
10+
11+
export async function guardQueueSizeLimitsForEnv(
12+
environment: AuthenticatedEnvironment,
13+
marqs?: MarQS
14+
): Promise<QueueSizeGuardResult> {
15+
const maximumSize = getMaximumSizeForEnvironment(environment);
16+
17+
if (typeof maximumSize === "undefined") {
18+
return { isWithinLimits: true };
19+
}
20+
21+
if (!marqs) {
22+
return { isWithinLimits: true, maximumSize };
23+
}
24+
25+
const queueSize = await marqs.lengthOfEnvQueue(environment);
26+
27+
return {
28+
isWithinLimits: queueSize < maximumSize,
29+
maximumSize,
30+
queueSize,
31+
};
32+
}
33+
34+
function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined {
35+
if (environment.type === "DEVELOPMENT") {
36+
return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE;
37+
} else {
38+
return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE;
39+
}
40+
}

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
2222
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
2323
import { handleMetadataPacket } from "~/utils/packets";
2424
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
25+
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
2526

2627
export type TriggerTaskServiceOptions = {
2728
idempotencyKey?: string;
@@ -82,6 +83,24 @@ export class TriggerTaskService extends BaseService {
8283
}
8384
}
8485

86+
const queueSizeGuard = await guardQueueSizeLimitsForEnv(environment, marqs);
87+
88+
logger.debug("Queue size guard result", {
89+
queueSizeGuard,
90+
environment: {
91+
id: environment.id,
92+
type: environment.type,
93+
organization: environment.organization,
94+
project: environment.project,
95+
},
96+
});
97+
98+
if (!queueSizeGuard.isWithinLimits) {
99+
throw new ServiceValidationError(
100+
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
101+
);
102+
}
103+
85104
if (
86105
body.options?.tags &&
87106
typeof body.options.tags !== "string" &&
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- AlterTable
2+
ALTER TABLE "Organization" ADD COLUMN "maximumDeployedQueueSize" INTEGER,
3+
ADD COLUMN "maximumDevQueueSize" INTEGER;

packages/database/prisma/schema.prisma

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ model Organization {
114114
/// This is deprecated and will be removed in the future
115115
maximumSchedulesLimit Int @default(5)
116116
117+
maximumDevQueueSize Int?
118+
maximumDeployedQueueSize Int?
119+
117120
createdAt DateTime @default(now())
118121
updatedAt DateTime @updatedAt
119122
deletedAt DateTime?

references/v3-catalog/src/trigger/simple.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import "server-only";
22
import { logger, SubtaskUnwrapError, task, tasks, wait } from "@trigger.dev/sdk/v3";
33
import { traceAsync } from "@/telemetry.js";
44
import { HeaderGenerator } from "header-generator";
5+
import { setTimeout as setTimeoutP } from "node:timers/promises";
56

67
let headerGenerator = new HeaderGenerator({
78
browsers: [{ name: "firefox", minVersion: 90 }, { name: "chrome", minVersion: 110 }, "safari"],
@@ -208,3 +209,29 @@ export const childTask = task({
208209
};
209210
},
210211
});
212+
213+
export const retryTask = task({
214+
id: "retry-task",
215+
run: async (payload: any) => {
216+
throw new Error("This task will always fail");
217+
},
218+
});
219+
220+
export const maximumQueueDepthParent = task({
221+
id: "maximum-queue-depth-parent",
222+
run: async (payload: any) => {
223+
await maximumQueueDepthChild.trigger({});
224+
await maximumQueueDepthChild.trigger({});
225+
await maximumQueueDepthChild.trigger({});
226+
},
227+
});
228+
229+
export const maximumQueueDepthChild = task({
230+
id: "maximum-queue-depth-child",
231+
queue: {
232+
concurrencyLimit: 1,
233+
},
234+
run: async (payload: any) => {
235+
await setTimeoutP(10_000);
236+
},
237+
});

0 commit comments

Comments
 (0)