Skip to content

Commit 0bf500f

Browse files
authored
Prioritize finishing waited runs (#1375)
* If a tree node is missing, estimate the size as zero * Task to test prioritizing finishing existing runs after triggerAndWaits * When requeuing a run with a checkpoint, put it in the queue with the parent run time so it’s correctly prioritized * The same change but if there’s no checkpoint
1 parent ceabfba commit 0bf500f

File tree

3 files changed

+47
-13
lines changed

3 files changed

+47
-13
lines changed

apps/webapp/app/components/primitives/TreeView/TreeView.tsx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,11 @@ export function useTree<TData, TFilterValue>({
235235
getItemKey: (index) => state.visibleNodeIds[index],
236236
getScrollElement: () => parentRef.current,
237237
estimateSize: (index: number) => {
238+
const treeItem = tree[index];
239+
if (!treeItem) return 0;
238240
return estimatedRowHeight({
239-
node: tree[index],
240-
state: state.nodes[tree[index].id],
241+
node: treeItem,
242+
state: state.nodes[treeItem.id],
241243
index,
242244
});
243245
},

apps/webapp/app/v3/services/resumeTaskDependency.server.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ export class ResumeTaskDependencyService extends BaseService {
6363
environmentId: dependency.taskRun.runtimeEnvironment.id,
6464
environmentType: dependency.taskRun.runtimeEnvironment.type,
6565
},
66-
dependentRun.concurrencyKey ?? undefined
66+
dependentRun.concurrencyKey ?? undefined,
67+
dependentRun.createdAt.getTime()
6768
);
6869
} else {
6970
logger.debug("Task dependency resume: Attempt is not paused or there's no checkpoint event", {
@@ -84,16 +85,20 @@ export class ResumeTaskDependencyService extends BaseService {
8485
return;
8586
}
8687

87-
await marqs?.replaceMessage(dependentRun.id, {
88-
type: "RESUME",
89-
completedAttemptIds: [sourceTaskAttemptId],
90-
resumableAttemptId: dependency.dependentAttempt.id,
91-
checkpointEventId: dependency.checkpointEventId ?? undefined,
92-
taskIdentifier: dependency.taskRun.taskIdentifier,
93-
projectId: dependency.taskRun.runtimeEnvironment.projectId,
94-
environmentId: dependency.taskRun.runtimeEnvironment.id,
95-
environmentType: dependency.taskRun.runtimeEnvironment.type,
96-
});
88+
await marqs?.replaceMessage(
89+
dependentRun.id,
90+
{
91+
type: "RESUME",
92+
completedAttemptIds: [sourceTaskAttemptId],
93+
resumableAttemptId: dependency.dependentAttempt.id,
94+
checkpointEventId: dependency.checkpointEventId ?? undefined,
95+
taskIdentifier: dependency.taskRun.taskIdentifier,
96+
projectId: dependency.taskRun.runtimeEnvironment.projectId,
97+
environmentId: dependency.taskRun.runtimeEnvironment.id,
98+
environmentType: dependency.taskRun.runtimeEnvironment.type,
99+
},
100+
dependentRun.createdAt.getTime()
101+
);
97102
}
98103
}
99104

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { logger, task, wait } from "@trigger.dev/sdk/v3";
2+
3+
export const prioritizeContinuing = task({
4+
id: "prioritize-continuing",
5+
run: async ({ count }: { count: number }) => {
6+
await prioritizeContinuingChild.batchTrigger(
7+
Array.from({ length: count }, (_, i) => ({ payload: {} as any }))
8+
);
9+
},
10+
});
11+
12+
export const prioritizeContinuingChild = task({
13+
id: "prioritize-continuing-child",
14+
queue: {
15+
concurrencyLimit: 1,
16+
},
17+
run: async () => {
18+
await fixedLengthTask.triggerAndWait({ waitSeconds: 1 });
19+
},
20+
});
21+
22+
export const fixedLengthTask = task({
23+
id: "fixedLengthTask",
24+
run: async ({ waitSeconds }: { waitSeconds: number }) => {
25+
await new Promise((resolve) => setTimeout(resolve, waitSeconds * 1000));
26+
},
27+
});

0 commit comments

Comments
 (0)