Skip to content

Commit 8342571

Browse files
committed
Make realtime server backwards compat with 3.1.2 release
1 parent 71eb5b9 commit 8342571

File tree

7 files changed

+240
-36
lines changed

7 files changed

+240
-36
lines changed

apps/webapp/app/routes/realtime.v1.batches.$batchId.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ export const loader = createLoaderApiRoute(
3131
return json({ error: "Batch not found" }, { status: 404 });
3232
}
3333

34-
return realtimeClient.streamBatch(request.url, authentication.environment, batchRun.id);
34+
return realtimeClient.streamBatch(
35+
request.url,
36+
authentication.environment,
37+
batchRun.id,
38+
request.headers.get("x-trigger-electric-version") ?? undefined
39+
);
3540
}
3641
);

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ export const loader = createLoaderApiRoute(
3131
return json({ error: "Run not found" }, { status: 404 });
3232
}
3333

34-
return realtimeClient.streamRun(request.url, authentication.environment, run.id);
34+
return realtimeClient.streamRun(
35+
request.url,
36+
authentication.environment,
37+
run.id,
38+
request.headers.get("x-trigger-electric-version") ?? undefined
39+
);
3540
}
3641
);

apps/webapp/app/routes/realtime.v1.runs.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ export const loader = createLoaderApiRoute(
2323
},
2424
},
2525
async ({ searchParams, authentication, request }) => {
26-
return realtimeClient.streamRuns(request.url, authentication.environment, searchParams);
26+
return realtimeClient.streamRuns(
27+
request.url,
28+
authentication.environment,
29+
searchParams,
30+
request.headers.get("x-trigger-electric-version") ?? undefined
31+
);
2732
}
2833
);

apps/webapp/app/services/realtimeClient.server.ts

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,29 @@ export class RealtimeClient {
3737
this.#registerCommands();
3838
}
3939

40-
async streamRun(url: URL | string, environment: RealtimeEnvironment, runId: string) {
41-
return this.#streamRunsWhere(url, environment, `id='${runId}'`);
40+
async streamRun(
41+
url: URL | string,
42+
environment: RealtimeEnvironment,
43+
runId: string,
44+
clientVersion?: string
45+
) {
46+
return this.#streamRunsWhere(url, environment, `id='${runId}'`, clientVersion);
4247
}
4348

44-
async streamBatch(url: URL | string, environment: RealtimeEnvironment, batchId: string) {
45-
return this.#streamRunsWhere(url, environment, `"batchId"='${batchId}'`);
49+
async streamBatch(
50+
url: URL | string,
51+
environment: RealtimeEnvironment,
52+
batchId: string,
53+
clientVersion?: string
54+
) {
55+
return this.#streamRunsWhere(url, environment, `"batchId"='${batchId}'`, clientVersion);
4656
}
4757

4858
async streamRuns(
4959
url: URL | string,
5060
environment: RealtimeEnvironment,
51-
params: RealtimeRunsParams
61+
params: RealtimeRunsParams,
62+
clientVersion?: string
5263
) {
5364
const whereClauses: string[] = [`"runtimeEnvironmentId"='${environment.id}'`];
5465

@@ -58,16 +69,21 @@ export class RealtimeClient {
5869

5970
const whereClause = whereClauses.join(" AND ");
6071

61-
return this.#streamRunsWhere(url, environment, whereClause);
72+
return this.#streamRunsWhere(url, environment, whereClause, clientVersion);
6273
}
6374

64-
async #streamRunsWhere(url: URL | string, environment: RealtimeEnvironment, whereClause: string) {
65-
const electricUrl = this.#constructElectricUrl(url, whereClause);
75+
async #streamRunsWhere(
76+
url: URL | string,
77+
environment: RealtimeEnvironment,
78+
whereClause: string,
79+
clientVersion?: string
80+
) {
81+
const electricUrl = this.#constructElectricUrl(url, whereClause, clientVersion);
6682

67-
return this.#performElectricRequest(electricUrl, environment);
83+
return this.#performElectricRequest(electricUrl, environment, clientVersion);
6884
}
6985

70-
#constructElectricUrl(url: URL | string, whereClause: string): URL {
86+
#constructElectricUrl(url: URL | string, whereClause: string, clientVersion?: string): URL {
7187
const $url = new URL(url.toString());
7288

7389
const electricUrl = new URL(`${this.options.electricOrigin}/v1/shape`);
@@ -77,36 +93,42 @@ export class RealtimeClient {
7793
electricUrl.searchParams.set(key, value);
7894
});
7995

80-
// const electricParams = ["shape_id", "live", "offset", "columns", "cursor"];
81-
82-
// electricParams.forEach((param) => {
83-
// if ($url.searchParams.has(param) && $url.searchParams.get(param)) {
84-
// electricUrl.searchParams.set(param, $url.searchParams.get(param)!);
85-
// }
86-
// });
87-
8896
electricUrl.searchParams.set("where", whereClause);
8997
electricUrl.searchParams.set("table", 'public."TaskRun"');
9098

99+
if (!clientVersion) {
100+
// If the client version is not provided, that means we're using an older client
101+
// This means the client will be sending shape_id instead of handle
102+
electricUrl.searchParams.set("handle", electricUrl.searchParams.get("shape_id") ?? "");
103+
}
104+
91105
return electricUrl;
92106
}
93107

94-
async #performElectricRequest(url: URL, environment: RealtimeEnvironment) {
108+
async #performElectricRequest(
109+
url: URL,
110+
environment: RealtimeEnvironment,
111+
clientVersion?: string
112+
) {
95113
const shapeId = extractShapeId(url);
96114

97115
logger.debug("[realtimeClient] request", {
98116
url: url.toString(),
99117
});
100118

119+
const rewriteResponseHeaders: Record<string, string> = clientVersion
120+
? {}
121+
: { "electric-handle": "electric-shape-id", "electric-offset": "electric-chunk-last-offset" };
122+
101123
if (!shapeId) {
102124
// If the shapeId is not present, we're just getting the initial value
103-
return longPollingFetch(url.toString());
125+
return longPollingFetch(url.toString(), {}, rewriteResponseHeaders);
104126
}
105127

106128
const isLive = isLiveRequestUrl(url);
107129

108130
if (!isLive) {
109-
return longPollingFetch(url.toString());
131+
return longPollingFetch(url.toString(), {}, rewriteResponseHeaders);
110132
}
111133

112134
const requestId = randomUUID();
@@ -148,7 +170,7 @@ export class RealtimeClient {
148170

149171
try {
150172
// ... (rest of your existing code for the long polling request)
151-
const response = await longPollingFetch(url.toString());
173+
const response = await longPollingFetch(url.toString(), {}, rewriteResponseHeaders);
152174

153175
// Decrement the counter after the long polling request is complete
154176
await this.#decrementConcurrency(environment.id, requestId);

apps/webapp/app/utils/longPollingFetch.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,44 @@
66
import { logger } from "~/services/logger.server";
77

88
// Similar-ish problem to https://github.com/wintercg/fetch/issues/23
9-
export async function longPollingFetch(url: string, options?: RequestInit) {
9+
export async function longPollingFetch(
10+
url: string,
11+
options?: RequestInit,
12+
rewriteResponseHeaders?: Record<string, string>
13+
) {
1014
try {
1115
let response = await fetch(url, options);
1216

1317
if (response.headers.get("content-encoding")) {
1418
const headers = new Headers(response.headers);
1519
headers.delete("content-encoding");
1620
headers.delete("content-length");
21+
22+
response = new Response(response.body, {
23+
headers,
24+
status: response.status,
25+
statusText: response.statusText,
26+
});
27+
}
28+
29+
if (rewriteResponseHeaders) {
30+
const headers = new Headers(response.headers);
31+
32+
for (const [fromKey, toKey] of Object.entries(rewriteResponseHeaders)) {
33+
const value = headers.get(fromKey);
34+
if (value) {
35+
headers.set(toKey, value);
36+
headers.delete(fromKey);
37+
}
38+
}
39+
1740
response = new Response(response.body, {
1841
headers,
1942
status: response.status,
2043
statusText: response.statusText,
2144
});
2245
}
46+
2347
return response;
2448
} catch (error) {
2549
if (error instanceof TypeError) {

0 commit comments

Comments
 (0)