
Commit 1ee3860

Fix several restore and resume bugs (#1418)
* try to correct resume messages with missing checkpoint
* prevent creating checkpoints for outdated task waits
* prevent creating checkpoints for outdated batch waits
* use heartbeats to check for and clean up any leftover containers
* lint
* improve exec logging
* improve resume attempt logs
* fix for resuming parents of canceled child runs
* separate SIGTERM from maybe OOM errors
* pretty errors can have magic dashboard links
* prevent uncancellable checkpoints
* simplify task run error code enum export
* grab the last, not the first child run
* Revert "prevent creating checkpoints for outdated batch waits". This reverts commit f2b5c2a.
* Revert "grab the last, not the first child run". This reverts commit 89ec5c8.
* Revert "prevent creating checkpoints for outdated task waits". This reverts commit 11066b4.
* more logs for resume message handling
* add magic error link comment
* add changeset
1 parent 45a34f9 commit 1ee3860

File tree

11 files changed: +369 -64 lines changed


.changeset/many-plants-destroy.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/core": patch
+---
+
+SIGTERM detection and prettier errors

apps/coordinator/src/checkpointer.ts

Lines changed: 4 additions & 1 deletion
@@ -436,7 +436,10 @@ export class Checkpointer {
       this.#logger.error("Error during cleanup", { ...metadata, error });
     }
 
-    this.#abortControllers.delete(runId);
+    // Ensure only the current controller is removed
+    if (this.#abortControllers.get(runId) === controller) {
+      this.#abortControllers.delete(runId);
+    }
     controller.signal.removeEventListener("abort", onAbort);
   };
 
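The guard above matters when a newer checkpoint for the same run registers a fresh AbortController before the old cleanup callback fires: an unconditional delete would drop the newer controller and leave that checkpoint uncancellable. A minimal standalone sketch of the pattern, with illustrative names (`registerController`, `cleanup`) rather than the actual Checkpointer API:

// Per-run AbortControllers where cleanup only removes its own entry.
// registerController/cleanup are illustrative names, not the real Checkpointer methods.
const abortControllers = new Map<string, AbortController>();

function registerController(runId: string): AbortController {
  // A newer registration for the same run replaces the old entry
  const controller = new AbortController();
  abortControllers.set(runId, controller);
  return controller;
}

function cleanup(runId: string, controller: AbortController) {
  // Only delete if the map still points at *this* controller;
  // otherwise a later registration would be silently removed.
  if (abortControllers.get(runId) === controller) {
    abortControllers.delete(runId);
  }
}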

apps/coordinator/src/exec.ts

Lines changed: 12 additions & 1 deletion
@@ -64,7 +64,18 @@ export class Exec {
       command,
       argsRaw: args,
       argsTrimmed,
-      ...output,
+      globalOpts: {
+        trimArgs: this.trimArgs,
+        neverThrow: this.neverThrow,
+        hasAbortSignal: !!this.abortSignal,
+      },
+      localOpts: opts,
+      stdout: output.stdout,
+      stderr: output.stderr,
+      pid: result.pid,
+      exitCode: result.exitCode,
+      aborted: result.aborted,
+      killed: result.killed,
     };
 
     if (this.logOutput) {
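For reference, the expanded exec log entry now carries an explicit, structured payload instead of spreading the raw output object. The field names below are taken from the diff; the interface itself is only an illustrative sketch, not part of the codebase:

// Sketch of the structured exec log payload assembled above (field names from the diff;
// the interface and its exact types are illustrative assumptions).
interface ExecLogMetadata {
  command: string;
  argsRaw: string[];
  argsTrimmed: string[];
  globalOpts: {
    trimArgs: boolean;
    neverThrow: boolean;
    hasAbortSignal: boolean;
  };
  localOpts: Record<string, unknown>;
  stdout: string;
  stderr: string;
  pid?: number;
  exitCode?: number | null;
  aborted: boolean;
  killed: boolean;
}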

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 28 additions & 6 deletions
@@ -1,4 +1,10 @@
-import { CheckIcon, ClockIcon, CloudArrowDownIcon, QueueListIcon } from "@heroicons/react/20/solid";
+import {
+  CheckIcon,
+  ClockIcon,
+  CloudArrowDownIcon,
+  EnvelopeIcon,
+  QueueListIcon,
+} from "@heroicons/react/20/solid";
 import { Link } from "@remix-run/react";
 import { LoaderFunctionArgs } from "@remix-run/server-runtime";
 import {

@@ -13,6 +19,7 @@ import { typedjson, useTypedFetcher } from "remix-typedjson";
 import { ExitIcon } from "~/assets/icons/ExitIcon";
 import { CodeBlock } from "~/components/code/CodeBlock";
 import { EnvironmentLabel } from "~/components/environments/EnvironmentLabel";
+import { Feedback } from "~/components/Feedback";
 import { Button, LinkButton } from "~/components/primitives/Buttons";
 import { Callout } from "~/components/primitives/Callout";
 import { DateTime, DateTimeAccurate } from "~/components/primitives/DateTime";

@@ -963,11 +970,26 @@ function RunError({ error }: { error: TaskRunError }) {
     <div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
       <Header3 className="text-rose-500">{name}</Header3>
       {enhancedError.message && <Callout variant="error">{enhancedError.message}</Callout>}
-      {enhancedError.link && (
-        <Callout variant="docs" to={enhancedError.link.href}>
-          {enhancedError.link.name}
-        </Callout>
-      )}
+      {enhancedError.link &&
+        (enhancedError.link.magic === "CONTACT_FORM" ? (
+          <Feedback
+            button={
+              <Button
+                variant="tertiary/medium"
+                LeadingIcon={EnvelopeIcon}
+                leadingIconClassName="text-blue-400"
+                fullWidth
+                textAlignLeft
+              >
+                {enhancedError.link.name}
+              </Button>
+            }
+          />
+        ) : (
+          <Callout variant="docs" to={enhancedError.link.href}>
+            {enhancedError.link.name}
+          </Callout>
+        ))}
       {enhancedError.stackTrace && (
         <CodeBlock
           showCopyButton={false}
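The "magic" discriminator is what lets a prettified error decide between opening the in-app contact form and linking out to docs. A rough sketch of the enhanced-error link shape the JSX above assumes; the type name and helper below are hypothetical:

// Hypothetical shape of an enhanced error link as consumed by RunError above.
// "CONTACT_FORM" links render the Feedback dialog; plain links render a docs Callout.
type EnhancedErrorLink = {
  name: string;
  href: string;
  magic?: "CONTACT_FORM";
};

function isContactFormLink(link: EnhancedErrorLink): boolean {
  return link.magic === "CONTACT_FORM";
}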

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 1 addition & 1 deletion
@@ -136,7 +136,7 @@ export class EnvironmentVariablesRepository implements Repository {
 
     try {
       for (const variable of values) {
-        const result = await $transaction(this.prismaClient, async (tx) => {
+        const result = await $transaction(this.prismaClient, async (tx) => {
          const environmentVariable = await tx.environmentVariable.upsert({
            where: {
              projectId_key: {

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 169 additions & 13 deletions
@@ -21,7 +21,7 @@ import {
   TaskRunStatus,
 } from "@trigger.dev/database";
 import { z } from "zod";
-import { prisma } from "~/db.server";
+import { $replica, prisma } from "~/db.server";
 import { findEnvironmentById } from "~/models/runtimeEnvironment.server";
 import { logger } from "~/services/logger.server";
 import { singleton } from "~/utils/singleton";

@@ -43,7 +43,12 @@ import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
 import { EnvironmentVariable } from "../environmentVariables/repository";
 import { machinePresetFromConfig } from "../machinePresets.server";
 import { env } from "~/env.server";
-import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
+import {
+  FINAL_ATTEMPT_STATUSES,
+  FINAL_RUN_STATUSES,
+  isFinalAttemptStatus,
+  isFinalRunStatus,
+} from "../taskStatus";
 import { getMaxDuration } from "../utils/maxDuration";
 
 const WithTraceContext = z.object({

@@ -620,6 +625,9 @@ export class SharedQueueConsumer {
     const resumableRun = await prisma.taskRun.findUnique({
       where: {
         id: message.messageId,
+        status: {
+          notIn: FINAL_RUN_STATUSES,
+        },
       },
     });
 
@@ -633,6 +641,14 @@ export class SharedQueueConsumer {
       return;
     }
 
+    if (resumableRun.status !== "EXECUTING") {
+      logger.warn("Run is not executing, will try to resume anyway", {
+        queueMessage: message.data,
+        messageId: message.messageId,
+        runStatus: resumableRun.status,
+      });
+    }
+
     const resumableAttempt = await prisma.taskRunAttempt.findUnique({
       where: {
         id: messageBody.data.resumableAttemptId,

@@ -740,7 +756,11 @@ export class SharedQueueConsumer {
       executions,
     };
 
-    logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });
+    logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", {
+      resumeMessage,
+      message,
+      resumableRun,
+    });
 
     // The attempt should still be running so we can broadcast to all coordinators to resume immediately
     const responses = await socketIo.coordinatorNamespace
@@ -763,15 +783,91 @@ export class SharedQueueConsumer {
       }
 
       const hasSuccess = responses.some((response) => response.success);
-      if (!hasSuccess) {
-        logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
-          resumeMessage,
-          responses,
-          message,
-        });
-        await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
+
+      if (hasSuccess) {
+        this.#doMoreWork();
         return;
       }
+
+      // No coordinator was able to resume the run
+      logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
+        resumeMessage,
+        responses,
+        message,
+      });
+
+      // Let's check if the run is frozen
+      if (resumableRun.status === "WAITING_TO_RESUME") {
+        logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK run is waiting to be restored", {
+          queueMessage: message.data,
+          messageId: message.messageId,
+        });
+
+        try {
+          const restoreService = new RestoreCheckpointService();
+
+          const checkpointEvent = await restoreService.getLastCheckpointEventIfUnrestored(
+            resumableRun.id
+          );
+
+          if (checkpointEvent) {
+            // The last checkpoint hasn't been restored yet, so restore it
+            const checkpoint = await restoreService.call({
+              eventId: checkpointEvent.id,
+            });
+
+            if (!checkpoint) {
+              logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK failed to restore checkpoint", {
+                queueMessage: message.data,
+                messageId: message.messageId,
+              });
+
+              await this.#ackAndDoMoreWork(message.messageId);
+              return;
+            }
+
+            logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK restored checkpoint", {
+              queueMessage: message.data,
+              messageId: message.messageId,
+              checkpoint,
+            });
+
+            this.#doMoreWork();
+            return;
+          } else {
+            logger.debug(
+              "RESUME_AFTER_DEPENDENCY_WITH_ACK run is frozen without last checkpoint event",
+              {
+                queueMessage: message.data,
+                messageId: message.messageId,
+              }
+            );
+          }
+        } catch (e) {
+          if (e instanceof Error) {
+            this._currentSpan?.recordException(e);
+          } else {
+            this._currentSpan?.recordException(new Error(String(e)));
+          }
+
+          this._endSpanInNextIteration = true;
+
+          await this.#nackAndDoMoreWork(
+            message.messageId,
+            this._options.nextTickInterval,
+            5_000
+          );
+          return;
+        }
+      }
+
+      logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK retrying", {
+        queueMessage: message.data,
+        messageId: message.messageId,
+      });
+
+      await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
+      return;
     } catch (e) {
       if (e instanceof Error) {
         this._currentSpan?.recordException(e);
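Condensed, the new fallback when no coordinator acks the resume works like this: if the run is frozen (WAITING_TO_RESUME), try to restore its last unrestored checkpoint; otherwise nack and retry later. The sketch below paraphrases that control flow with the coordinator broadcast, checkpoint restore, and queue operations injected as hypothetical callbacks; it is not a drop-in replacement for the consumer code, which uses socket.io, RestoreCheckpointService, and marqs:

// Condensed control flow of the resume fallback above (all dependency names hypothetical).
type ResumeDeps = {
  broadcastResume: () => Promise<boolean>;                       // did any coordinator ack?
  getUnrestoredCheckpointEventId: () => Promise<string | undefined>;
  restoreCheckpoint: (eventId: string) => Promise<boolean>;
  ack: () => Promise<void>;                                      // drop the message
  nack: () => Promise<void>;                                     // retry later
};

async function resumeAfterDependency(runStatus: string, deps: ResumeDeps) {
  if (await deps.broadcastResume()) {
    return; // a coordinator resumed the attempt in place
  }

  // No coordinator could resume; if the run is frozen, try restoring its last checkpoint.
  if (runStatus === "WAITING_TO_RESUME") {
    const eventId = await deps.getUnrestoredCheckpointEventId();

    if (eventId) {
      const restored = await deps.restoreCheckpoint(eventId);
      if (!restored) {
        await deps.ack(); // restore failed, don't spin on this message
      }
      return;
    }
  }

  // Not frozen (or frozen without a checkpoint): retry the resume later.
  await deps.nack();
}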
@@ -896,7 +992,7 @@ class SharedQueueTasks {
       where: {
         id,
         status: {
-          in: ["COMPLETED", "FAILED"],
+          in: FINAL_ATTEMPT_STATUSES,
         },
       },
       include: {
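Using the shared FINAL_ATTEMPT_STATUSES constant keeps this query in sync with isFinalAttemptStatus instead of hardcoding a partial status list. A minimal sketch of how such a constant and guard pair can be defined; the actual values exported from ../taskStatus may differ:

// Sketch of a status constant plus a matching type guard; the real list in
// ../taskStatus may contain more or different statuses.
const FINAL_ATTEMPT_STATUSES = ["COMPLETED", "FAILED", "CANCELED"] as const;

type FinalAttemptStatus = (typeof FINAL_ATTEMPT_STATUSES)[number];

function isFinalAttemptStatus(status: string): status is FinalAttemptStatus {
  return (FINAL_ATTEMPT_STATUSES as readonly string[]).includes(status);
}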
@@ -1237,13 +1333,13 @@ class SharedQueueTasks {
       return;
     }
 
-    await marqs?.heartbeatMessage(taskRunAttempt.taskRunId);
+    await this.#heartbeat(taskRunAttempt.taskRunId);
   }
 
   async taskRunHeartbeat(runId: string) {
     logger.debug("[SharedQueueConsumer] taskRunHeartbeat()", { runId });
 
-    await marqs?.heartbeatMessage(runId);
+    await this.#heartbeat(runId);
   }
 
   public async taskRunFailed(completion: TaskRunFailedExecutionResult) {
@@ -1254,6 +1350,66 @@ class SharedQueueTasks {
     await service.call(completion.id, completion);
   }
 
+  async #heartbeat(runId: string) {
+    await marqs?.heartbeatMessage(runId);
+
+    try {
+      // There can be a lot of calls per minute and the data doesn't have to be accurate, so use the read replica
+      const taskRun = await $replica.taskRun.findFirst({
+        where: {
+          id: runId,
+        },
+        select: {
+          id: true,
+          status: true,
+          runtimeEnvironment: {
+            select: {
+              type: true,
+            },
+          },
+          lockedToVersion: {
+            select: {
+              supportsLazyAttempts: true,
+            },
+          },
+        },
+      });
+
+      if (!taskRun) {
+        logger.error("SharedQueueTasks.#heartbeat: Task run not found", {
+          runId,
+        });
+
+        return;
+      }
+
+      if (taskRun.runtimeEnvironment.type === "DEVELOPMENT") {
+        return;
+      }
+
+      if (isFinalRunStatus(taskRun.status)) {
+        logger.debug("SharedQueueTasks.#heartbeat: Task run is in final status", {
+          runId,
+          status: taskRun.status,
+        });
+
+        // Signal to exit any leftover containers
+        socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", {
+          version: "v1",
+          runId: taskRun.id,
+          // Give the run a few seconds to exit to complete any flushing etc
+          delayInMs: taskRun.lockedToVersion?.supportsLazyAttempts ? 5_000 : undefined,
+        });
+        return;
+      }
+    } catch (error) {
+      logger.error("SharedQueueTasks.#heartbeat: Error signaling run cancellation", {
+        runId,
+        error: error instanceof Error ? error.message : error,
+      });
+    }
+  }
+
   async #buildEnvironmentVariables(
     environment: RuntimeEnvironment,
     runId: string,
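The heartbeat path now doubles as a janitor: every heartbeat still extends the queue visibility timeout, but it also checks (against the read replica) whether the run has already reached a final status and, if so, asks coordinators to shut down any leftover container. A stripped-down sketch of that decision, with the queue, database, and socket calls injected as hypothetical callbacks rather than the real dependencies:

// Stripped-down sketch of the heartbeat-driven cleanup above. The real code uses
// marqs, the Prisma read replica and socket.io; here they are injected callbacks.
type HeartbeatDeps = {
  extendVisibility: (runId: string) => Promise<void>;
  loadRun: (runId: string) => Promise<
    { status: string; envType: string; supportsLazyAttempts: boolean } | undefined
  >;
  isFinalRunStatus: (status: string) => boolean;
  requestRunCancellation: (runId: string, delayInMs?: number) => void;
};

async function heartbeat(runId: string, deps: HeartbeatDeps) {
  await deps.extendVisibility(runId);

  const run = await deps.loadRun(runId);
  if (!run || run.envType === "DEVELOPMENT") {
    return; // nothing to clean up for dev runs or unknown runs
  }

  if (deps.isFinalRunStatus(run.status)) {
    // The run is already finished but something is still heartbeating:
    // ask coordinators to exit the leftover container, giving newer
    // (lazy-attempt) workers a few seconds to flush before exiting.
    deps.requestRunCancellation(runId, run.supportsLazyAttempts ? 5_000 : undefined);
  }
}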
