Skip to content

Commit a361492

Browse files
committed
Fix cancelled runs breaking realtime subscriptions
1 parent 9b35cc4 commit a361492

File tree

5 files changed

+20
-18
lines changed

5 files changed

+20
-18
lines changed

.changeset/shaggy-donkeys-hammer.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Fix an issue that caused errors when using realtime with a run that is cancelled

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,13 @@ export class FinalizeTaskRunService extends BaseService {
6464
completedAt,
6565
});
6666

67+
// I moved the error update here for two reasons:
68+
// - A single update is more efficient than two
69+
// - If the status updates to a final status, realtime will receive that status and then shut down the stream
70+
// before the error is updated, which would cause the error to be lost
6771
const run = await this._prisma.taskRun.update({
6872
where: { id },
69-
data: { status, expiredAt, completedAt },
73+
data: { status, expiredAt, completedAt, error: error ? sanitizeError(error) : undefined },
7074
...(include ? { include } : {}),
7175
});
7276

@@ -78,10 +82,6 @@ export class FinalizeTaskRunService extends BaseService {
7882
await this.finalizeAttempt({ attemptStatus, error, run });
7983
}
8084

81-
if (error) {
82-
await this.finalizeRunError(run, error);
83-
}
84-
8585
try {
8686
await this.#finalizeBatch(run);
8787
} catch (finalizeBatchError) {
@@ -211,15 +211,6 @@ export class FinalizeTaskRunService extends BaseService {
211211
}
212212
}
213213

214-
async finalizeRunError(run: TaskRun, error: TaskRunError) {
215-
await this._prisma.taskRun.update({
216-
where: { id: run.id },
217-
data: {
218-
error: sanitizeError(error),
219-
},
220-
});
221-
}
222-
223214
async finalizeAttempt({
224215
attemptStatus,
225216
error,

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2+
import { createJsonErrorObject } from "../errors.js";
23
import { RunStatus, SubscribeRunRawShape } from "../schemas/api.js";
34
import { SerializedError } from "../schemas/common.js";
45
import { AnyRunTypes, AnyTask, InferRunTypes } from "../types/tasks.js";
@@ -347,7 +348,7 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
347348
startedAt: row.startedAt ?? undefined,
348349
delayedUntil: row.delayUntil ?? undefined,
349350
queuedAt: row.queuedAt ?? undefined,
350-
error: row.error ?? undefined,
351+
error: row.error ? createJsonErrorObject(row.error) : undefined,
351352
isTest: row.isTest,
352353
metadata,
353354
} as RunShape<TRunTypes>;

packages/core/src/v3/schemas/api.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { z } from "zod";
22
import { DeserializedJsonSchema } from "../../schemas/json.js";
3-
import { SerializedError } from "./common.js";
3+
import { SerializedError, TaskRunError } from "./common.js";
44
import { BackgroundWorkerMetadata } from "./resources.js";
55
import { QueueOptions } from "./schemas.js";
66

@@ -708,7 +708,7 @@ export const SubscribeRunRawShape = z.object({
708708
output: z.string().nullish(),
709709
outputType: z.string().nullish(),
710710
runTags: z.array(z.string()).nullish().default([]),
711-
error: SerializedError.nullish(),
711+
error: TaskRunError.nullish(),
712712
});
713713

714714
export type SubscribeRunRawShape = z.infer<typeof SubscribeRunRawShape>;

references/nextjs-realtime/src/trigger/example.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ export const exampleTask = schemaTask({
1515

1616
metadata.set("status", { type: "started", progress: 0.1 });
1717

18-
await setTimeout(2000);
18+
if (Math.random() < 0.9) {
19+
// Simulate a failure
20+
throw new Error("Random failure");
21+
}
22+
23+
await setTimeout(20000);
1924

2025
metadata.set("status", { type: "processing", progress: 0.5 });
2126

0 commit comments

Comments
 (0)