Skip to content

Commit 623f158

Browse files
committed
try to correct resume messages with missing checkpoint
1 parent b6d1e0d commit 623f158

File tree

2 files changed

+101
-9
lines changed

2 files changed

+101
-9
lines changed

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 81 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
4343
import { EnvironmentVariable } from "../environmentVariables/repository";
4444
import { machinePresetFromConfig } from "../machinePresets.server";
4545
import { env } from "~/env.server";
46-
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
46+
import { FINAL_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
4747
import { getMaxDuration } from "../utils/maxDuration";
4848

4949
const WithTraceContext = z.object({
@@ -620,6 +620,9 @@ export class SharedQueueConsumer {
620620
const resumableRun = await prisma.taskRun.findUnique({
621621
where: {
622622
id: message.messageId,
623+
status: {
624+
notIn: FINAL_RUN_STATUSES,
625+
},
623626
},
624627
});
625628

@@ -633,6 +636,14 @@ export class SharedQueueConsumer {
633636
return;
634637
}
635638

639+
if (resumableRun.status !== "EXECUTING") {
640+
logger.warn("Run is not executing, will try to resume anyway", {
641+
queueMessage: message.data,
642+
messageId: message.messageId,
643+
runStatus: resumableRun.status,
644+
});
645+
}
646+
636647
const resumableAttempt = await prisma.taskRunAttempt.findUnique({
637648
where: {
638649
id: messageBody.data.resumableAttemptId,
@@ -740,7 +751,11 @@ export class SharedQueueConsumer {
740751
executions,
741752
};
742753

743-
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });
754+
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", {
755+
resumeMessage,
756+
message,
757+
resumableRun,
758+
});
744759

745760
// The attempt should still be running so we can broadcast to all coordinators to resume immediately
746761
const responses = await socketIo.coordinatorNamespace
@@ -763,15 +778,72 @@ export class SharedQueueConsumer {
763778
}
764779

765780
const hasSuccess = responses.some((response) => response.success);
766-
if (!hasSuccess) {
767-
logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
768-
resumeMessage,
769-
responses,
770-
message,
771-
});
772-
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
781+
782+
if (hasSuccess) {
783+
this.#doMoreWork();
773784
return;
774785
}
786+
787+
// No coordinator was able to resume the run
788+
logger.warn("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
789+
resumeMessage,
790+
responses,
791+
message,
792+
});
793+
794+
// Let's check if the run is frozen
795+
if (resumableRun.status === "WAITING_TO_RESUME") {
796+
logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK run is waiting to be restored", {
797+
queueMessage: message.data,
798+
messageId: message.messageId,
799+
});
800+
801+
try {
802+
const restoreService = new RestoreCheckpointService();
803+
804+
const checkpointEvent = await restoreService.getLastCheckpointEventIfUnrestored(
805+
resumableRun.id
806+
);
807+
808+
if (checkpointEvent) {
809+
// The last checkpoint hasn't been restored yet, so restore it
810+
const checkpoint = await restoreService.call({
811+
eventId: checkpointEvent.id,
812+
});
813+
814+
if (!checkpoint) {
815+
logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK failed to restore checkpoint", {
816+
queueMessage: message.data,
817+
messageId: message.messageId,
818+
});
819+
820+
await this.#ackAndDoMoreWork(message.messageId);
821+
return;
822+
}
823+
824+
this.#doMoreWork();
825+
return;
826+
}
827+
} catch (e) {
828+
if (e instanceof Error) {
829+
this._currentSpan?.recordException(e);
830+
} else {
831+
this._currentSpan?.recordException(new Error(String(e)));
832+
}
833+
834+
this._endSpanInNextIteration = true;
835+
836+
await this.#nackAndDoMoreWork(
837+
message.messageId,
838+
this._options.nextTickInterval,
839+
5_000
840+
);
841+
return;
842+
}
843+
}
844+
845+
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
846+
return;
775847
} catch (e) {
776848
if (e instanceof Error) {
777849
this._currentSpan?.recordException(e);

apps/webapp/app/v3/services/restoreCheckpoint.server.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,24 @@ export class RestoreCheckpointService extends BaseService {
112112

113113
return checkpoint;
114114
}
115+
116+
async getLastCheckpointEventIfUnrestored(runId: string) {
117+
const event = await this._prisma.checkpointRestoreEvent.findFirst({
118+
where: {
119+
runId,
120+
},
121+
take: 1,
122+
orderBy: {
123+
createdAt: "desc",
124+
},
125+
});
126+
127+
if (!event) {
128+
return;
129+
}
130+
131+
if (event.type === "CHECKPOINT") {
132+
return event;
133+
}
134+
}
115135
}

0 commit comments

Comments
 (0)