Skip to content

Commit 6b24e01

Browse files
committed
Add streaming support for the realtime react hooks
1 parent 71bc83b commit 6b24e01

File tree

23 files changed

+705
-203
lines changed

23 files changed

+705
-203
lines changed

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,13 @@ async function responseHeaders(
111111
triggerClient?: string | null
112112
): Promise<Record<string, string>> {
113113
const claimsHeader = JSON.stringify({
114-
sub: run.runtimeEnvironmentId,
114+
sub: environment.id,
115115
pub: true,
116116
});
117117

118118
if (triggerClient === "browser") {
119119
const claims = {
120-
sub: run.runtimeEnvironmentId,
120+
sub: environment.id,
121121
pub: true,
122122
scopes: [`read:runs:${run.friendlyId}`],
123123
};

apps/webapp/app/services/apiAuth.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export async function authenticateApiRequest(
4141
options: { allowPublicKey?: boolean; allowJWT?: boolean } = {}
4242
): Promise<ApiAuthenticationResult | undefined> {
4343
const apiKey = getApiKeyFromRequest(request);
44+
4445
if (!apiKey) {
4546
return;
4647
}

apps/webapp/app/services/routeBuiilders/apiBuilder.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,5 +506,7 @@ export function createActionApiRoute<
506506
}
507507

508508
function wrapResponse(request: Request, response: Response, useCors: boolean) {
509-
return useCors ? apiCors(request, response) : response;
509+
return useCors
510+
? apiCors(request, response, { exposedHeaders: ["x-trigger-jwt", "x-trigger-jwt-claims"] })
511+
: response;
510512
}

apps/webapp/app/utils/apiCors.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ type CorsOptions = {
88
maxAge?: number;
99
origin?: boolean | string;
1010
credentials?: boolean;
11+
exposedHeaders?: string[];
1112
};
1213

1314
export async function apiCors(

packages/core/src/v3/apiClient/index.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import {
4545
RunStreamCallback,
4646
RunSubscription,
4747
TaskRunShape,
48+
RealtimeRun,
4849
} from "./runStream.js";
4950
import {
5051
CreateEnvironmentVariableParams,
@@ -88,7 +89,14 @@ const DEFAULT_ZOD_FETCH_OPTIONS: ZodFetchOptions = {
8889

8990
export { isRequestOptions };
9091
export type { ApiRequestOptions };
91-
export type { RunShape, AnyRunShape, TaskRunShape, RunStreamCallback, RunSubscription };
92+
export type {
93+
RunShape,
94+
AnyRunShape,
95+
TaskRunShape,
96+
RealtimeRun,
97+
RunStreamCallback,
98+
RunSubscription,
99+
};
92100

93101
/**
94102
* Trigger.dev v3 API client
@@ -603,15 +611,19 @@ export class ApiClient {
603611
);
604612
}
605613

606-
subscribeToRun<TRunTypes extends AnyRunTypes>(runId: string) {
614+
subscribeToRun<TRunTypes extends AnyRunTypes>(runId: string, options?: { signal?: AbortSignal }) {
607615
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/runs/${runId}`, {
608616
closeOnComplete: true,
609617
headers: this.#getRealtimeHeaders(),
610618
client: this,
619+
signal: options?.signal,
611620
});
612621
}
613622

614-
subscribeToRunsWithTag<TRunTypes extends AnyRunTypes>(tag: string | string[]) {
623+
subscribeToRunsWithTag<TRunTypes extends AnyRunTypes>(
624+
tag: string | string[],
625+
options?: { signal?: AbortSignal }
626+
) {
615627
const searchParams = createSearchQueryForSubscribeToRuns({
616628
tags: tag,
617629
});
@@ -622,15 +634,20 @@ export class ApiClient {
622634
closeOnComplete: false,
623635
headers: this.#getRealtimeHeaders(),
624636
client: this,
637+
signal: options?.signal,
625638
}
626639
);
627640
}
628641

629-
subscribeToBatch<TRunTypes extends AnyRunTypes>(batchId: string) {
642+
subscribeToBatch<TRunTypes extends AnyRunTypes>(
643+
batchId: string,
644+
options?: { signal?: AbortSignal }
645+
) {
630646
return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/batches/${batchId}`, {
631647
closeOnComplete: false,
632648
headers: this.#getRealtimeHeaders(),
633649
client: this,
650+
signal: options?.signal,
634651
});
635652
}
636653

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ export type RunShape<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTy
4242
export type AnyRunShape = RunShape<AnyRunTypes>;
4343

4444
export type TaskRunShape<TTask extends AnyTask> = RunShape<InferRunTypes<TTask>>;
45+
export type RealtimeRun<TTask extends AnyTask> = TaskRunShape<TTask>;
4546

4647
export type RunStreamCallback<TRunTypes extends AnyRunTypes> = (
4748
run: RunShape<TRunTypes>
@@ -99,7 +100,7 @@ export interface StreamSubscription {
99100
}
100101

101102
export interface StreamSubscriptionFactory {
102-
createSubscription(runId: string, streamKey: string): StreamSubscription;
103+
createSubscription(runId: string, streamKey: string, baseUrl?: string): StreamSubscription;
103104
}
104105

105106
// Real implementation for production
@@ -145,8 +146,8 @@ export class SSEStreamSubscriptionFactory implements StreamSubscriptionFactory {
145146
private options: { headers?: Record<string, string>; signal?: AbortSignal }
146147
) {}
147148

148-
createSubscription(runId: string, streamKey: string): StreamSubscription {
149-
const url = `${this.baseUrl}/realtime/v1/streams/${runId}/${streamKey}`;
149+
createSubscription(runId: string, streamKey: string, baseUrl?: string): StreamSubscription {
150+
const url = `${baseUrl ?? this.baseUrl}/realtime/v1/streams/${runId}/${streamKey}`;
150151
return new SSEStreamSubscription(url, this.options);
151152
}
152153
}
@@ -243,7 +244,8 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
243244

244245
const subscription = this.options.streamFactory.createSubscription(
245246
run.id,
246-
streamKey.toString()
247+
streamKey.toString(),
248+
this.options.client?.baseUrl
247249
);
248250

249251
await subscription.subscribe(async (chunk) => {

packages/react-hooks/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@
3838
},
3939
"dependencies": {
4040
"@trigger.dev/core": "workspace:^3.1.2",
41-
"swr": "^2.2.5"
41+
"swr": "^2.2.5",
42+
"throttleit": "^2.1.0"
4243
},
4344
"devDependencies": {
4445
"@arethetypeswrong/cli": "^0.15.4",

packages/react-hooks/src/contexts.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import React from "react";
44
import { createContextAndHook } from "./utils/createContextAndHook.js";
55
import type { ApiClientConfiguration } from "@trigger.dev/core/v3";
66

7-
const [TriggerAuthContext, useTriggerAuthContext] =
7+
const [TriggerAuthContext, useTriggerAuthContext, useTriggerAuthContextOptional] =
88
createContextAndHook<ApiClientConfiguration>("TriggerAuthContext");
99

10-
export { TriggerAuthContext, useTriggerAuthContext };
10+
export { TriggerAuthContext, useTriggerAuthContext, useTriggerAuthContextOptional };
Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,28 @@
11
"use client";
22

3-
import { ApiClient } from "@trigger.dev/core/v3";
4-
import { useTriggerAuthContext } from "../contexts.js";
3+
import { ApiClient, ApiRequestOptions } from "@trigger.dev/core/v3";
4+
import { useTriggerAuthContextOptional } from "../contexts.js";
55

6-
export function useApiClient() {
7-
const auth = useTriggerAuthContext();
6+
export type UseApiClientOptions = {
7+
accessToken?: string;
8+
baseURL?: string;
9+
requestOptions?: ApiRequestOptions;
10+
};
811

9-
const baseUrl = auth.baseURL ?? "https://api.trigger.dev";
12+
export function useApiClient(options?: UseApiClientOptions): ApiClient {
13+
const auth = useTriggerAuthContextOptional();
1014

11-
if (!auth.accessToken) {
12-
throw new Error("Missing accessToken in TriggerAuthContext");
15+
const baseUrl = auth?.baseURL ?? options?.baseURL ?? "https://api.trigger.dev";
16+
const accessToken = auth?.accessToken ?? options?.accessToken;
17+
18+
if (!accessToken) {
19+
throw new Error("Missing accessToken in TriggerAuthContext or useApiClient options");
1320
}
1421

15-
return new ApiClient(baseUrl, auth.accessToken, auth.requestOptions);
22+
const requestOptions: ApiRequestOptions = {
23+
...auth?.requestOptions,
24+
...options?.requestOptions,
25+
};
26+
27+
return new ApiClient(baseUrl, accessToken, requestOptions);
1628
}

0 commit comments

Comments
 (0)