Skip to content

Commit ea8e9dc

Browse files
committed
centralize queue timestamp logic in EnqueueSystem, adding queueTimestamp support and propagation
1 parent 68f8aba commit ea8e9dc

File tree

10 files changed

+214
-113
lines changed

10 files changed

+214
-113
lines changed

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,10 @@ export class RunEngineTriggerTaskService extends WithRunEngine {
371371
machine: body.options?.machine,
372372
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
373373
releaseConcurrency: body.options?.releaseConcurrency,
374+
queueTimestamp:
375+
parentRun && body.options?.resumeParentOnCompletion
376+
? parentRun.queueTimestamp ?? undefined
377+
: undefined,
374378
},
375379
this._prisma
376380
);

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ export class RunEngine {
334334
maxAttempts,
335335
taskEventStore,
336336
priorityMs,
337+
queueTimestamp,
337338
ttl,
338339
tags,
339340
parentTaskRunId,
@@ -414,6 +415,7 @@ export class RunEngine {
414415
maxAttempts,
415416
taskEventStore,
416417
priorityMs,
418+
queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(),
417419
ttl,
418420
tags:
419421
tags.length === 0
@@ -520,7 +522,6 @@ export class RunEngine {
520522
await this.enqueueSystem.enqueueRun({
521523
run: taskRun,
522524
env: environment,
523-
timestamp: Date.now() - taskRun.priorityMs,
524525
workerId,
525526
runnerId,
526527
tx: prisma,

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ export class CheckpointSystem {
161161
const newSnapshot = await this.enqueueSystem.enqueueRun({
162162
run,
163163
env: run.runtimeEnvironment,
164-
timestamp: run.createdAt.getTime() - run.priorityMs,
165164
snapshot: {
166165
status: "QUEUED",
167166
description:

internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ export class DelayedRunSystem {
9898
await this.enqueueSystem.enqueueRun({
9999
run,
100100
env: run.runtimeEnvironment,
101-
timestamp: run.createdAt.getTime() - run.priorityMs,
102101
batchId: run.batchId ?? undefined,
103102
});
104103

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ export class EnqueueSystem {
2525
public async enqueueRun({
2626
run,
2727
env,
28-
timestamp,
2928
tx,
3029
snapshot,
3130
previousSnapshotId,
@@ -37,7 +36,6 @@ export class EnqueueSystem {
3736
}: {
3837
run: TaskRun;
3938
env: MinimalAuthenticatedEnvironment;
40-
timestamp: number;
4139
tx?: PrismaClientOrTransaction;
4240
snapshot?: {
4341
status?: Extract<TaskRunExecutionStatus, "QUEUED" | "QUEUED_EXECUTING">;
@@ -81,6 +79,8 @@ export class EnqueueSystem {
8179
masterQueues.push(run.secondaryMasterQueue);
8280
}
8381

82+
const timestamp = (run.queueTimestamp ?? run.createdAt).getTime() - run.priorityMs;
83+
8484
await this.$.runQueue.enqueueMessage({
8585
env,
8686
masterQueues,

internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,6 @@ export class PendingVersionSystem {
9797
await this.enqueueSystem.enqueueRun({
9898
run: updatedRun,
9999
env: backgroundWorker.runtimeEnvironment,
100-
//add to the queue using the original run created time
101-
//this should ensure they're in the correct order in the queue
102-
timestamp: updatedRun.createdAt.getTime() - updatedRun.priorityMs,
103100
tx,
104101
});
105102
});

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,6 @@ export class WaitpointSystem {
540540
await this.enqueueSystem.enqueueRun({
541541
run,
542542
env: run.runtimeEnvironment,
543-
timestamp: run.createdAt.getTime() - run.priorityMs,
544543
snapshot: {
545544
status: "QUEUED_EXECUTING",
546545
description: "Run can continue, but is waiting for concurrency",
@@ -564,7 +563,6 @@ export class WaitpointSystem {
564563
await this.enqueueSystem.enqueueRun({
565564
run,
566565
env: run.runtimeEnvironment,
567-
timestamp: run.createdAt.getTime() - run.priorityMs,
568566
snapshot: {
569567
description: "Run was QUEUED, because all waitpoints are completed",
570568
},

0 commit comments

Comments
 (0)