Skip to content

feat/realtime-streams #1470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 31 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e05bf25
WIP realtime streams
ericallam Oct 31, 2024
d39d272
Handle realtime with large payloads or outputs #1451
ericallam Nov 1, 2024
17adacf
feat: optimize Redis stream handling with batching
ericallam Nov 1, 2024
f714893
🔧 chore: add dev dependencies for bundle analysis
ericallam Nov 1, 2024
ea84688
add metadata tests and a few more utilties
ericallam Nov 4, 2024
c784427
Add stream tests and improve streaming
ericallam Nov 11, 2024
7114ce4
Added AI tool tasks, descriptions to tasks
ericallam Nov 12, 2024
8e214d5
Use the config file path to determine the workingDir, then the packag…
ericallam Nov 12, 2024
8c7ee15
Remove stream test files
ericallam Nov 12, 2024
f1eaf5b
useTaskTrigger react hook that allows triggering a task from the client
ericallam Nov 13, 2024
a02ad73
Add streaming support for the realtime react hooks
ericallam Nov 14, 2024
f2245bd
Add ability to stream results after useTaskTrigger
ericallam Nov 14, 2024
b67443d
Improve the stream throttling
ericallam Nov 14, 2024
75fd83f
Use the runId as the ID key to bust the cache after triggering
ericallam Nov 14, 2024
54307b7
Upgrade to to the latest electric sql client and server
ericallam Nov 14, 2024
f08ea9b
Make realtime server backwards compat with 3.1.2 release
ericallam Nov 14, 2024
d8296b4
Pass the runId into useRealtimeRun
ericallam Nov 14, 2024
3d50d9c
Fix scopes when specifiying reading all runs
ericallam Nov 15, 2024
a4ff001
WIP @trigger.dev/rsc package
ericallam Nov 15, 2024
95fb46f
Various fixes and accepted recommendations by CodeRabbit
ericallam Nov 18, 2024
485ac32
Regenerate pnpm lock file
ericallam Nov 18, 2024
e192f73
A couple tweaks to rsc and give up on rendering react in tasks for now
ericallam Nov 18, 2024
227acc1
Add changeset
ericallam Nov 18, 2024
9e08987
Remove triggerRequest from the useEffect deps
ericallam Nov 18, 2024
f7875ae
Improve realtime & frontend authentication errors
ericallam Nov 19, 2024
50a2f73
Fixed authorization tests
ericallam Nov 19, 2024
feb6486
Remove unnecessary log
ericallam Nov 19, 2024
62a7598
Add metadata.stream limits and improve the metadata streams structure
ericallam Nov 19, 2024
686f47d
Streams can now have up to 2500 entries
ericallam Nov 19, 2024
8418498
Various coderabbit fixes
ericallam Nov 19, 2024
722bb45
additional react-hooks jsdocs
ericallam Nov 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/swift-glasses-mate.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@trigger.dev/react-hooks": patch
"@trigger.dev/sdk": patch
"trigger.dev": patch
"@trigger.dev/build": patch
"@trigger.dev/core": patch
"@trigger.dev/rsc": patch
---

Realtime streams
17 changes: 17 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@
"cwd": "${workspaceFolder}",
"sourceMaps": true
},
{
"type": "node-terminal",
"request": "launch",
"name": "Debug realtimeStreams.test.ts",
"command": "pnpm run test -t RealtimeStreams",
"envFile": "${workspaceFolder}/.env",
"cwd": "${workspaceFolder}/apps/webapp",
"sourceMaps": true
},
{
"type": "chrome",
"request": "launch",
Expand All @@ -36,6 +45,14 @@
"cwd": "${workspaceFolder}/references/v3-catalog",
"sourceMaps": true
},
{
"type": "node-terminal",
"request": "launch",
"name": "Debug Dev Next.js Realtime",
"command": "pnpm exec trigger dev",
"cwd": "${workspaceFolder}/references/nextjs-realtime",
"sourceMaps": true
},
{
"type": "node-terminal",
"request": "launch",
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const EnvironmentSchema = z.object({
LOGIN_ORIGIN: z.string().default("http://localhost:3030"),
APP_ORIGIN: z.string().default("http://localhost:3030"),
API_ORIGIN: z.string().optional(),
STREAM_ORIGIN: z.string().optional(),
ELECTRIC_ORIGIN: z.string().default("http://localhost:3060"),
APP_ENV: z.string().default(process.env.NODE_ENV),
SERVICE_NAME: z.string().default("trigger.dev webapp"),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ export class SpanPresenter extends BasePresenter {
const span = await eventRepository.getSpan(spanId, run.traceId);

const metadata = run.metadata
? await prettyPrintPacket(run.metadata, run.metadataType)
? await prettyPrintPacket(run.metadata, run.metadataType, { filteredKeys: ["$$streams"] })
: undefined;

const context = {
Expand Down
42 changes: 21 additions & 21 deletions apps/webapp/app/routes/api.v1.packets.$.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { generatePresignedUrl } from "~/v3/r2.server";

const ParamsSchema = z.object({
Expand Down Expand Up @@ -39,28 +40,27 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ presignedUrl });
}

export async function loader({ request, params }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequest(request);
export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
},
async ({ params, authentication }) => {
const filename = params["*"];

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}
const presignedUrl = await generatePresignedUrl(
authentication.environment.project.externalRef,
authentication.environment.slug,
filename,
"GET"
);

const parsedParams = ParamsSchema.parse(params);
const filename = parsedParams["*"];

const presignedUrl = await generatePresignedUrl(
authenticationResult.environment.project.externalRef,
authenticationResult.environment.slug,
filename,
"GET"
);
if (!presignedUrl) {
return json({ error: "Failed to generate presigned URL" }, { status: 500 });
}

if (!presignedUrl) {
return json({ error: "Failed to generate presigned URL" }, { status: 500 });
// Caller can now use this URL to fetch that object.
return json({ presignedUrl });
}

// Caller can now use this URL to fetch that object.
return json({ presignedUrl });
}
);
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.projects.$projectRef.runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
ApiRunListPresenter,
ApiRunListSearchParams,
} from "~/presenters/v3/ApiRunListPresenter.server";
import { createLoaderPATApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
import { createLoaderPATApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
projectRef: z.string(),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
ApiRunListPresenter,
ApiRunListSearchParams,
} from "~/presenters/v3/ApiRunListPresenter.server";
import { createLoaderApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

export const loader = createLoaderApiRoute(
{
Expand Down
216 changes: 112 additions & 104 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import { fromZodError } from "zod-validation-error";
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { TriggerTaskRequestBody } from "@trigger.dev/core/v3";
import { generateJWT as internal_generateJWT, TriggerTaskRequestBody } from "@trigger.dev/core/v3";
import { TaskRun } from "@trigger.dev/database";
import { z } from "zod";
import { env } from "~/env.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { parseRequestJsonAsync } from "~/utils/parseRequestJson.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError, TriggerTaskService } from "~/v3/services/triggerTask.server";
import { startActiveSpan } from "~/v3/tracer.server";

const ParamsSchema = z.object({
taskId: z.string(),
Expand All @@ -20,115 +18,125 @@ export const HeadersSchema = z.object({
"trigger-version": z.string().nullish(),
"x-trigger-span-parent-as-link": z.coerce.number().nullish(),
"x-trigger-worker": z.string().nullish(),
"x-trigger-client": z.string().nullish(),
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Ensure this is a POST request
if (request.method.toUpperCase() !== "POST") {
return { status: 405, body: "Method Not Allowed" };
}

logger.debug("TriggerTask action", { headers: Object.fromEntries(request.headers) });

// Next authenticate the request
const authenticationResult = await authenticateApiRequest(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const contentLength = request.headers.get("content-length");

if (!contentLength || parseInt(contentLength) > env.TASK_PAYLOAD_MAXIMUM_SIZE) {
return json({ error: "Request body too large" }, { status: 413 });
}
const { action, loader } = createActionApiRoute(
{
headers: HeadersSchema,
params: ParamsSchema,
body: TriggerTaskRequestBody,
allowJWT: true,
maxContentLength: env.TASK_PAYLOAD_MAXIMUM_SIZE,
authorization: {
action: "write",
resource: (params) => ({ tasks: params.taskId }),
superScopes: ["write:tasks", "admin"],
},
corsStrategy: "all",
},
async ({ body, headers, params, authentication }) => {
const {
"idempotency-key": idempotencyKey,
"trigger-version": triggerVersion,
"x-trigger-span-parent-as-link": spanParentAsLink,
traceparent,
tracestate,
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
} = headers;

const service = new TriggerTaskService();

try {
const traceContext =
traceparent && isFromWorker /// If the request is from a worker, we should pass the trace context
? { traceparent, tracestate }
: undefined;

logger.debug("Triggering task", {
taskId: params.taskId,
idempotencyKey,
triggerVersion,
headers,
options: body.options,
isFromWorker,
traceContext,
});

const run = await service.call(params.taskId, authentication.environment, body, {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext,
spanParentAsLink: spanParentAsLink === 1,
});

if (!run) {
return json({ error: "Task not found" }, { status: 404 });
}

const rawHeaders = Object.fromEntries(request.headers);
const $responseHeaders = await responseHeaders(
run,
authentication.environment,
triggerClient
);

const headers = HeadersSchema.safeParse(rawHeaders);
return json(
{
id: run.friendlyId,
},
{
headers: $responseHeaders,
}
);
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof OutOfEntitlementError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 500 });
}

if (!headers.success) {
return json({ error: "Invalid headers" }, { status: 400 });
return json({ error: "Something went wrong" }, { status: 500 });
}
}

const {
"idempotency-key": idempotencyKey,
"trigger-version": triggerVersion,
"x-trigger-span-parent-as-link": spanParentAsLink,
traceparent,
tracestate,
"x-trigger-worker": isFromWorker,
} = headers.data;

const { taskId } = ParamsSchema.parse(params);

// Now parse the request body
const anyBody = await parseRequestJsonAsync(request, { taskId });

const body = await startActiveSpan("TriggerTaskRequestBody.safeParse()", async (span) => {
return TriggerTaskRequestBody.safeParse(anyBody);
);

async function responseHeaders(
run: TaskRun,
environment: AuthenticatedEnvironment,
triggerClient?: string | null
): Promise<Record<string, string>> {
const claimsHeader = JSON.stringify({
sub: environment.id,
pub: true,
});

if (!body.success) {
return json(
{ error: fromZodError(body.error, { prefix: "Invalid trigger call" }).toString() },
{ status: 400 }
);
}

const service = new TriggerTaskService();

try {
const traceContext =
traceparent && isFromWorker /// If the request is from a worker, we should pass the trace context
? { traceparent, tracestate }
: undefined;

logger.debug("Triggering task", {
taskId,
idempotencyKey,
triggerVersion,
headers: Object.fromEntries(request.headers),
options: body.data.options,
isFromWorker,
traceContext,
if (triggerClient === "browser") {
const claims = {
sub: environment.id,
pub: true,
scopes: [`read:runs:${run.friendlyId}`],
};

const jwt = await internal_generateJWT({
secretKey: environment.apiKey,
payload: claims,
expirationTime: "1h",
});

const run = await service.call(taskId, authenticationResult.environment, body.data, {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext,
spanParentAsLink: spanParentAsLink === 1,
});

if (!run) {
return json({ error: "Task not found" }, { status: 404 });
}

return json(
{
id: run.friendlyId,
},
{
headers: {
"x-trigger-jwt-claims": JSON.stringify({
sub: authenticationResult.environment.id,
pub: true,
}),
},
}
);
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof OutOfEntitlementError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 400 });
}

return json({ error: "Something went wrong" }, { status: 500 });
return {
"x-trigger-jwt-claims": claimsHeader,
"x-trigger-jwt": jwt,
};
}

return {
"x-trigger-jwt-claims": claimsHeader,
};
}

export { action, loader };
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v3.runs.$runId.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
import { createLoaderApiRoute } from "~/services/routeBuiilders/apiBuilder.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
runId: z.string(),
Expand Down
Loading
Loading