Skip to content

Commit 8418498

Browse files
committed
Various coderabbit fixes
1 parent 686f47d commit 8418498

File tree

7 files changed

+23
-105
lines changed

7 files changed

+23
-105
lines changed

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ const { action, loader } = createActionApiRoute(
9797
} else if (error instanceof OutOfEntitlementError) {
9898
return json({ error: error.message }, { status: 422 });
9999
} else if (error instanceof Error) {
100-
return json({ error: error.message }, { status: 400 });
100+
return json({ error: error.message }, { status: 500 });
101101
}
102102

103103
return json({ error: "Something went wrong" }, { status: 500 });

apps/webapp/app/routes/realtime.v1.streams.test.ts

Lines changed: 0 additions & 43 deletions
This file was deleted.

apps/webapp/app/services/realtimeStreams.server.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,18 @@ export class RealtimeStreams {
5959
} catch (error) {
6060
if (signal.aborted) break;
6161

62-
console.error("Error reading from Redis stream:", error);
62+
logger.error("[RealtimeStreams][streamResponse] Error reading from Redis stream:", {
63+
error,
64+
});
6365
retryCount++;
6466
if (retryCount >= maxRetries) throw error;
6567
await new Promise((resolve) => setTimeout(resolve, 1000 * retryCount));
6668
}
6769
}
6870
} catch (error) {
69-
console.error("Fatal error in stream processing:", error);
71+
logger.error("[RealtimeStreams][streamResponse] Fatal error in stream processing:", {
72+
error,
73+
});
7074
controller.error(error);
7175
} finally {
7276
await cleanup();
@@ -163,7 +167,8 @@ export class RealtimeStreams {
163167

164168
return new Response(null, { status: 200 });
165169
} catch (error) {
166-
console.error("Error in ingestData:", error);
170+
logger.error("[RealtimeStreams][ingestData] Error in ingestData:", { error });
171+
167172
return new Response(null, { status: 500 });
168173
} finally {
169174
await cleanup();

apps/webapp/server.ts

Lines changed: 8 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ import { RegistryProxy } from "~/v3/registryProxy.server";
1414

1515
const app = express();
1616

17-
// if (process.env.DISABLE_COMPRESSION !== "1") {
18-
// app.use(compression());
19-
// }
17+
if (process.env.DISABLE_COMPRESSION !== "1") {
18+
app.use(compression());
19+
}
2020

2121
// http://expressjs.com/en/advanced/best-practice-security.html#at-a-minimum-disable-x-powered-by-header
2222
app.disable("x-powered-by");
@@ -73,63 +73,15 @@ if (process.env.HTTP_SERVER_DISABLED !== "true") {
7373
next();
7474
});
7575

76-
app.post("/realtime/v1/streams/express/test", async (req, res) => {
77-
// Ensure the request is a readable stream
78-
const { method, headers } = req;
79-
console.log("Inside /realtime/v1/streams/express/test");
80-
81-
if (method !== "POST") {
82-
return res.status(405).send("Method Not Allowed");
83-
}
76+
app.use((req, res, next) => {
77+
// Generate a unique request ID for each request
78+
const requestId = nanoid();
8479

85-
// Set encoding to UTF-8 to read string data
86-
req.setEncoding("utf8");
87-
88-
let buffer = "";
89-
90-
try {
91-
req.on("data", (chunk) => {
92-
buffer += chunk;
93-
const lines = buffer.split("\n");
94-
buffer = lines.pop() || "";
95-
96-
for (const line of lines) {
97-
if (line.trim()) {
98-
const data = JSON.parse(line);
99-
console.log(`${new Date().toISOString()} Received data:`, data);
100-
// You can process each data chunk as needed
101-
}
102-
}
103-
});
104-
105-
req.on("end", () => {
106-
if (buffer) {
107-
const data = JSON.parse(buffer);
108-
console.log(`${new Date().toISOString()} Received data at end:`, data);
109-
// You can process the remaining data as needed
110-
}
111-
res.status(200).send(); // Send a success response
112-
});
113-
114-
req.on("error", (error) => {
115-
console.error("Error processing stream:", error);
116-
res.status(500).send("Internal Server Error");
117-
});
118-
} catch (error) {
119-
console.error("Error processing stream:", error);
120-
res.status(500).send("Internal Server Error");
121-
}
80+
runWithHttpContext({ requestId, path: req.url, host: req.hostname, method: req.method }, next);
12281
});
12382

124-
// app.use((req, res, next) => {
125-
// // Generate a unique request ID for each request
126-
// const requestId = nanoid();
127-
128-
// runWithHttpContext({ requestId, path: req.url, host: req.hostname, method: req.method }, next);
129-
// });
130-
13183
if (process.env.DASHBOARD_AND_API_DISABLED !== "true") {
132-
// app.use(apiRateLimiter);
84+
app.use(apiRateLimiter);
13385

13486
app.all(
13587
"*",

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,10 @@ export class SSEStreamSubscriptionFactory implements StreamSubscriptionFactory {
147147
) {}
148148

149149
createSubscription(runId: string, streamKey: string, baseUrl?: string): StreamSubscription {
150+
if (!runId || !streamKey) {
151+
throw new Error("runId and streamKey are required");
152+
}
153+
150154
const url = `${baseUrl ?? this.baseUrl}/realtime/v1/streams/${runId}/${streamKey}`;
151155
return new SSEStreamSubscription(url, this.options);
152156
}

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ export class StandardMetadataManager implements RunMetadataManager {
260260
}
261261

262262
// Waits for all the streams to finish
263-
public async waitForAllStreams(timeout: number = 30_000): Promise<void> {
263+
public async waitForAllStreams(timeout: number = 60_000): Promise<void> {
264264
if (this.activeStreams.size === 0) {
265265
return;
266266
}

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ export class TaskExecutor {
503503
return this._tracer.startActiveSpan(
504504
"waitUntil",
505505
async (span) => {
506-
return await waitUntil.blockUntilSettled(30_000);
506+
return await waitUntil.blockUntilSettled(60_000);
507507
},
508508
{
509509
attributes: {

0 commit comments

Comments
 (0)