Skip to content

Commit 6e58a21

Browse files
committed
Handle errors thrown by requests in Realtime react hooks
1 parent 51a054f commit 6e58a21

File tree

8 files changed

+80
-9
lines changed

8 files changed

+80
-9
lines changed

.changeset/many-panthers-relax.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/react-hooks": patch
3+
---
4+
5+
Make sure useRealtimeRun onComplete hook fires at the correct time

.changeset/silent-dragons-chew.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/react-hooks": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Handle errors thrown by requests in Realtime react hooks

packages/core/src/v3/apiClient/index.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -640,20 +640,25 @@ export class ApiClient {
640640

641641
subscribeToRun<TRunTypes extends AnyRunTypes>(
642642
runId: string,
643-
options?: { signal?: AbortSignal; closeOnComplete?: boolean }
643+
options?: {
644+
signal?: AbortSignal;
645+
closeOnComplete?: boolean;
646+
onFetchError?: (error: Error) => void;
647+
}
644648
) {
645649
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/runs/${runId}`, {
646650
closeOnComplete:
647651
typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : true,
648652
headers: this.#getRealtimeHeaders(),
649653
client: this,
650654
signal: options?.signal,
655+
onFetchError: options?.onFetchError,
651656
});
652657
}
653658

654659
subscribeToRunsWithTag<TRunTypes extends AnyRunTypes>(
655660
tag: string | string[],
656-
options?: { signal?: AbortSignal }
661+
options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void }
657662
) {
658663
const searchParams = createSearchQueryForSubscribeToRuns({
659664
tags: tag,
@@ -666,19 +671,21 @@ export class ApiClient {
666671
headers: this.#getRealtimeHeaders(),
667672
client: this,
668673
signal: options?.signal,
674+
onFetchError: options?.onFetchError,
669675
}
670676
);
671677
}
672678

673679
subscribeToBatch<TRunTypes extends AnyRunTypes>(
674680
batchId: string,
675-
options?: { signal?: AbortSignal }
681+
options?: { signal?: AbortSignal; onFetchError?: (error: Error) => void }
676682
) {
677683
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/batches/${batchId}`, {
678684
closeOnComplete: false,
679685
headers: this.#getRealtimeHeaders(),
680686
client: this,
681687
signal: options?.signal,
688+
onFetchError: options?.onFetchError,
682689
});
683690
}
684691

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ export type RunShapeStreamOptions = {
6666
closeOnComplete?: boolean;
6767
signal?: AbortSignal;
6868
client?: ApiClient;
69+
onFetchError?: (e: Error) => void;
6970
};
7071

7172
export type StreamPartResult<TRun, TStreams extends Record<string, any>> = {
@@ -111,6 +112,9 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(
111112
const runStreamInstance = zodShapeStream(SubscribeRunRawShape, url, {
112113
...options,
113114
signal: abortController.signal,
115+
onError: (e) => {
116+
options?.onFetchError?.(e);
117+
},
114118
});
115119

116120
const $options: RunSubscriptionOptions = {

packages/core/src/v3/apiClient/stream.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export type ZodShapeStreamOptions = {
1414
headers?: Record<string, string>;
1515
fetchClient?: typeof fetch;
1616
signal?: AbortSignal;
17+
onError?: (e: Error) => void;
1718
};
1819

1920
export type ZodShapeStreamInstance<TShapeSchema extends z.ZodTypeAny> = {
@@ -44,6 +45,9 @@ export function zodShapeStream<TShapeSchema extends z.ZodTypeAny>(
4445
},
4546
fetchClient: options?.fetchClient,
4647
signal: abortController.signal,
48+
onError: (e) => {
49+
options?.onError?.(e);
50+
},
4751
});
4852

4953
const readableShape = new ReadableShapeStream(shapeStream);

packages/react-hooks/src/hooks/useRealtime.ts

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ export function useRealtimeRun<TTask extends AnyTask>(
103103
runId,
104104
apiClient,
105105
mutateRun,
106+
setError,
106107
abortControllerRef,
107108
typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true
108109
);
@@ -150,6 +151,12 @@ export function useRealtimeRun<TTask extends AnyTask>(
150151
};
151152
}, [runId, stop, options?.enabled]);
152153

154+
useEffect(() => {
155+
if (run?.finishedAt) {
156+
setIsComplete(true);
157+
}
158+
}, [run]);
159+
153160
return { run, error, stop };
154161
}
155162

@@ -258,6 +265,7 @@ export function useRealtimeRunWithStreams<
258265
mutateRun,
259266
mutateStreams,
260267
streamsRef,
268+
setError,
261269
abortControllerRef,
262270
typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true,
263271
options?.experimental_throttleInMs
@@ -306,6 +314,12 @@ export function useRealtimeRunWithStreams<
306314
};
307315
}, [runId, stop, options?.enabled]);
308316

317+
useEffect(() => {
318+
if (run?.finishedAt) {
319+
setIsComplete(true);
320+
}
321+
}, [run]);
322+
309323
return { run, streams: streams ?? initialStreamsFallback, error, stop };
310324
}
311325

@@ -380,7 +394,14 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>(
380394
const abortController = new AbortController();
381395
abortControllerRef.current = abortController;
382396

383-
await processRealtimeRunsWithTag(tag, apiClient, mutateRuns, runsRef, abortControllerRef);
397+
await processRealtimeRunsWithTag(
398+
tag,
399+
apiClient,
400+
mutateRuns,
401+
runsRef,
402+
setError,
403+
abortControllerRef
404+
);
384405
} catch (err) {
385406
// Ignore abort errors as they are expected.
386407
if ((err as any).name === "AbortError") {
@@ -470,7 +491,14 @@ export function useRealtimeBatch<TTask extends AnyTask>(
470491
const abortController = new AbortController();
471492
abortControllerRef.current = abortController;
472493

473-
await processRealtimeBatch(batchId, apiClient, mutateRuns, runsRef, abortControllerRef);
494+
await processRealtimeBatch(
495+
batchId,
496+
apiClient,
497+
mutateRuns,
498+
runsRef,
499+
setError,
500+
abortControllerRef
501+
);
474502
} catch (err) {
475503
// Ignore abort errors as they are expected.
476504
if ((err as any).name === "AbortError") {
@@ -506,10 +534,12 @@ async function processRealtimeBatch<TTask extends AnyTask = AnyTask>(
506534
apiClient: ApiClient,
507535
mutateRunsData: KeyedMutator<RealtimeRun<TTask>[]>,
508536
existingRunsRef: React.MutableRefObject<RealtimeRun<TTask>[]>,
537+
onError: (e: Error) => void,
509538
abortControllerRef: React.MutableRefObject<AbortController | null>
510539
) {
511540
const subscription = apiClient.subscribeToBatch<InferRunTypes<TTask>>(batchId, {
512541
signal: abortControllerRef.current?.signal,
542+
onFetchError: onError,
513543
});
514544

515545
for await (const part of subscription) {
@@ -541,10 +571,12 @@ async function processRealtimeRunsWithTag<TTask extends AnyTask = AnyTask>(
541571
apiClient: ApiClient,
542572
mutateRunsData: KeyedMutator<RealtimeRun<TTask>[]>,
543573
existingRunsRef: React.MutableRefObject<RealtimeRun<TTask>[]>,
574+
onError: (e: Error) => void,
544575
abortControllerRef: React.MutableRefObject<AbortController | null>
545576
) {
546577
const subscription = apiClient.subscribeToRunsWithTag<InferRunTypes<TTask>>(tag, {
547578
signal: abortControllerRef.current?.signal,
579+
onFetchError: onError,
548580
});
549581

550582
for await (const part of subscription) {
@@ -582,13 +614,15 @@ async function processRealtimeRunWithStreams<
582614
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
583615
mutateStreamData: KeyedMutator<StreamResults<TStreams>>,
584616
existingDataRef: React.MutableRefObject<StreamResults<TStreams>>,
617+
onError: (e: Error) => void,
585618
abortControllerRef: React.MutableRefObject<AbortController | null>,
586619
stopOnCompletion: boolean = true,
587620
throttleInMs?: number
588621
) {
589622
const subscription = apiClient.subscribeToRun<InferRunTypes<TTask>>(runId, {
590623
signal: abortControllerRef.current?.signal,
591624
closeOnComplete: stopOnCompletion,
625+
onFetchError: onError,
592626
});
593627

594628
type StreamUpdate = {
@@ -637,12 +671,14 @@ async function processRealtimeRun<TTask extends AnyTask = AnyTask>(
637671
runId: string,
638672
apiClient: ApiClient,
639673
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
674+
onError: (e: Error) => void,
640675
abortControllerRef: React.MutableRefObject<AbortController | null>,
641676
stopOnCompletion: boolean = true
642677
) {
643678
const subscription = apiClient.subscribeToRun<InferRunTypes<TTask>>(runId, {
644679
signal: abortControllerRef.current?.signal,
645680
closeOnComplete: stopOnCompletion,
681+
onFetchError: onError,
646682
});
647683

648684
for await (const part of subscription) {

references/nextjs-realtime/src/app/actions.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"use server";
22

33
import type { exampleTask } from "@/trigger/example";
4-
import { tasks } from "@trigger.dev/sdk/v3";
4+
import { auth, tasks } from "@trigger.dev/sdk/v3";
55
import { cookies } from "next/headers";
66
import { redirect } from "next/navigation";
77
import { randomUUID } from "node:crypto";
@@ -11,10 +11,19 @@ export async function triggerExampleTask() {
1111
id: randomUUID(),
1212
});
1313

14-
console.log("Setting the run JWT in a cookie", handle.publicAccessToken);
14+
const publicToken = await auth.createPublicToken({
15+
scopes: {
16+
read: {
17+
runs: [handle.id],
18+
},
19+
},
20+
expirationTime: "2s",
21+
});
22+
23+
console.log("Setting the run JWT in a cookie", publicToken);
1524

1625
// Set JWT in a secure, HTTP-only cookie
17-
cookies().set("run_token", handle.publicAccessToken);
26+
cookies().set("run_token", publicToken);
1827

1928
// Redirect to the details page
2029
redirect(`/runs/${handle.id}`);

references/nextjs-realtime/src/app/runs/[id]/ClientRunDetails.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ function RunDetailsWrapper({
3232
},
3333
});
3434

35-
if (error) {
35+
if (error && !run) {
3636
return (
3737
<div className="w-full min-h-screen bg-gray-900 p-4">
3838
<Card className="w-full bg-gray-800 shadow-md">

0 commit comments

Comments
 (0)