Skip to content

Commit 9970b9b

Browse files
authored
Realtime streams now powered by electric (#1541)
* Realtime streams now powered by electric, and fix the streaming update duplicate issues by converting the electric Shape materialized view into a ReadableStream of changes * Ensure realtime subscription stops when runs are finished, and add an onComplete handle to use realtime hooks * Fix tests
1 parent b411313 commit 9970b9b

File tree

38 files changed

+1021
-465
lines changed

38 files changed

+1021
-465
lines changed

.changeset/lemon-cherries-greet.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/react-hooks": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Realtime streams now powered by electric. Also, this change fixes a realtime bug that was causing too many re-renders, even on records that didn't change

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ const EnvironmentSchema = z.object({
243243
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
244244
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
245245
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
246+
247+
REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
246248
});
247249

248250
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,9 @@ export class SpanPresenter extends BasePresenter {
215215
const span = await eventRepository.getSpan(spanId, run.traceId);
216216

217217
const metadata = run.metadata
218-
? await prettyPrintPacket(run.metadata, run.metadataType, { filteredKeys: ["$$streams"] })
218+
? await prettyPrintPacket(run.metadata, run.metadataType, {
219+
filteredKeys: ["$$streams", "$$streamsVersion"],
220+
})
219221
: undefined;
220222

221223
const context = {

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
4-
import { realtimeStreams } from "~/services/realtimeStreamsGlobal.server";
4+
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
55
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
66

77
const ParamsSchema = z.object({
@@ -16,7 +16,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
1616
return new Response("No body provided", { status: 400 });
1717
}
1818

19-
return realtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
19+
return v1RealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
2020
}
2121

2222
export const loader = createLoaderApiRoute(
@@ -50,7 +50,13 @@ export const loader = createLoaderApiRoute(
5050
superScopes: ["read:runs", "read:all", "admin"],
5151
},
5252
},
53-
async ({ params, request, resource: run }) => {
54-
return realtimeStreams.streamResponse(run.friendlyId, params.streamId, request.signal);
53+
async ({ params, request, resource: run, authentication }) => {
54+
return v1RealtimeStreams.streamResponse(
55+
request,
56+
run.friendlyId,
57+
params.streamId,
58+
authentication.environment,
59+
request.signal
60+
);
5561
}
5662
);
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { z } from "zod";
2+
import { $replica } from "~/db.server";
3+
import {
4+
createActionApiRoute,
5+
createLoaderApiRoute,
6+
} from "~/services/routeBuilders/apiBuilder.server";
7+
import { v2RealtimeStreams } from "~/services/realtime/v2StreamsGlobal.server";
8+
9+
const ParamsSchema = z.object({
10+
runId: z.string(),
11+
streamId: z.string(),
12+
});
13+
14+
const { action } = createActionApiRoute(
15+
{
16+
params: ParamsSchema,
17+
},
18+
async ({ request, params, authentication }) => {
19+
if (!request.body) {
20+
return new Response("No body provided", { status: 400 });
21+
}
22+
23+
const run = await $replica.taskRun.findFirst({
24+
where: {
25+
friendlyId: params.runId,
26+
runtimeEnvironmentId: authentication.environment.id,
27+
},
28+
include: {
29+
batch: {
30+
select: {
31+
friendlyId: true,
32+
},
33+
},
34+
},
35+
});
36+
37+
if (!run) {
38+
return new Response("Run not found", { status: 404 });
39+
}
40+
41+
return v2RealtimeStreams.ingestData(request.body, run.id, params.streamId);
42+
}
43+
);
44+
45+
export { action };
46+
47+
export const loader = createLoaderApiRoute(
48+
{
49+
params: ParamsSchema,
50+
allowJWT: true,
51+
corsStrategy: "all",
52+
findResource: async (params, auth) => {
53+
return $replica.taskRun.findFirst({
54+
where: {
55+
friendlyId: params.runId,
56+
runtimeEnvironmentId: auth.environment.id,
57+
},
58+
include: {
59+
batch: {
60+
select: {
61+
friendlyId: true,
62+
},
63+
},
64+
},
65+
});
66+
},
67+
authorization: {
68+
action: "read",
69+
resource: (run) => ({
70+
runs: run.friendlyId,
71+
tags: run.runTags,
72+
batch: run.batch?.friendlyId,
73+
tasks: run.taskIdentifier,
74+
}),
75+
superScopes: ["read:runs", "read:all", "admin"],
76+
},
77+
},
78+
async ({ params, request, resource: run, authentication }) => {
79+
return v2RealtimeStreams.streamResponse(
80+
request,
81+
run.id,
82+
params.streamId,
83+
authentication.environment,
84+
request.signal
85+
);
86+
}
87+
);
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import { PrismaClient } from "@trigger.dev/database";
2+
import { AuthenticatedEnvironment } from "../apiAuth.server";
3+
import { logger } from "../logger.server";
4+
import { RealtimeClient } from "../realtimeClient.server";
5+
import { StreamIngestor, StreamResponder } from "./types";
6+
7+
export type DatabaseRealtimeStreamsOptions = {
8+
prisma: PrismaClient;
9+
realtimeClient: RealtimeClient;
10+
};
11+
12+
// Class implementing both interfaces
13+
export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder {
14+
constructor(private options: DatabaseRealtimeStreamsOptions) {}
15+
16+
async streamResponse(
17+
request: Request,
18+
runId: string,
19+
streamId: string,
20+
environment: AuthenticatedEnvironment,
21+
signal: AbortSignal
22+
): Promise<Response> {
23+
return this.options.realtimeClient.streamChunks(
24+
request.url,
25+
environment,
26+
runId,
27+
streamId,
28+
signal,
29+
request.headers.get("x-trigger-electric-version") ?? undefined
30+
);
31+
}
32+
33+
async ingestData(
34+
stream: ReadableStream<Uint8Array>,
35+
runId: string,
36+
streamId: string
37+
): Promise<Response> {
38+
try {
39+
const textStream = stream.pipeThrough(new TextDecoderStream());
40+
const reader = textStream.getReader();
41+
let sequence = 0;
42+
43+
while (true) {
44+
const { done, value } = await reader.read();
45+
46+
if (done) {
47+
break;
48+
}
49+
50+
logger.debug("[DatabaseRealtimeStreams][ingestData] Reading data", {
51+
streamId,
52+
runId,
53+
value,
54+
});
55+
56+
const chunks = value
57+
.split("\n")
58+
.filter((chunk) => chunk) // Remove empty lines
59+
.map((line) => {
60+
return {
61+
sequence: sequence++,
62+
value: line,
63+
};
64+
});
65+
66+
await this.options.prisma.realtimeStreamChunk.createMany({
67+
data: chunks.map((chunk) => {
68+
return {
69+
runId,
70+
key: streamId,
71+
sequence: chunk.sequence,
72+
value: chunk.value,
73+
};
74+
}),
75+
});
76+
}
77+
78+
return new Response(null, { status: 200 });
79+
} catch (error) {
80+
logger.error("[DatabaseRealtimeStreams][ingestData] Error in ingestData:", { error });
81+
82+
return new Response(null, { status: 500 });
83+
}
84+
}
85+
}

apps/webapp/app/services/realtimeStreams.server.ts renamed to apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
11
import Redis, { RedisKey, RedisOptions, RedisValue } from "ioredis";
2-
import { logger } from "./logger.server";
2+
import { logger } from "../logger.server";
3+
import { StreamIngestor, StreamResponder } from "./types";
4+
import { AuthenticatedEnvironment } from "../apiAuth.server";
35

46
export type RealtimeStreamsOptions = {
57
redis: RedisOptions | undefined;
68
};
79

810
const END_SENTINEL = "<<CLOSE_STREAM>>";
911

10-
export class RealtimeStreams {
12+
// Class implementing both interfaces
13+
export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
1114
constructor(private options: RealtimeStreamsOptions) {}
1215

13-
async streamResponse(runId: string, streamId: string, signal: AbortSignal): Promise<Response> {
16+
async streamResponse(
17+
request: Request,
18+
runId: string,
19+
streamId: string,
20+
environment: AuthenticatedEnvironment,
21+
signal: AbortSignal
22+
): Promise<Response> {
1423
const redis = new Redis(this.options.redis ?? {});
1524
const streamKey = `stream:${runId}:${streamId}`;
1625
let isCleanedUp = false;
@@ -115,11 +124,10 @@ export class RealtimeStreams {
115124
}
116125

117126
try {
118-
// Use TextDecoderStream to simplify text decoding
119127
const textStream = stream.pipeThrough(new TextDecoderStream());
120128
const reader = textStream.getReader();
121129

122-
const batchSize = 10; // Adjust this value based on performance testing
130+
const batchSize = 10;
123131
let batchCommands: Array<[key: RedisKey, ...args: RedisValue[]]> = [];
124132

125133
while (true) {
@@ -131,17 +139,13 @@ export class RealtimeStreams {
131139

132140
logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, value });
133141

134-
// 'value' is a string containing the decoded text
135142
const lines = value.split("\n");
136143

137144
for (const line of lines) {
138145
if (line.trim()) {
139-
// Avoid unnecessary parsing; assume 'line' is already a JSON string
140-
// Add XADD command with MAXLEN option to limit stream size
141146
batchCommands.push([streamKey, "MAXLEN", "~", "2500", "*", "data", line]);
142147

143148
if (batchCommands.length >= batchSize) {
144-
// Send batch using a pipeline
145149
const pipeline = redis.pipeline();
146150
for (const args of batchCommands) {
147151
pipeline.xadd(...args);
@@ -153,7 +157,6 @@ export class RealtimeStreams {
153157
}
154158
}
155159

156-
// Send any remaining commands
157160
if (batchCommands.length > 0) {
158161
const pipeline = redis.pipeline();
159162
for (const args of batchCommands) {
@@ -162,7 +165,6 @@ export class RealtimeStreams {
162165
await pipeline.exec();
163166
}
164167

165-
// Send the __end message to indicate the end of the stream
166168
await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL);
167169

168170
return new Response(null, { status: 200 });
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { AuthenticatedEnvironment } from "../apiAuth.server";
2+
3+
// Interface for stream ingestion
4+
export interface StreamIngestor {
5+
ingestData(
6+
stream: ReadableStream<Uint8Array>,
7+
runId: string,
8+
streamId: string
9+
): Promise<Response>;
10+
}
11+
12+
// Interface for stream response
13+
export interface StreamResponder {
14+
streamResponse(
15+
request: Request,
16+
runId: string,
17+
streamId: string,
18+
environment: AuthenticatedEnvironment,
19+
signal: AbortSignal
20+
): Promise<Response>;
21+
}

apps/webapp/app/services/realtimeStreamsGlobal.server.ts renamed to apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { env } from "~/env.server";
22
import { singleton } from "~/utils/singleton";
3-
import { RealtimeStreams } from "./realtimeStreams.server";
3+
import { RedisRealtimeStreams } from "./redisRealtimeStreams.server";
44

5-
function initializeRealtimeStreams() {
6-
return new RealtimeStreams({
5+
function initializeRedisRealtimeStreams() {
6+
return new RedisRealtimeStreams({
77
redis: {
88
port: env.REDIS_PORT,
99
host: env.REDIS_HOST,
@@ -16,4 +16,4 @@ function initializeRealtimeStreams() {
1616
});
1717
}
1818

19-
export const realtimeStreams = singleton("realtimeStreams", initializeRealtimeStreams);
19+
export const v1RealtimeStreams = singleton("realtimeStreams", initializeRedisRealtimeStreams);
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { prisma } from "~/db.server";
2+
import { singleton } from "~/utils/singleton";
3+
import { realtimeClient } from "../realtimeClientGlobal.server";
4+
import { DatabaseRealtimeStreams } from "./databaseRealtimeStreams.server";
5+
6+
function initializeDatabaseRealtimeStreams() {
7+
return new DatabaseRealtimeStreams({
8+
prisma,
9+
realtimeClient,
10+
});
11+
}
12+
13+
export const v2RealtimeStreams = singleton("dbRealtimeStreams", initializeDatabaseRealtimeStreams);

0 commit comments

Comments
 (0)