
Commit c04b552

Improve the v1 realtime streams (Redis)

1 parent d4f533f, commit c04b552

13 files changed: +339 -74 lines changed


apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 9 additions & 4 deletions
@@ -224,10 +224,15 @@ export class ApiRunListPresenter extends BasePresenter {
 
     const data: ListRunResponseItem[] = await Promise.all(
       results.runs.map(async (run) => {
-        const metadata = await parsePacket({
-          data: run.metadata ?? undefined,
-          dataType: run.metadataType,
-        });
+        const metadata = await parsePacket(
+          {
+            data: run.metadata ?? undefined,
+            dataType: run.metadataType,
+          },
+          {
+            filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"],
+          }
+        );
 
         return {
           id: run.friendlyId,

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 1 addition & 1 deletion
@@ -216,7 +216,7 @@ export class SpanPresenter extends BasePresenter {
 
     const metadata = run.metadata
       ? await prettyPrintPacket(run.metadata, run.metadataType, {
-          filteredKeys: ["$$streams", "$$streamsVersion"],
+          filteredKeys: ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"],
         })
       : undefined;
 
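Both presenter changes add `$$streamsBaseUrl` to the `filteredKeys` option so that the realtime-stream bookkeeping keys never surface in API responses or the run inspector. A minimal sketch of the filtering effect, assuming `filteredKeys` drops those top-level keys (the helper below is illustrative, not the actual `parsePacket`/`prettyPrintPacket` implementation):

// Illustrative only: approximates what the filteredKeys option does to run
// metadata before it is returned to API clients or rendered in the dashboard.
const STREAM_METADATA_KEYS = ["$$streams", "$$streamsVersion", "$$streamsBaseUrl"];

function stripFilteredKeys(
  metadata: Record<string, unknown>,
  filteredKeys: string[] = STREAM_METADATA_KEYS
): Record<string, unknown> {
  return Object.fromEntries(
    Object.entries(metadata).filter(([key]) => !filteredKeys.includes(key))
  );
}

// The $$-prefixed bookkeeping entries are stripped; user metadata is kept.
const visible = stripFilteredKeys({
  user: "ada",
  $$streams: ["openai"],                       // internal
  $$streamsVersion: "v1",                      // internal
  $$streamsBaseUrl: "https://example.invalid", // internal, hypothetical value
});
// visible === { user: "ada" }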

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 25 additions & 35 deletions
@@ -1,7 +1,8 @@
-import Redis, { RedisKey, RedisOptions, RedisValue } from "ioredis";
+import Redis, { RedisOptions } from "ioredis";
+import { AuthenticatedEnvironment } from "../apiAuth.server";
 import { logger } from "../logger.server";
 import { StreamIngestor, StreamResponder } from "./types";
-import { AuthenticatedEnvironment } from "../apiAuth.server";
+import { LineTransformStream } from "./utils.server";
 
 export type RealtimeStreamsOptions = {
   redis: RedisOptions | undefined;
@@ -56,7 +57,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
               controller.close();
               return;
             }
-            controller.enqueue(`data: ${fields[1]}\n\n`);
+            controller.enqueue(fields[1]);
 
             if (signal.aborted) {
               controller.close();
@@ -88,7 +89,18 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
       cancel: async () => {
         await cleanup();
       },
-    });
+    })
+      .pipeThrough(new LineTransformStream())
+      .pipeThrough(
+        new TransformStream({
+          transform(chunk, controller) {
+            for (const line of chunk) {
+              controller.enqueue(`data: ${line}\n\n`);
+            }
+          },
+        })
+      )
+      .pipeThrough(new TextEncoderStream());
 
     async function cleanup() {
       if (isCleanedUp) return;
@@ -98,7 +110,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
 
     signal.addEventListener("abort", cleanup);
 
-    return new Response(stream.pipeThrough(new TextEncoderStream()), {
+    return new Response(stream, {
       headers: {
         "Content-Type": "text/event-stream",
         "Cache-Control": "no-cache",
@@ -119,50 +131,28 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
       try {
         await redis.quit();
       } catch (error) {
-        logger.error("[RealtimeStreams][ingestData] Error in cleanup:", { error });
+        logger.error("[RedisRealtimeStreams][ingestData] Error in cleanup:", { error });
      }
    }
 
     try {
       const textStream = stream.pipeThrough(new TextDecoderStream());
       const reader = textStream.getReader();
 
-      const batchSize = 10;
-      let batchCommands: Array<[key: RedisKey, ...args: RedisValue[]]> = [];
-
       while (true) {
         const { done, value } = await reader.read();
 
-        if (done) {
+        if (done || !value) {
           break;
         }
 
-        logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, value });
-
-        const lines = value.split("\n");
-
-        for (const line of lines) {
-          if (line.trim()) {
-            batchCommands.push([streamKey, "MAXLEN", "~", "2500", "*", "data", line]);
+        logger.debug("[RedisRealtimeStreams][ingestData] Reading data", {
+          streamKey,
+          runId,
+          value,
+        });
 
-            if (batchCommands.length >= batchSize) {
-              const pipeline = redis.pipeline();
-              for (const args of batchCommands) {
-                pipeline.xadd(...args);
-              }
-              await pipeline.exec();
-              batchCommands = [];
-            }
-          }
-        }
-      }
-
-      if (batchCommands.length > 0) {
-        const pipeline = redis.pipeline();
-        for (const args of batchCommands) {
-          pipeline.xadd(...args);
-        }
-        await pipeline.exec();
+        await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", value);
       }
 
       await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL);
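
With this change the ingest side XADDs each raw chunk, and the response side splits entries into lines with `LineTransformStream` and wraps each complete line in a Server-Sent Events `data:` frame before encoding. A minimal sketch of a consumer, assuming a hypothetical stream URL (generic SSE parsing, not the Trigger.dev client SDK):

// Minimal SSE consumer sketch for the endpoint served by streamResponse.
// The URL is hypothetical; each `data:` frame carries one line that was
// XADDed to the Redis stream by ingestData.
async function consumeRunStream(url: string, signal?: AbortSignal): Promise<void> {
  const response = await fetch(url, {
    headers: { Accept: "text/event-stream" },
    signal,
  });

  if (!response.body) throw new Error("No response body");

  const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += value;

    // SSE events are separated by a blank line ("\n\n").
    const events = buffer.split("\n\n");
    buffer = events.pop() ?? "";

    for (const event of events) {
      if (event.startsWith("data: ")) {
        console.log("chunk:", event.slice("data: ".length));
      }
    }
  }
}
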
Lines changed: 226 additions & 0 deletions
@@ -0,0 +1,226 @@
import { AuthenticatedEnvironment } from "../apiAuth.server";
import { logger } from "../logger.server";
import { StreamIngestor, StreamResponder } from "./types";
import { LineTransformStream } from "./utils.server";
import { v1RealtimeStreams } from "./v1StreamsGlobal.server";
import { singleton } from "~/utils/singleton";

export type RelayRealtimeStreamsOptions = {
  ttl: number;
  fallbackIngestor: StreamIngestor;
  fallbackResponder: StreamResponder;
  waitForBufferTimeout?: number; // Time to wait for buffer in ms (default: 500ms)
  waitForBufferInterval?: number; // Polling interval in ms (default: 50ms)
};

interface RelayedStreamRecord {
  stream: ReadableStream<Uint8Array>;
  createdAt: number;
  lastAccessed: number;
  finalized: boolean;
}

export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
  private _buffers: Map<string, RelayedStreamRecord> = new Map();
  private cleanupInterval: NodeJS.Timeout;
  private waitForBufferTimeout: number;
  private waitForBufferInterval: number;

  constructor(private options: RelayRealtimeStreamsOptions) {
    this.waitForBufferTimeout = options.waitForBufferTimeout ?? 5000;
    this.waitForBufferInterval = options.waitForBufferInterval ?? 50;

    // Periodic cleanup
    this.cleanupInterval = setInterval(() => {
      this.cleanup();
    }, this.options.ttl).unref();
  }

  async streamResponse(
    request: Request,
    runId: string,
    streamId: string,
    environment: AuthenticatedEnvironment,
    signal: AbortSignal
  ): Promise<Response> {
    let record = this._buffers.get(`${runId}:${streamId}`);

    if (!record) {
      logger.debug(
        "[RelayRealtimeStreams][streamResponse] No ephemeral record found, waiting to see if one becomes available",
        {
          streamId,
          runId,
        }
      );

      record = await this.waitForBuffer(`${runId}:${streamId}`);

      if (!record) {
        logger.debug(
          "[RelayRealtimeStreams][streamResponse] No ephemeral record found, using fallback",
          {
            streamId,
            runId,
          }
        );

        // No ephemeral record, use fallback
        return this.options.fallbackResponder.streamResponse(
          request,
          runId,
          streamId,
          environment,
          signal
        );
      }
    }

    record.lastAccessed = Date.now();

    logger.debug("[RelayRealtimeStreams][streamResponse] Streaming from ephemeral record", {
      streamId,
      runId,
    });

    // Create a streaming response from the buffered data
    const stream = record.stream
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new LineTransformStream())
      .pipeThrough(
        new TransformStream({
          transform(chunk, controller) {
            for (const line of chunk) {
              controller.enqueue(`data: ${line}\n\n`);
            }
          },
        })
      )
      .pipeThrough(new TextEncoderStream());

    // Once we start streaming, consider deleting the buffer when done.
    // For a simple approach, we can rely on finalized and no more reads.
    // Or we can let TTL cleanup handle it if multiple readers might come in.
    return new Response(stream, {
      headers: {
        "Content-Type": "text/event-stream",
        "Cache-Control": "no-cache",
        Connection: "keep-alive",
      },
    });
  }

  async ingestData(
    stream: ReadableStream<Uint8Array>,
    runId: string,
    streamId: string
  ): Promise<Response> {
    const [localStream, fallbackStream] = stream.tee();

    logger.debug("[RelayRealtimeStreams][ingestData] Ingesting data", { runId, streamId });

    // Handle local buffering asynchronously and catch errors
    this.handleLocalIngestion(localStream, runId, streamId).catch((err) => {
      logger.error("[RelayRealtimeStreams][ingestData] Error in local ingestion:", { err });
    });

    // Forward to the fallback ingestor asynchronously and catch errors
    return this.options.fallbackIngestor.ingestData(fallbackStream, runId, streamId);
  }

  /**
   * Handles local buffering of the stream data.
   * @param stream The readable stream to buffer.
   * @param streamId The unique identifier for the stream.
   */
  private async handleLocalIngestion(
    stream: ReadableStream<Uint8Array>,
    runId: string,
    streamId: string
  ) {
    this.createOrUpdateRelayedStream(`${runId}:${streamId}`, stream);
  }

  /**
   * Retrieves an existing buffer or creates a new one for the given streamId.
   * @param streamId The unique identifier for the stream.
   */
  private createOrUpdateRelayedStream(
    bufferKey: string,
    stream: ReadableStream<Uint8Array>
  ): RelayedStreamRecord {
    let record = this._buffers.get(bufferKey);
    if (!record) {
      record = {
        stream,
        createdAt: Date.now(),
        lastAccessed: Date.now(),
        finalized: false,
      };
      this._buffers.set(bufferKey, record);
    } else {
      record.lastAccessed = Date.now();
    }
    return record;
  }

  private cleanup() {
    const now = Date.now();
    for (const [key, record] of this._buffers.entries()) {
      // If last accessed is older than ttl, clean up
      if (now - record.lastAccessed > this.options.ttl) {
        this.deleteBuffer(key);
      }
    }
  }

  private deleteBuffer(bufferKey: string) {
    this._buffers.delete(bufferKey);
  }

  /**
   * Waits for a buffer to be created within a specified timeout.
   * @param streamId The unique identifier for the stream.
   * @returns A promise that resolves to true if the buffer was created, false otherwise.
   */
  private async waitForBuffer(bufferKey: string): Promise<RelayedStreamRecord | undefined> {
    const timeout = this.waitForBufferTimeout;
    const interval = this.waitForBufferInterval;
    const maxAttempts = Math.ceil(timeout / interval);
    let attempts = 0;

    return new Promise<RelayedStreamRecord | undefined>((resolve) => {
      const checkBuffer = () => {
        attempts++;
        if (this._buffers.has(bufferKey)) {
          resolve(this._buffers.get(bufferKey));
          return;
        }
        if (attempts >= maxAttempts) {
          resolve(undefined);
          return;
        }
        setTimeout(checkBuffer, interval);
      };
      checkBuffer();
    });
  }

  // Don't forget to clear interval on shutdown if needed
  close() {
    clearInterval(this.cleanupInterval);
  }
}

function initializeRelayRealtimeStreams() {
  return new RelayRealtimeStreams({
    ttl: 1000 * 60 * 5, // 5 minutes
    fallbackIngestor: v1RealtimeStreams,
    fallbackResponder: v1RealtimeStreams,
  });
}

export const relayRealtimeStreams = singleton(
  "relayRealtimeStreams",
  initializeRelayRealtimeStreams
);
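
A sketch of how the relay could be called from the realtime stream endpoints; the handler names and import path below are assumptions, only `relayRealtimeStreams.ingestData` and `streamResponse` come from the module above:

// Hypothetical route handlers showing the intended call pattern (the real
// routes are not part of this diff view; the import path is assumed).
import { relayRealtimeStreams } from "./relayRealtimeStreams.server";
import type { AuthenticatedEnvironment } from "../apiAuth.server";

// POST: the running task uploads stream data. The relay tees the body into an
// in-process buffer and forwards it to the Redis-backed fallback ingestor.
export async function handleStreamUpload(request: Request, runId: string, streamId: string) {
  if (!request.body) {
    return new Response("No body", { status: 400 });
  }

  return relayRealtimeStreams.ingestData(request.body, runId, streamId);
}

// GET: a subscriber hitting the same server instance is served from the
// ephemeral buffer; otherwise the relay waits briefly, then falls back to
// reading the Redis stream.
export async function handleStreamSubscription(
  request: Request,
  runId: string,
  streamId: string,
  environment: AuthenticatedEnvironment
) {
  return relayRealtimeStreams.streamResponse(request, runId, streamId, environment, request.signal);
}
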
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
export class LineTransformStream extends TransformStream<string, string[]> {
  private buffer = "";

  constructor() {
    super({
      transform: (chunk, controller) => {
        // Append the chunk to the buffer
        this.buffer += chunk;

        // Split on newlines
        const lines = this.buffer.split("\n");

        // The last element might be incomplete, hold it back in buffer
        this.buffer = lines.pop() || "";

        // Filter out empty or whitespace-only lines
        const fullLines = lines.filter((line) => line.trim().length > 0);

        // If we got any complete lines, emit them as an array
        if (fullLines.length > 0) {
          controller.enqueue(fullLines);
        }
      },
      flush: (controller) => {
        // On stream end, if there's leftover text, emit it as a single-element array
        const trimmed = this.buffer.trim();
        if (trimmed.length > 0) {
          controller.enqueue([trimmed]);
        }
      },
    });
  }
}
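
A quick usage sketch of `LineTransformStream`, showing that a line split across two chunks is held back until it completes (the sample chunks are illustrative):

// Two chunks where a line is split across the chunk boundary: only complete
// lines are emitted, and the partial line is buffered until it is finished.
async function demoLineTransformStream(): Promise<void> {
  const source = new ReadableStream<string>({
    start(controller) {
      controller.enqueue('{"part":1}\n{"par'); // second line is incomplete
      controller.enqueue('t":2}\n');           // completes it
      controller.close();
    },
  });

  const reader = source.pipeThrough(new LineTransformStream()).getReader();

  // Logs: ['{"part":1}'] then ['{"part":2}']
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log(value);
  }
}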
