Skip to content

Allow creating and monitoring run replication services with different settings #2055

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().default(9_000),
RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
122 changes: 122 additions & 0 deletions apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { z } from "zod";
import { ClickHouse } from "@internal/clickhouse";
import { env } from "~/env.server";
import { RunsReplicationService } from "~/services/runsReplicationService.server";
import {
getRunsReplicationGlobal,
setRunsReplicationGlobal,
} from "~/services/runsReplicationGlobal.server";

const CreateRunReplicationServiceParams = z.object({
name: z.string(),
keepAliveEnabled: z.boolean(),
keepAliveIdleSocketTtl: z.number(),
maxOpenConnections: z.number(),
maxFlushConcurrency: z.number(),
flushIntervalMs: z.number(),
flushBatchSize: z.number(),
leaderLockTimeoutMs: z.number(),
leaderLockExtendIntervalMs: z.number(),
leaderLockAcquireAdditionalTimeMs: z.number(),
leaderLockRetryIntervalMs: z.number(),
ackIntervalSeconds: z.number(),
waitForAsyncInsert: z.boolean(),
});
Comment on lines +13 to +27
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Tighten parameter validation to reject nonsensical or dangerous input

Most numeric parameters (intervals, counts, batch sizes, TTL, etc.) must be positive integers.
z.number() currently accepts NaN, fractions, and negative values (as well as numbers far larger than what the service can realistically handle).

 const CreateRunReplicationServiceParams = z.object({
   name: z.string(),
-  keepAliveEnabled: z.boolean(),
-  keepAliveIdleSocketTtl: z.number(),
-  maxOpenConnections: z.number(),
-  maxFlushConcurrency: z.number(),
-  flushIntervalMs: z.number(),
-  flushBatchSize: z.number(),
-  leaderLockTimeoutMs: z.number(),
-  leaderLockExtendIntervalMs: z.number(),
-  leaderLockAcquireAdditionalTimeMs: z.number(),
-  leaderLockRetryIntervalMs: z.number(),
-  ackIntervalSeconds: z.number(),
+  keepAliveEnabled: z.boolean(),
+  keepAliveIdleSocketTtl: z.number().int().positive(),
+  maxOpenConnections: z.number().int().positive(),
+  maxFlushConcurrency: z.number().int().positive(),
+  flushIntervalMs: z.number().int().positive(),
+  flushBatchSize: z.number().int().positive(),
+  leaderLockTimeoutMs: z.number().int().positive(),
+  leaderLockExtendIntervalMs: z.number().int().positive(),
+  leaderLockAcquireAdditionalTimeMs: z.number().int().nonnegative(),
+  leaderLockRetryIntervalMs: z.number().int().positive(),
+  ackIntervalSeconds: z.number().int().positive(),
   waitForAsyncInsert: z.boolean(),
 });

Rejecting invalid values early prevents runaway timers, mis-configured pools, and DoS vectors.


🏁 Script executed:

#!/bin/bash
set -e
echo "Searching for CreateRunReplicationServiceParams definition and usage:"
rg -n "CreateRunReplicationServiceParams" apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts || true

echo
echo "Showing contents of the route file:"
sed -n '1,200p' apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts || true

echo
echo "Checking for existing z.number().int() usage across the repo:"
rg -n "z\.number\(\)\.int" -A3 -B1 || true

Length of output: 12923


Enforce strict integer and positivity constraints on replication parameters

All numeric fields in CreateRunReplicationServiceParams currently use z.number(), which permits negatives, fractions, and NaN. To prevent mis-configurations, resource exhaustion, and DoS scenarios, these should be constrained to integers—and where applicable, strictly positive or non-negative.

In apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts, update the schema as follows:

 const CreateRunReplicationServiceParams = z.object({
   name: z.string(),
   keepAliveEnabled: z.boolean(),
-  keepAliveIdleSocketTtl: z.number(),
+  keepAliveIdleSocketTtl: z.number().int().nonnegative(),        // ms; ≥ 0
-  maxOpenConnections: z.number(),
+  maxOpenConnections: z.number().int().positive(),             // ≥ 1
-  maxFlushConcurrency: z.number(),
+  maxFlushConcurrency: z.number().int().positive(),            // ≥ 1
-  flushIntervalMs: z.number(),
+  flushIntervalMs: z.number().int().positive(),                // ms; ≥ 1
-  flushBatchSize: z.number(),
+  flushBatchSize: z.number().int().positive(),                 // ≥ 1
-  leaderLockTimeoutMs: z.number(),
+  leaderLockTimeoutMs: z.number().int().positive(),            // ms; ≥ 1
-  leaderLockExtendIntervalMs: z.number(),
+  leaderLockExtendIntervalMs: z.number().int().positive(),     // ms; ≥ 1
-  leaderLockAcquireAdditionalTimeMs: z.number(),
+  leaderLockAcquireAdditionalTimeMs: z.number().int().nonnegative(), // ms; ≥ 0
-  leaderLockRetryIntervalMs: z.number(),
+  leaderLockRetryIntervalMs: z.number().int().positive(),       // ms; ≥ 1
-  ackIntervalSeconds: z.number(),
+  ackIntervalSeconds: z.number().int().positive(),             // s; ≥ 1
   waitForAsyncInsert: z.boolean(),
 });

This aligns with existing .int() usage elsewhere (e.g., batchTrigger and ClickHouse schemas) and ensures invalid values are rejected at parse time.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const CreateRunReplicationServiceParams = z.object({
name: z.string(),
keepAliveEnabled: z.boolean(),
keepAliveIdleSocketTtl: z.number(),
maxOpenConnections: z.number(),
maxFlushConcurrency: z.number(),
flushIntervalMs: z.number(),
flushBatchSize: z.number(),
leaderLockTimeoutMs: z.number(),
leaderLockExtendIntervalMs: z.number(),
leaderLockAcquireAdditionalTimeMs: z.number(),
leaderLockRetryIntervalMs: z.number(),
ackIntervalSeconds: z.number(),
waitForAsyncInsert: z.boolean(),
});
const CreateRunReplicationServiceParams = z.object({
name: z.string(),
keepAliveEnabled: z.boolean(),
keepAliveIdleSocketTtl: z.number().int().nonnegative(), // ms; ≥ 0
maxOpenConnections: z.number().int().positive(), // ≥ 1
maxFlushConcurrency: z.number().int().positive(), // ≥ 1
flushIntervalMs: z.number().int().positive(), // ms; ≥ 1
flushBatchSize: z.number().int().positive(), // ≥ 1
leaderLockTimeoutMs: z.number().int().positive(), // ms; ≥ 1
leaderLockExtendIntervalMs: z.number().int().positive(), // ms; ≥ 1
leaderLockAcquireAdditionalTimeMs: z.number().int().nonnegative(), // ms; ≥ 0
leaderLockRetryIntervalMs: z.number().int().positive(), // ms; ≥ 1
ackIntervalSeconds: z.number().int().positive(), // s; ≥ 1
waitForAsyncInsert: z.boolean(),
});


type CreateRunReplicationServiceParams = z.infer<typeof CreateRunReplicationServiceParams>;

export async function action({ request }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

try {
const globalService = getRunsReplicationGlobal();

if (globalService) {
return json(
{ error: "Global runs replication service already exists. Stop it first." },
{ status: 400 }
);
}

const params = CreateRunReplicationServiceParams.parse(await request.json());

const service = createRunReplicationService(params);

setRunsReplicationGlobal(service);

await service.start();
Comment on lines +65 to +69
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Move global registration after a successful start() to avoid stale singletons

If service.start() throws, setRunsReplicationGlobal(service) has already stored a half-initialised object. Subsequent calls will see “service already exists” even though nothing is running.

-    const service = createRunReplicationService(params);
-
-    setRunsReplicationGlobal(service);
-
-    await service.start();
+    const service = createRunReplicationService(params);
+
+    await service.start();          // ensure we’re fully up
+
+    setRunsReplicationGlobal(service);

Optionally wrap the start() in its own try/catch and call service.stop() on failure to guarantee cleanup.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const service = createRunReplicationService(params);
setRunsReplicationGlobal(service);
await service.start();
const service = createRunReplicationService(params);
await service.start(); // ensure we’re fully up
setRunsReplicationGlobal(service);


return json({
success: true,
});
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
Comment on lines +74 to +76
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid leaking internal error messages to API consumers

Returning raw error.message may expose stack traces, SQL, or infrastructure details. Log the full error server-side and send a generic message to the client.

-  } catch (error) {
-    return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
+  } catch (error) {
+    console.error("Failed to create runs replication service", error);
+    return json({ error: "Failed to create runs replication service" }, { status: 400 });
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
} catch (error) {
console.error("Failed to create runs replication service", error);
return json({ error: "Failed to create runs replication service" }, { status: 400 });
}

}

function createRunReplicationService(params: CreateRunReplicationServiceParams) {
const clickhouse = new ClickHouse({
url: env.RUN_REPLICATION_CLICKHOUSE_URL,
name: params.name,
keepAlive: {
enabled: params.keepAliveEnabled,
idleSocketTtl: params.keepAliveIdleSocketTtl,
},
logLevel: "debug",
compression: {
request: true,
},
maxOpenConnections: params.maxOpenConnections,
});

const service = new RunsReplicationService({
clickhouse: clickhouse,
pgConnectionUrl: env.DATABASE_URL,
serviceName: params.name,
slotName: env.RUN_REPLICATION_SLOT_NAME,
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
redisOptions: {
keyPrefix: "runs-replication:",
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
enableAutoPipelining: true,
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
Comment on lines +100 to +108
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Convert RUN_REPLICATION_REDIS_PORT to a number to satisfy redis client typings

Environment variables are always strings; passing a string where a number is expected can cause connection failures in some redis client versions.

-      port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
+      port:
+        env.RUN_REPLICATION_REDIS_PORT !== undefined
+          ? Number(env.RUN_REPLICATION_REDIS_PORT)
+          : undefined,

(Apply the same conversion wherever a numeric env var is forwarded.)

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
redisOptions: {
keyPrefix: "runs-replication:",
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
enableAutoPipelining: true,
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
redisOptions: {
keyPrefix: "runs-replication:",
port:
env.RUN_REPLICATION_REDIS_PORT !== undefined
? Number(env.RUN_REPLICATION_REDIS_PORT)
: undefined,
host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
enableAutoPipelining: true,
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},

maxFlushConcurrency: params.maxFlushConcurrency,
flushIntervalMs: params.flushIntervalMs,
flushBatchSize: params.flushBatchSize,
leaderLockTimeoutMs: params.leaderLockTimeoutMs,
leaderLockExtendIntervalMs: params.leaderLockExtendIntervalMs,
leaderLockAcquireAdditionalTimeMs: params.leaderLockAcquireAdditionalTimeMs,
leaderLockRetryIntervalMs: params.leaderLockRetryIntervalMs,
ackIntervalSeconds: params.ackIntervalSeconds,
logLevel: "debug",
waitForAsyncInsert: params.waitForAsyncInsert,
});

return service;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { startTcpBufferMonitor } from "~/services/monitorTcpBuffers.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { getTcpMonitorGlobal, setTcpMonitorGlobal } from "~/services/runsReplicationGlobal.server";

const schema = z.object({
intervalMs: z.number().min(1000).max(60_000).default(5_000),
});

export async function action({ request }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

try {
const body = await request.json();
const { intervalMs } = schema.parse(body);

const globalMonitor = getTcpMonitorGlobal();

if (globalMonitor) {
return json(
{
error: "Tcp buffer monitor already running, you must stop it before starting a new one",
},
{
status: 400,
}
);
}

setTcpMonitorGlobal(startTcpBufferMonitor(intervalMs));

return json({
success: true,
});
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";

export async function action({ request }: ActionFunctionArgs) {
Expand All @@ -26,7 +27,13 @@ export async function action({ request }: ActionFunctionArgs) {
}

try {
await runsReplicationInstance?.start();
const globalService = getRunsReplicationGlobal();

if (globalService) {
await globalService.start();
} else {
await runsReplicationInstance?.start();
}

return json({
success: true,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import {
getTcpMonitorGlobal,
unregisterTcpMonitorGlobal,
} from "~/services/runsReplicationGlobal.server";

export async function action({ request }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

try {
const globalMonitor = getTcpMonitorGlobal();

if (!globalMonitor) {
return json({ error: "Tcp buffer monitor not running" }, { status: 400 });
}

clearInterval(globalMonitor);
unregisterTcpMonitorGlobal();

return json({
success: true,
});
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
}
9 changes: 8 additions & 1 deletion apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";

export async function action({ request }: ActionFunctionArgs) {
Expand All @@ -26,7 +27,13 @@ export async function action({ request }: ActionFunctionArgs) {
}

try {
await runsReplicationInstance?.stop();
const globalService = getRunsReplicationGlobal();

if (globalService) {
await globalService.stop();
} else {
await runsReplicationInstance?.stop();
}

return json({
success: true,
Expand Down
13 changes: 12 additions & 1 deletion apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import {
getRunsReplicationGlobal,
unregisterRunsReplicationGlobal,
} from "~/services/runsReplicationGlobal.server";
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";

export async function action({ request }: ActionFunctionArgs) {
Expand All @@ -26,7 +30,14 @@ export async function action({ request }: ActionFunctionArgs) {
}

try {
await runsReplicationInstance?.teardown();
const globalService = getRunsReplicationGlobal();

if (globalService) {
await globalService.teardown();
unregisterRunsReplicationGlobal();
} else {
await runsReplicationInstance?.teardown();
}

return json({
success: true,
Expand Down
57 changes: 57 additions & 0 deletions apps/webapp/app/services/monitorTcpBuffers.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// monitorTcpBuffers.ts
import fs from "fs/promises";
import os from "os";
import { logger } from "./logger.server";

/**
* Parse /proc/net/sockstat and /proc/sys/net/* every `intervalMs`
* and log the numbers. You can pivot these logs into CloudWatch
* metrics with a filter pattern if you like.
*/
export function startTcpBufferMonitor(intervalMs = 5_000) {
async function sampleOnce() {
try {
const [sockstat, wmemMax, tcpMem] = await Promise.all([
fs.readFile("/proc/net/sockstat", "utf8"),
fs.readFile("/proc/sys/net/core/wmem_max", "utf8"),
fs.readFile("/proc/sys/net/ipv4/tcp_mem", "utf8"),
]);

logger.debug("tcp-buffer-monitor", {
sockstat,
wmemMax,
tcpMem,
});

// /proc/net/sockstat has lines like:
// TCP: inuse 5 orphan 0 tw 0 alloc 6 mem 409
const tcpLine = sockstat.split("\n").find((l) => l.startsWith("TCP:")) ?? "";
const fields = tcpLine.trim().split(/\s+/);
const inUse = Number(fields[2]); // open sockets
const alloc = Number(fields[8]); // total sockets with buffers
const memPages = Number(fields[10]); // pages (4 kB each)
const memBytes = memPages * 4096;

const wmemMaxBytes = Number(wmemMax.trim());
const [low, pressure, high] = tcpMem
.trim()
.split(/\s+/)
.map((n) => Number(n) * 4096); // pages → bytes

logger.debug("tcp-buffer-monitor", {
t: Date.now(),
host: os.hostname(),
sockets_in_use: inUse,
sockets_alloc: alloc,
tcp_mem_bytes: memBytes,
tcp_mem_high: high,
wmem_max: wmemMaxBytes,
});
} catch (err) {
// Log and keep going; most errors are “file disappeared for a moment”
console.error("tcp-buffer-monitor error", err);
}
}

return setInterval(sampleOnce, intervalMs);
}
Loading