Skip to content

Commit 1d7834c

Browse files
committed
Improve the MarQS priority system by moving future messages into the LRE worker and using a priority timestamp offset to define priority in messages
1 parent e19bf2a commit 1d7834c

13 files changed

+197
-776
lines changed

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { singleton } from "~/utils/singleton";
77
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
88
import { completeBatchTaskRunItemV3 } from "./services/batchTriggerV3.server";
99
import { prisma } from "~/db.server";
10+
import { marqs } from "./marqs/index.server";
1011

1112
function initializeWorker() {
1213
const redisOptions = {
@@ -49,6 +50,15 @@ function initializeWorker() {
4950
maxAttempts: 10,
5051
},
5152
},
53+
scheduleRequeueMessage: {
54+
schema: z.object({
55+
messageId: z.string(),
56+
}),
57+
visibilityTimeoutMs: 60_000,
58+
retry: {
59+
maxAttempts: 5,
60+
},
61+
},
5262
},
5363
concurrency: {
5464
workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
@@ -74,6 +84,9 @@ function initializeWorker() {
7484
attempt
7585
);
7686
},
87+
scheduleRequeueMessage: async ({ payload }) => {
88+
await marqs.requeueMessageById(payload.messageId);
89+
},
7790
},
7891
});
7992

apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts

Lines changed: 0 additions & 95 deletions
This file was deleted.

0 commit comments

Comments
 (0)