Skip to content

Realtime streams now powered by electric #1541

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/lemon-cherries-greet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/react-hooks": patch
"@trigger.dev/sdk": patch
Comment on lines +2 to +3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Based on the evidence gathered, I can now provide a final response:

A minor version bump is warranted for these changes

The changes introduce significant new functionality and behavioral improvements:

  1. A new streaming backend powered by "electric"
  2. Fixes to the realtime functionality affecting render behavior
  3. Multiple files affected across the codebase (react-hooks, core, webapp) related to realtime/streaming functionality

These changes represent new features and significant improvements rather than just bug fixes, which aligns with semantic versioning principles for minor version bumps.

🔗 Analysis chain

Consider using minor version bumps instead of patches.

Given that this change introduces a new streaming backend ("electric") and fixes behavioral issues, it might be more appropriate to use minor version bumps instead of patches. Minor versions typically indicate new features or significant changes while maintaining backward compatibility.

Let's check the recent version history of these packages:


Let me gather more information about the changes to better assess the version bump type.


Let me search for changes related to the streaming backend and realtime functionality to better assess the impact.


Let me try a different search approach without specifying file types.


Let me try one final search with simpler commands.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check recent version history and commit messages for these packages

# Look for version-related changes in package.json files
rg -A 1 '"version":' packages/react-hooks/package.json packages/sdk/package.json

# Look for recent changesets
fd -e md . .changeset/ -x cat {}

Length of output: 1215


Script:

#!/bin/bash
# Look for changes related to the streaming backend and realtime features
rg -B2 -A2 "electric" --type ts --type tsx
rg -B2 -A2 "realtime" --type ts --type tsx

# Look for package.json files to check current versions
fd package.json -x cat {}

Length of output: 62603


Script:

#!/bin/bash
# Look for changes in source files related to streaming and realtime features
rg -B2 -A2 "electric|realtime|stream" --type typescript --type javascript

# Look for recent commits related to these changes
git log -n 5 --grep="electric\|realtime\|stream"

Length of output: 110


Script:

#!/bin/bash
# Search for electric and realtime related changes
rg -B2 -A2 "electric|realtime|stream" --type ts --type tsx --type js --type jsx

# Look for any recent changesets
fd -e md . .changeset/ -x grep -l "electric\|realtime\|stream" {} \; -exec cat {} \;

Length of output: 384


Script:

#!/bin/bash
# Look for the content of the changeset file
cat .changeset/lemon-cherries-greet.md

# Simple grep for electric/realtime in ts files
find . -type f -name "*.ts" -o -name "*.tsx" | xargs grep -l "electric\|realtime"

Length of output: 1951

---

Realtime streams now powered by electric. Also, this change fixes a realtime bug that was causing too many re-renders, even on records that didn't change
2 changes: 2 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ const EnvironmentSchema = z.object({
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),

REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ export class SpanPresenter extends BasePresenter {
const span = await eventRepository.getSpan(spanId, run.traceId);

const metadata = run.metadata
? await prettyPrintPacket(run.metadata, run.metadataType, { filteredKeys: ["$$streams"] })
? await prettyPrintPacket(run.metadata, run.metadataType, {
filteredKeys: ["$$streams", "$$streamsVersion"],
})
: undefined;

const context = {
Expand Down
14 changes: 10 additions & 4 deletions apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ActionFunctionArgs } from "@remix-run/server-runtime";
import { z } from "zod";
import { $replica } from "~/db.server";
import { realtimeStreams } from "~/services/realtimeStreamsGlobal.server";
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
Expand All @@ -16,7 +16,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
return new Response("No body provided", { status: 400 });
}

return realtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
return v1RealtimeStreams.ingestData(request.body, $params.runId, $params.streamId);
}

export const loader = createLoaderApiRoute(
Expand Down Expand Up @@ -50,7 +50,13 @@ export const loader = createLoaderApiRoute(
superScopes: ["read:runs", "read:all", "admin"],
},
},
async ({ params, request, resource: run }) => {
return realtimeStreams.streamResponse(run.friendlyId, params.streamId, request.signal);
async ({ params, request, resource: run, authentication }) => {
return v1RealtimeStreams.streamResponse(
request,
run.friendlyId,
params.streamId,
authentication.environment,
request.signal
);
Comment on lines +54 to +60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use run.id instead of run.friendlyId for consistency

In the streamResponse call, use run.id to maintain consistency with other parts of the codebase and to ensure accurate identification of the run.

Apply this diff:

return v1RealtimeStreams.streamResponse(
  request,
- run.friendlyId,
+ run.id,
  params.streamId,
  authentication.environment,
  request.signal
);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return v1RealtimeStreams.streamResponse(
request,
run.friendlyId,
params.streamId,
authentication.environment,
request.signal
);
return v1RealtimeStreams.streamResponse(
request,
run.id,
params.streamId,
authentication.environment,
request.signal
);

}
);
87 changes: 87 additions & 0 deletions apps/webapp/app/routes/realtime.v2.streams.$runId.$streamId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { z } from "zod";
import { $replica } from "~/db.server";
import {
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";
import { v2RealtimeStreams } from "~/services/realtime/v2StreamsGlobal.server";

const ParamsSchema = z.object({
runId: z.string(),
streamId: z.string(),
});

const { action } = createActionApiRoute(
{
params: ParamsSchema,
},
async ({ request, params, authentication }) => {
if (!request.body) {
return new Response("No body provided", { status: 400 });
}

const run = await $replica.taskRun.findFirst({
where: {
friendlyId: params.runId,
runtimeEnvironmentId: authentication.environment.id,
},
include: {
batch: {
select: {
friendlyId: true,
},
},
},
});

if (!run) {
return new Response("Run not found", { status: 404 });
}

return v2RealtimeStreams.ingestData(request.body, run.id, params.streamId);
}
);

export { action };

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
allowJWT: true,
corsStrategy: "all",
findResource: async (params, auth) => {
return $replica.taskRun.findFirst({
where: {
friendlyId: params.runId,
runtimeEnvironmentId: auth.environment.id,
},
include: {
batch: {
select: {
friendlyId: true,
},
},
},
});
},
Comment on lines +53 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Confirm that runTags and taskIdentifier are available in the authorization resource

The authorization resource accesses run.runTags and run.taskIdentifier, but these properties are not explicitly selected in the Prisma query. Ensure that these fields are included to prevent potential undefined values.

Apply this diff to include the necessary fields:

return $replica.taskRun.findFirst({
  where: {
    friendlyId: params.runId,
    runtimeEnvironmentId: auth.environment.id,
  },
  include: {
    batch: {
      select: {
        friendlyId: true,
      },
    },
  },
+ select: {
+   runTags: true,
+   taskIdentifier: true,
+ },
});

Committable suggestion skipped: line range outside the PR's diff.

authorization: {
action: "read",
resource: (run) => ({
runs: run.friendlyId,
tags: run.runTags,
batch: run.batch?.friendlyId,
tasks: run.taskIdentifier,
}),
superScopes: ["read:runs", "read:all", "admin"],
},
},
async ({ params, request, resource: run, authentication }) => {
return v2RealtimeStreams.streamResponse(
request,
run.id,
params.streamId,
authentication.environment,
request.signal
);
}
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { PrismaClient } from "@trigger.dev/database";
import { AuthenticatedEnvironment } from "../apiAuth.server";
import { logger } from "../logger.server";
import { RealtimeClient } from "../realtimeClient.server";
import { StreamIngestor, StreamResponder } from "./types";

export type DatabaseRealtimeStreamsOptions = {
prisma: PrismaClient;
realtimeClient: RealtimeClient;
};

// Class implementing both interfaces
export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder {
constructor(private options: DatabaseRealtimeStreamsOptions) {}

async streamResponse(
request: Request,
runId: string,
streamId: string,
environment: AuthenticatedEnvironment,
signal: AbortSignal
): Promise<Response> {
return this.options.realtimeClient.streamChunks(
request.url,
environment,
runId,
streamId,
signal,
request.headers.get("x-trigger-electric-version") ?? undefined
);
}

async ingestData(
stream: ReadableStream<Uint8Array>,
runId: string,
streamId: string
): Promise<Response> {
try {
const textStream = stream.pipeThrough(new TextDecoderStream());
const reader = textStream.getReader();
let sequence = 0;

while (true) {
const { done, value } = await reader.read();

if (done) {
break;
}

logger.debug("[DatabaseRealtimeStreams][ingestData] Reading data", {
streamId,
runId,
value,
});

const chunks = value
.split("\n")
.filter((chunk) => chunk) // Remove empty lines
.map((line) => {
return {
sequence: sequence++,
value: line,
};
});

await this.options.prisma.realtimeStreamChunk.createMany({
data: chunks.map((chunk) => {
return {
runId,
key: streamId,
sequence: chunk.sequence,
value: chunk.value,
};
}),
});
}
Comment on lines +42 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential data corruption due to incomplete stream chunks

When reading from the ReadableStream, the value may contain incomplete lines if a line is split across chunks. This can lead to data corruption when splitting the value by \n. Implement buffering of incomplete lines between reads to ensure data integrity.

Apply this diff to handle incomplete lines:

const reader = textStream.getReader();
+ let remainder = '';
let sequence = 0;

while (true) {
  const { done, value } = await reader.read();

  if (done) {
    break;
  }

- const chunks = value
+ remainder += value;
+ const lines = remainder.split('\n');
+ remainder = lines.pop() ?? '';
+ const chunks = lines
    .filter((chunk) => chunk) // Remove empty lines
    .map((line) => {
      return {
        sequence: sequence++,
        value: line,
      };
    });

  await this.options.prisma.realtimeStreamChunk.createMany({
    data: chunks.map((chunk) => {
      return {
        runId,
        key: streamId,
        sequence: chunk.sequence,
        value: chunk.value,
      };
    }),
  });
}

+ // Handle any remaining data after the loop
+ if (remainder) {
+   await this.options.prisma.realtimeStreamChunk.create({
+     data: {
+       runId,
+       key: streamId,
+       sequence: sequence++,
+       value: remainder,
+     },
+   });
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
logger.debug("[DatabaseRealtimeStreams][ingestData] Reading data", {
streamId,
runId,
value,
});
const chunks = value
.split("\n")
.filter((chunk) => chunk) // Remove empty lines
.map((line) => {
return {
sequence: sequence++,
value: line,
};
});
await this.options.prisma.realtimeStreamChunk.createMany({
data: chunks.map((chunk) => {
return {
runId,
key: streamId,
sequence: chunk.sequence,
value: chunk.value,
};
}),
});
}
const reader = textStream.getReader();
let remainder = '';
let sequence = 0;
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
logger.debug("[DatabaseRealtimeStreams][ingestData] Reading data", {
streamId,
runId,
value,
});
remainder += value;
const lines = remainder.split('\n');
remainder = lines.pop() ?? '';
const chunks = lines
.filter((chunk) => chunk) // Remove empty lines
.map((line) => {
return {
sequence: sequence++,
value: line,
};
});
await this.options.prisma.realtimeStreamChunk.createMany({
data: chunks.map((chunk) => {
return {
runId,
key: streamId,
sequence: chunk.sequence,
value: chunk.value,
};
}),
});
}
// Handle any remaining data after the loop
if (remainder) {
await this.options.prisma.realtimeStreamChunk.create({
data: {
runId,
key: streamId,
sequence: sequence++,
value: remainder,
},
});
}


return new Response(null, { status: 200 });
} catch (error) {
logger.error("[DatabaseRealtimeStreams][ingestData] Error in ingestData:", { error });

return new Response(null, { status: 500 });
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
import Redis, { RedisKey, RedisOptions, RedisValue } from "ioredis";
import { logger } from "./logger.server";
import { logger } from "../logger.server";
import { StreamIngestor, StreamResponder } from "./types";
import { AuthenticatedEnvironment } from "../apiAuth.server";

export type RealtimeStreamsOptions = {
redis: RedisOptions | undefined;
};

const END_SENTINEL = "<<CLOSE_STREAM>>";

export class RealtimeStreams {
// Class implementing both interfaces
export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
constructor(private options: RealtimeStreamsOptions) {}

async streamResponse(runId: string, streamId: string, signal: AbortSignal): Promise<Response> {
async streamResponse(
request: Request,
runId: string,
streamId: string,
environment: AuthenticatedEnvironment,
signal: AbortSignal
): Promise<Response> {
const redis = new Redis(this.options.redis ?? {});
const streamKey = `stream:${runId}:${streamId}`;
let isCleanedUp = false;
Expand Down Expand Up @@ -115,11 +124,10 @@ export class RealtimeStreams {
}

try {
// Use TextDecoderStream to simplify text decoding
const textStream = stream.pipeThrough(new TextDecoderStream());
const reader = textStream.getReader();

const batchSize = 10; // Adjust this value based on performance testing
const batchSize = 10;
let batchCommands: Array<[key: RedisKey, ...args: RedisValue[]]> = [];

while (true) {
Expand All @@ -131,17 +139,13 @@ export class RealtimeStreams {

logger.debug("[RealtimeStreams][ingestData] Reading data", { streamKey, value });

// 'value' is a string containing the decoded text
const lines = value.split("\n");

for (const line of lines) {
if (line.trim()) {
// Avoid unnecessary parsing; assume 'line' is already a JSON string
// Add XADD command with MAXLEN option to limit stream size
batchCommands.push([streamKey, "MAXLEN", "~", "2500", "*", "data", line]);

if (batchCommands.length >= batchSize) {
// Send batch using a pipeline
const pipeline = redis.pipeline();
for (const args of batchCommands) {
pipeline.xadd(...args);
Expand All @@ -153,7 +157,6 @@ export class RealtimeStreams {
}
}

// Send any remaining commands
if (batchCommands.length > 0) {
const pipeline = redis.pipeline();
for (const args of batchCommands) {
Expand All @@ -162,7 +165,6 @@ export class RealtimeStreams {
await pipeline.exec();
}

// Send the __end message to indicate the end of the stream
await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL);

return new Response(null, { status: 200 });
Expand Down
21 changes: 21 additions & 0 deletions apps/webapp/app/services/realtime/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { AuthenticatedEnvironment } from "../apiAuth.server";

// Interface for stream ingestion
export interface StreamIngestor {
ingestData(
stream: ReadableStream<Uint8Array>,
runId: string,
streamId: string
): Promise<Response>;
}

// Interface for stream response
export interface StreamResponder {
streamResponse(
request: Request,
runId: string,
streamId: string,
environment: AuthenticatedEnvironment,
signal: AbortSignal
): Promise<Response>;
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { env } from "~/env.server";
import { singleton } from "~/utils/singleton";
import { RealtimeStreams } from "./realtimeStreams.server";
import { RedisRealtimeStreams } from "./redisRealtimeStreams.server";

function initializeRealtimeStreams() {
return new RealtimeStreams({
function initializeRedisRealtimeStreams() {
return new RedisRealtimeStreams({
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
Expand All @@ -16,4 +16,4 @@ function initializeRealtimeStreams() {
});
}

export const realtimeStreams = singleton("realtimeStreams", initializeRealtimeStreams);
export const v1RealtimeStreams = singleton("realtimeStreams", initializeRedisRealtimeStreams);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update the singleton key to prevent naming collisions

The singleton is currently registered with the key "realtimeStreams", which could lead to collisions with other versions. Update the key to "v1RealtimeStreams" for clarity and to avoid potential conflicts.

Apply this diff:

-export const v1RealtimeStreams = singleton("realtimeStreams", initializeRedisRealtimeStreams);
+export const v1RealtimeStreams = singleton("v1RealtimeStreams", initializeRedisRealtimeStreams);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export const v1RealtimeStreams = singleton("realtimeStreams", initializeRedisRealtimeStreams);
export const v1RealtimeStreams = singleton("v1RealtimeStreams", initializeRedisRealtimeStreams);

13 changes: 13 additions & 0 deletions apps/webapp/app/services/realtime/v2StreamsGlobal.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { prisma } from "~/db.server";
import { singleton } from "~/utils/singleton";
import { realtimeClient } from "../realtimeClientGlobal.server";
import { DatabaseRealtimeStreams } from "./databaseRealtimeStreams.server";

function initializeDatabaseRealtimeStreams() {
return new DatabaseRealtimeStreams({
prisma,
realtimeClient,
});
}

export const v2RealtimeStreams = singleton("dbRealtimeStreams", initializeDatabaseRealtimeStreams);
Loading
Loading