Skip to content

Add acking to RESUME_AFTER_DEPENDENCY message to the coordinator #1313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Sep 18, 2024
Merged
69 changes: 69 additions & 0 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,49 @@ class TaskCoordinator {

taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
},
// Acked counterpart of RESUME_AFTER_DEPENDENCY: forwards the resume message to the
// attempt's socket and returns { success: true } when it was emitted, or
// { success: false, error } when it could not (or must not) be delivered.
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
  // Builds the failure payload returned to the caller.
  const nack = (name: string, text: string) => ({
    success: false as const,
    error: { name, message: text },
  });

  const attemptSocket = await this.#getAttemptSocket(message.attemptFriendlyId);

  // No socket could be found for this attempt, so there is nothing to resume here.
  if (!attemptSocket) {
    const notFoundText = "Socket for attempt not found";
    logger.log(notFoundText, { attemptFriendlyId: message.attemptFriendlyId });
    return nack("SocketNotFoundError", notFoundText);
  }

  // When this is set, we want to kill the process because it will be resumed with the
  // checkpoint from the queue; refuse the in-place resume.
  const mustResumeFromCheckpoint = Boolean(
    attemptSocket.data.requiresCheckpointResumeWithMessage
  );
  if (mustResumeFromCheckpoint) {
    logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", {
      socketData: attemptSocket.data,
    });
    return nack(
      "CheckpointMessagePresentError",
      "Checkpoint message is present, so we need to kill the process and resume from the queue."
    );
  }

  // NOTE(review): presumably a fault-injection hook; confirm against chaosMonkey's definition.
  await chaosMonkey.call();

  // The task may have resumed faster than we could checkpoint, so cancel any checkpoint
  // for this run before handing the message to the task.
  this.#cancelCheckpoint(message.runId);
  attemptSocket.emit("RESUME_AFTER_DEPENDENCY", message);

  return { success: true };
},
RESUME_AFTER_DURATION: async (message) => {
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);

Expand Down Expand Up @@ -792,6 +835,18 @@ class TaskCoordinator {
return;
}

logger.log("WAIT_FOR_TASK checkpoint created", {
checkpoint,
socketData: socket.data,
});

//setting this means we can only resume from a checkpoint
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", {
checkpoint,
socketData: socket.data,
});

const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
attemptFriendlyId: message.attemptFriendlyId,
Expand All @@ -804,6 +859,7 @@ class TaskCoordinator {
});

if (ack?.keepRunAlive) {
socket.data.requiresCheckpointResumeWithMessage = undefined;
logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId });
return;
}
Expand Down Expand Up @@ -862,6 +918,18 @@ class TaskCoordinator {
return;
}

logger.log("WAIT_FOR_BATCH checkpoint created", {
checkpoint,
socketData: socket.data,
});

//setting this means we can only resume from a checkpoint
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
logger.log("WAIT_FOR_BATCH set checkpoint", {
checkpoint,
socketData: socket.data,
});

const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
attemptFriendlyId: message.attemptFriendlyId,
Expand All @@ -875,6 +943,7 @@ class TaskCoordinator {
});

if (ack?.keepRunAlive) {
socket.data.requiresCheckpointResumeWithMessage = undefined;
logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId });
return;
}
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/hooks/useSearchParam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ export function useSearchParams() {
}

if (typeof value === "string") {
search.set(param, encodeURIComponent(value));
search.set(param, value);
continue;
}

search.delete(param);
for (const v of value) {
search.append(param, encodeURIComponent(v));
search.append(param, v);
}
}
},
Expand Down
43 changes: 35 additions & 8 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -725,20 +725,47 @@ export class SharedQueueConsumer {
}

try {
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", {
runId: resumableAttempt.taskRunId,
attemptId: resumableAttempt.id,
});

// The attempt should still be running so we can broadcast to all coordinators to resume immediately
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
version: "v1",
const resumeMessage = {
version: "v1" as const,
runId: resumableAttempt.taskRunId,
attemptId: resumableAttempt.id,
attemptFriendlyId: resumableAttempt.friendlyId,
completions,
executions,
};

logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });

// The attempt should still be running so we can broadcast to all coordinators to resume immediately
const responses = await socketIo.coordinatorNamespace
.timeout(10_000)
.emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage);

logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", {
resumeMessage,
responses,
message,
});

if (responses.length === 0) {
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", {
resumeMessage,
message,
});
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
return;
}

const failed = responses.filter((response) => !response.success);
if (failed.length > 0) {
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
resumeMessage,
failed,
message,
});
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
return;
}
} catch (e) {
if (e instanceof Error) {
this._currentSpan?.recordException(e);
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/resumeBatchRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ export class ResumeBatchRunService extends BaseService {
if (wasUpdated) {
logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", {
batchRunId: batchRun.id,
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
dependentTaskAttempt: batchRun.dependentTaskAttempt,
checkpointEventId: batchRun.checkpointEventId,
hasCheckpointEvent: !!batchRun.checkpointEventId,
});
await marqs?.replaceMessage(dependentRun.id, {
type: "RESUME",
Expand Down
43 changes: 29 additions & 14 deletions packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,21 @@ import {
WaitReason,
} from "./schemas.js";

// Shared zod schema for ack callback results, discriminated on the `success` literal:
//   - `success: false` carries an `error` object with required `name` and `message`
//     strings and optional `stack` and `stderr` strings.
//   - `success: true` carries no further fields.
// Referenced as the `callback` schema by message definitions in this file.
const ackCallbackResult = z.discriminatedUnion("success", [
z.object({
success: z.literal(false),
error: z.object({
name: z.string(),
message: z.string(),
stack: z.string().optional(),
stderr: z.string().optional(),
}),
}),
z.object({
success: z.literal(true),
}),
]);

export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [
z.object({
type: z.literal("CANCEL_ATTEMPT"),
Expand Down Expand Up @@ -269,20 +284,7 @@ export const PlatformToProviderMessages = {
projectId: z.string(),
deploymentId: z.string(),
}),
callback: z.discriminatedUnion("success", [
z.object({
success: z.literal(false),
error: z.object({
name: z.string(),
message: z.string(),
stack: z.string().optional(),
stderr: z.string().optional(),
}),
}),
z.object({
success: z.literal(true),
}),
]),
callback: ackCallbackResult,
},
RESTORE: {
message: z.object({
Expand Down Expand Up @@ -504,6 +506,7 @@ export const CoordinatorToPlatformMessages = {
};

export const PlatformToCoordinatorMessages = {
/** @deprecated use RESUME_AFTER_DEPENDENCY_WITH_ACK instead */
RESUME_AFTER_DEPENDENCY: {
message: z.object({
version: z.literal("v1").default("v1"),
Expand All @@ -514,6 +517,17 @@ export const PlatformToCoordinatorMessages = {
executions: TaskRunExecution.array(),
}),
},
/**
 * Resume message for an attempt identified by `runId`, `attemptId` and
 * `attemptFriendlyId`, carrying `completions` and `executions` arrays. Unlike the
 * deprecated RESUME_AFTER_DEPENDENCY entry, it declares a `callback` schema so the
 * receiver can reply with a success or failure result.
 */
RESUME_AFTER_DEPENDENCY_WITH_ACK: {
message: z.object({
// `version` falls back to "v1" when omitted.
version: z.literal("v1").default("v1"),
runId: z.string(),
attemptId: z.string(),
attemptFriendlyId: z.string(),
completions: TaskRunExecutionResult.array(),
executions: TaskRunExecution.array(),
}),
// Ack result: `{ success: true }` or `{ success: false, error }`.
callback: ackCallbackResult,
},
RESUME_AFTER_DURATION: {
message: z.object({
version: z.literal("v1").default("v1"),
Expand Down Expand Up @@ -847,6 +861,7 @@ export const ProdWorkerSocketData = z.object({
podName: z.string(),
deploymentId: z.string(),
deploymentVersion: z.string(),
requiresCheckpointResumeWithMessage: z.string().optional(),
});

export const CoordinatorSocketData = z.object({
Expand Down
Loading