Skip to content

Commit 34f8bd5

Browse files
authored
Add ability to update parent and root run metadata from children (#1563)
* Increase the number of active streams from 2 to 5 * WIP parent metadata updates * Fix noop metadata manager * Implement run metadata updates from ancestor tasks * Add changeset * Add ability to stream into parent and root task runs * Don't duplicate exporting run metadata types * Add ability to fetch streams through metadata * Couple of fixes from CodeRabbit * Fix metadata tests * Make sure streams are subscribed in the "background" * Move the stream subscription stuff to the API client, expose it through `runs.fetchStream` * Fixed run stream tests
1 parent 4243cb2 commit 34f8bd5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1974
-532
lines changed

.changeset/giant-mice-cheer.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
Increase the number of active streams from 2 to 5, total streams from 5 to 10

.changeset/slow-deers-collect.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@trigger.dev/react-hooks": patch
3+
"@trigger.dev/sdk": patch
4+
"trigger.dev": patch
5+
"@trigger.dev/core": patch
6+
---
7+
8+
Adding ability to update parent run metadata from child runs/tasks

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,13 +253,14 @@ const EnvironmentSchema = z.object({
253253
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
254254
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
255255
BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB
256-
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(4_096), // 4KB
256+
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB
257257

258258
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
259259
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
260260
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
261261

262262
REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
263+
BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
263264
});
264265

265266
export type Environment = z.infer<typeof EnvironmentSchema>;
Lines changed: 19 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,94 +1,29 @@
1-
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2-
import { parsePacket, UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
1+
import { json } from "@remix-run/server-runtime";
2+
import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
33
import { z } from "zod";
4-
import { prisma } from "~/db.server";
5-
import { authenticateApiRequest } from "~/services/apiAuth.server";
6-
import { handleMetadataPacket } from "~/utils/packets";
7-
import { ServiceValidationError } from "~/v3/services/baseService.server";
8-
import { isFinalRunStatus } from "~/v3/taskStatus";
4+
import { updateMetadataService } from "~/services/metadata/updateMetadata.server";
5+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
96

107
const ParamsSchema = z.object({
118
runId: z.string(),
129
});
1310

14-
export async function action({ request, params }: ActionFunctionArgs) {
15-
// Ensure this is a PUT request
16-
if (request.method.toUpperCase() !== "PUT") {
17-
return json({ error: "Method not allowed" }, { status: 405, headers: { Allow: "PUT" } });
18-
}
19-
20-
// Authenticate the request
21-
const authenticationResult = await authenticateApiRequest(request);
22-
if (!authenticationResult) {
23-
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
24-
}
25-
26-
const parsedParams = ParamsSchema.safeParse(params);
27-
if (!parsedParams.success) {
28-
return json(
29-
{ error: "Invalid request parameters", issues: parsedParams.error.issues },
30-
{ status: 400 }
31-
);
32-
}
33-
34-
try {
35-
const anyBody = await request.json();
36-
37-
const body = UpdateMetadataRequestBody.safeParse(anyBody);
38-
39-
if (!body.success) {
40-
return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 });
41-
}
42-
43-
const metadataPacket = handleMetadataPacket(
44-
body.data.metadata,
45-
body.data.metadataType ?? "application/json"
46-
);
47-
48-
if (!metadataPacket) {
49-
return json({ error: "Invalid metadata" }, { status: 400 });
50-
}
51-
52-
const taskRun = await prisma.taskRun.findFirst({
53-
where: {
54-
friendlyId: parsedParams.data.runId,
55-
runtimeEnvironmentId: authenticationResult.environment.id,
56-
},
57-
select: {
58-
status: true,
59-
},
60-
});
61-
62-
if (!taskRun) {
11+
const { action } = createActionApiRoute(
12+
{
13+
params: ParamsSchema,
14+
body: UpdateMetadataRequestBody,
15+
maxContentLength: 1024 * 1024, // 1MB
16+
method: "PUT",
17+
},
18+
async ({ authentication, body, params }) => {
19+
const result = await updateMetadataService.call(authentication.environment, params.runId, body);
20+
21+
if (!result) {
6322
return json({ error: "Task Run not found" }, { status: 404 });
6423
}
6524

66-
if (isFinalRunStatus(taskRun.status)) {
67-
return json({ error: "Cannot update metadata for a completed run" }, { status: 400 });
68-
}
69-
70-
await prisma.taskRun.update({
71-
where: {
72-
friendlyId: parsedParams.data.runId,
73-
runtimeEnvironmentId: authenticationResult.environment.id,
74-
},
75-
data: {
76-
metadata: metadataPacket?.data,
77-
metadataType: metadataPacket?.dataType,
78-
},
79-
});
80-
81-
const parsedPacket = await parsePacket(metadataPacket);
82-
83-
return json({ metadata: parsedPacket }, { status: 200 });
84-
} catch (error) {
85-
if (error instanceof ServiceValidationError) {
86-
return json({ error: error.message }, { status: error.status ?? 422 });
87-
} else {
88-
return json(
89-
{ error: error instanceof Error ? error.message : "Internal Server Error" },
90-
{ status: 500 }
91-
);
92-
}
25+
return json(result, { status: 200 });
9326
}
94-
}
27+
);
28+
29+
export { action };

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
11
import { json } from "@remix-run/server-runtime";
22
import {
3-
BatchTriggerTaskResponse,
43
BatchTriggerTaskV2RequestBody,
54
BatchTriggerTaskV2Response,
65
generateJWT,
76
} from "@trigger.dev/core/v3";
87
import { env } from "~/env.server";
8+
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
9+
import { logger } from "~/services/logger.server";
910
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
10-
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
1111
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
12+
import { ServiceValidationError } from "~/v3/services/baseService.server";
1213
import {
1314
BatchProcessingStrategy,
1415
BatchTriggerV2Service,
1516
} from "~/v3/services/batchTriggerV2.server";
16-
import { ServiceValidationError } from "~/v3/services/baseService.server";
1717
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
18-
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
19-
import { logger } from "~/services/logger.server";
20-
import { z } from "zod";
18+
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
2119

2220
const { action, loader } = createActionApiRoute(
2321
{

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
44
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
5-
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
65
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
76

87
const ParamsSchema = z.object({
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { z } from "zod";
2+
import { $replica } from "~/db.server";
3+
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
4+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
6+
const ParamsSchema = z.object({
7+
runId: z.string(),
8+
target: z.enum(["self", "parent", "root"]),
9+
streamId: z.string(),
10+
});
11+
12+
const { action } = createActionApiRoute(
13+
{
14+
params: ParamsSchema,
15+
},
16+
async ({ request, params, authentication }) => {
17+
if (!request.body) {
18+
return new Response("No body provided", { status: 400 });
19+
}
20+
21+
const run = await $replica.taskRun.findFirst({
22+
where: {
23+
friendlyId: params.runId,
24+
runtimeEnvironmentId: authentication.environment.id,
25+
},
26+
select: {
27+
id: true,
28+
friendlyId: true,
29+
parentTaskRun: {
30+
select: {
31+
friendlyId: true,
32+
},
33+
},
34+
rootTaskRun: {
35+
select: {
36+
friendlyId: true,
37+
},
38+
},
39+
},
40+
});
41+
42+
if (!run) {
43+
return new Response("Run not found", { status: 404 });
44+
}
45+
46+
const targetId =
47+
params.target === "self"
48+
? run.friendlyId
49+
: params.target === "parent"
50+
? run.parentTaskRun?.friendlyId
51+
: run.rootTaskRun?.friendlyId;
52+
53+
if (!targetId) {
54+
return new Response("Target not found", { status: 404 });
55+
}
56+
57+
return relayRealtimeStreams.ingestData(request.body, targetId, params.streamId);
58+
}
59+
);
60+
61+
export { action };

0 commit comments

Comments
 (0)