Skip to content

Allow creating and monitoring run replication services with different settings #2055

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 14, 2025

Conversation

ericallam
Copy link
Member

@ericallam ericallam commented May 14, 2025

Summary by CodeRabbit

  • New Features

    • Introduced admin API endpoints to create, start, stop, and teardown a runs replication service, as well as to start and stop a TCP buffer monitor.
    • Added periodic TCP buffer monitoring with configurable intervals and metrics logging.
    • Added global management for runs replication and TCP monitor services.
  • Enhancements

    • Added support for request compression and configurable maximum open connections in ClickHouse client configuration.
    • Improved logging configuration for ClickHouse client.
    • Added new environment variable to control maximum open connections for replication.
  • Tests

    • Enabled request compression in tests and reduced test duration for long-running replication scenarios.

Copy link

changeset-bot bot commented May 14, 2025

⚠️ No Changeset found

Latest commit: 05794e3

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

Copy link
Contributor

coderabbitai bot commented May 14, 2025

"""

Walkthrough

This change introduces new API endpoints and global singleton management for a runs replication service and TCP buffer monitor in a web application. It adds environment and configuration options for ClickHouse clients, including connection limits and compression. Logging enhancements and new test adjustments are also included.

Changes

Files/Group Change Summary
apps/webapp/app/env.server.ts Added RUN_REPLICATION_MAX_OPEN_CONNECTIONS environment variable to the schema with integer coercion and default 10.
apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts New API endpoint for creating a runs replication service with validated parameters and global singleton management.
apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts
...stop-monitor.ts
New API endpoints to start and stop a global TCP buffer monitor, including validation, authentication, and global singleton management.
apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts
...stop.ts
...teardown.ts
Updated to use global singleton accessor functions for runs replication service start, stop, and teardown actions.
apps/webapp/app/services/monitorTcpBuffers.server.ts New module to periodically sample and log Linux TCP buffer statistics, exporting a function to start the monitor.
apps/webapp/app/services/runsReplicationGlobal.server.ts New module for managing global singleton instances of runs replication service and TCP monitor, with getter, setter, and unregister functions.
apps/webapp/app/services/runsReplicationInstance.server.ts Switched logging from logger to console, and added ClickHouse client options for compression and max open connections from environment variable.
apps/webapp/test/runsReplicationService.test.ts Enabled request compression for ClickHouse client in tests and reduced the duration of a long-running test from 4 minutes to 1 minute.
internal-packages/clickhouse/src/client.ts Added maxOpenConnections and compression options to config and client instantiation; mapped log levels for ClickHouse client; enhanced logging configuration.
internal-packages/clickhouse/src/index.ts Extended ClickhouseCommonConfig with compression and maxOpenConnections; passed these options to client instances in the ClickHouse class.
internal-packages/replication/src/client.ts Removed logging of the lock property from leader lock heartbeat debug and error logs.

Sequence Diagram(s)

sequenceDiagram
    participant AdminUser
    participant API
    participant Auth
    participant GlobalStore
    participant RunsReplicationService
    participant ClickHouse
    participant Redis

    AdminUser->>API: POST /admin.api.v1.runs-replication.create
    API->>Auth: Validate personal access token
    Auth-->>API: User info (isAdmin)
    API->>GlobalStore: Check if service exists
    alt Service does not exist
        API->>API: Parse & validate payload
        API->>RunsReplicationService: Create instance (with ClickHouse & Redis configs)
        API->>GlobalStore: Store service singleton
        API->>RunsReplicationService: Start service
        RunsReplicationService->>ClickHouse: Connect
        RunsReplicationService->>Redis: Connect
        API-->>AdminUser: Success response
    else Service exists
        API-->>AdminUser: 400 error (already running)
    end
Loading
sequenceDiagram
    participant AdminUser
    participant API
    participant Auth
    participant GlobalStore
    participant Monitor

    AdminUser->>API: POST /admin.api.v1.runs-replication.start-monitor
    API->>Auth: Validate personal access token
    Auth-->>API: User info (isAdmin)
    API->>GlobalStore: Check if monitor exists
    alt Monitor does not exist
        API->>Monitor: Start TCP buffer monitor (interval)
        API->>GlobalStore: Store monitor singleton
        API-->>AdminUser: Success response
    else Monitor exists
        API-->>AdminUser: 400 error (already running)
    end

    AdminUser->>API: POST /admin.api.v1.runs-replication.stop-monitor
    API->>Auth: Validate personal access token
    Auth-->>API: User info (isAdmin)
    API->>GlobalStore: Get monitor singleton
    alt Monitor exists
        API->>Monitor: Stop monitor (clear interval)
        API->>GlobalStore: Unregister monitor
        API-->>AdminUser: Success response
    else Monitor does not exist
        API-->>AdminUser: 400 error (not running)
    end
Loading

Possibly related PRs

  • triggerdotdev/trigger.dev#2035: Introduced a comprehensive set of RUN_REPLICATION_ environment variables for configuring the runs replication system, to which this PR adds an incremental extension.

Suggested reviewers

  • matt-aitken

Poem

🐇
In the meadow of code, new services bloom,
Replication and monitors, dispelling the gloom.
With globals and configs, connections are spun,
Compression and limits—oh what fun!
TCP buffers watched, ClickHouse now compressed,
The rabbits rejoice, for the code is well-dressed!

"""

Tip

⚡️ Free AI Code Reviews for VS Code, Cursor, Windsurf
  • CodeRabbit now supports VS Code, Cursor and Windsurf. This brings free AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback. Learn more

[!ANNOUNCEMENT]

⚡️ Faster reviews with caching
  • CodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure Review - Disable Cache at either the organization or repository level. If you prefer to disable all data retention across your organization, simply turn off the Data Retention setting under your Organization Settings.

Enjoy the performance boost—your workflow just got faster.


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2154fe3 and 05794e3.

📒 Files selected for processing (1)
  • apps/webapp/test/runsReplicationService.test.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • apps/webapp/test/runsReplicationService.test.ts
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: Analyze (javascript-typescript)
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🧹 Nitpick comments (5)
apps/webapp/app/services/runsReplicationInstance.server.ts (1)

19-19: Replaced logger with console for logging.

The structured logger has been replaced with direct console.log/console.error calls. While this might be simpler, it loses the benefits of structured logging like consistent formatting, log levels, and easier integration with log management systems.

Consider keeping the structured logger for better observability, especially in production:

-console.log("🗃️ Runs replication service not enabled");
+logger.info("🗃️ Runs replication service not enabled");

-console.log("🗃️ Runs replication service enabled");
+logger.info("🗃️ Runs replication service enabled");

-console.log("🗃️ Runs replication service started");
+logger.info("🗃️ Runs replication service started");

-console.error("🗃️ Runs replication service failed to start", {
+logger.error("🗃️ Runs replication service failed to start", {

Also applies to: 23-24, 71-71, 74-76

apps/webapp/app/services/monitorTcpBuffers.server.ts (2)

50-53: Consider enhancing error handling with structured logging

Current error handling logs to console.error, which might not integrate well with the application's logging system. Consider using the logger for error messages as well.

- console.error("tcp-buffer-monitor error", err);
+ logger.error("tcp-buffer-monitor error", { error: err });

56-57: Consider adding a type annotation for the return value

Adding a return type would improve code clarity and type safety.

-export function startTcpBufferMonitor(intervalMs = 5_000) {
+export function startTcpBufferMonitor(intervalMs = 5_000): NodeJS.Timeout {
apps/webapp/app/services/runsReplicationGlobal.server.ts (1)

14-36: Well-structured global state management with typed accessors

The implementation provides clear getter/setter/unregister functions with proper type safety.

However, consider adding null checks or default values when getting global instances to prevent potential runtime errors.

export function getRunsReplicationGlobal(): RunsReplicationService | undefined {
  return _global[GLOBAL_RUNS_REPLICATION_KEY];
}

export function getTcpMonitorGlobal(): NodeJS.Timeout | undefined {
  return _global[GLOBAL_TCP_MONITOR_KEY];
}
apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts (1)

80-92: Guard against missing mandatory environment variables

env.RUN_REPLICATION_CLICKHOUSE_URL (and others like DATABASE_URL) are assumed to be present but not validated. A missing URL will produce a confusing runtime error deep inside the ClickHouse client.

Consider asserting upfront:

if (!env.RUN_REPLICATION_CLICKHOUSE_URL) {
  throw new Error("RUN_REPLICATION_CLICKHOUSE_URL is not set");
}

You can place these assertions in env.server.ts for centralised validation.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a8e38d9 and 2154fe3.

📒 Files selected for processing (14)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts (1 hunks)
  • apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts (1 hunks)
  • apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts (2 hunks)
  • apps/webapp/app/routes/admin.api.v1.runs-replication.stop-monitor.ts (1 hunks)
  • apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts (2 hunks)
  • apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts (2 hunks)
  • apps/webapp/app/services/monitorTcpBuffers.server.ts (1 hunks)
  • apps/webapp/app/services/runsReplicationGlobal.server.ts (1 hunks)
  • apps/webapp/app/services/runsReplicationInstance.server.ts (3 hunks)
  • apps/webapp/test/runsReplicationService.test.ts (2 hunks)
  • internal-packages/clickhouse/src/client/client.ts (4 hunks)
  • internal-packages/clickhouse/src/index.ts (4 hunks)
  • internal-packages/replication/src/client.ts (0 hunks)
💤 Files with no reviewable changes (1)
  • internal-packages/replication/src/client.ts
🧰 Additional context used
🧬 Code Graph Analysis (4)
apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts (2)
apps/webapp/app/services/runsReplicationGlobal.server.ts (2)
  • getRunsReplicationGlobal (14-16)
  • unregisterRunsReplicationGlobal (22-24)
apps/webapp/app/services/runsReplicationInstance.server.ts (1)
  • runsReplicationInstance (9-12)
apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts (2)
apps/webapp/app/services/runsReplicationGlobal.server.ts (1)
  • getRunsReplicationGlobal (14-16)
apps/webapp/app/services/runsReplicationInstance.server.ts (1)
  • runsReplicationInstance (9-12)
apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts (2)
apps/webapp/app/services/runsReplicationGlobal.server.ts (1)
  • getRunsReplicationGlobal (14-16)
apps/webapp/app/services/runsReplicationInstance.server.ts (1)
  • runsReplicationInstance (9-12)
apps/webapp/app/services/runsReplicationGlobal.server.ts (1)
apps/webapp/app/services/runsReplicationService.server.ts (1)
  • RunsReplicationService (62-652)
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (25)
apps/webapp/app/env.server.ts (1)

774-774: Added configuration for ClickHouse client connection pooling.

This new environment variable RUN_REPLICATION_MAX_OPEN_CONNECTIONS properly configures the maximum number of open connections for the ClickHouse client in the runs replication service, with a sensible default of 10.

apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts (2)

4-4: Added import for the global singleton management.

Appropriate import for the new global runs replication service accessor.


30-36: Enhanced service management with global singleton prioritization.

The implementation now properly prioritizes the global runs replication service instance while maintaining backward compatibility with the existing singleton pattern. This approach ensures a smooth transition to the new global management system.

apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts (2)

4-4: Added import for the global singleton management.

Appropriate import for the new global runs replication service accessor.


30-36: Enhanced service management with global singleton prioritization.

The implementation now properly prioritizes the global runs replication service instance while maintaining backward compatibility with the existing singleton pattern. This approach ensures a smooth transition to the new global management system.

apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts (2)

4-7: Added imports for global singleton management.

The code properly imports both the getter and the unregister function for the global runs replication service.


33-40: Implemented proper teardown with global singleton cleanup.

The implementation correctly:

  1. Prioritizes the global runs replication service instance
  2. Properly calls teardown on the service
  3. Unregisters the global service to prevent memory leaks and stale references
  4. Falls back to the existing singleton when necessary

This ensures complete cleanup of resources and prevents potential memory issues.

apps/webapp/test/runsReplicationService.test.ts (2)

21-23: Add compression to improve ClickHouse client performance.

This change adds request compression to the ClickHouse client, which is a good optimization for reducing network traffic, especially when dealing with large datasets in the replication service.


611-612: Test duration reduced from 4 minutes to 1 minute.

Reducing the long-running test duration from 4 minutes to 1 minute is a good optimization for the test suite's runtime while still verifying the service's ability to handle processing transactions over an extended period.

apps/webapp/app/services/runsReplicationInstance.server.ts (1)

33-36: Enhanced ClickHouse client configuration.

Adding compression and connection pool management to the ClickHouse client are excellent optimizations:

  1. Request compression will reduce network traffic and potentially improve throughput
  2. Configurable connection pooling via environment variables allows for tuning based on workload and infrastructure
apps/webapp/app/routes/admin.api.v1.runs-replication.stop-monitor.ts (1)

1-47: New endpoint to stop the TCP buffer monitor.

This endpoint follows good practices:

  • Proper authentication and authorization checks
  • Appropriate error handling
  • Clean resource management with clearInterval
  • Clear success/error responses
apps/webapp/app/routes/admin.api.v1.runs-replication.start-monitor.ts (2)

8-10: Well-defined input validation schema.

Using Zod for schema validation with appropriate min/max constraints is a good practice. The 1-60 second range for the monitoring interval is reasonable.


34-58: Robust implementation of the start-monitor action.

The implementation shows good practices:

  • Try/catch with proper error handling
  • Input validation using the schema
  • Checking if a monitor is already running before starting a new one
  • Clear error messages for different scenarios
internal-packages/clickhouse/src/index.ts (3)

21-25: Feature enhancement: Added configuration options for ClickHouse client

The new configuration options for compression and connection limits are well-defined as optional properties in the ClickhouseCommonConfig type.


67-68: Properly forwarding new configuration options to client

New configuration options are correctly passed to the ClickhouseClient when using a single URL configuration.


83-84: Consistent implementation across reader/writer clients

Configuration options are consistently applied to both reader and writer clients when using separate endpoints.

Also applies to: 94-95

apps/webapp/app/services/monitorTcpBuffers.server.ts (3)

6-10: Well-documented purpose for the TCP buffer monitoring function

The comments clearly explain the function's purpose and how the logs can be utilized.


14-18: Efficiently reading system files with Promise.all

Good use of concurrent file reading to optimize the sampling process.


26-39:

❓ Verification inconclusive

Verify system compatibility for Linux-specific implementation

This code reads from Linux-specific file paths and assumes a 4KB page size, which might not work in all environments.


🏁 Script executed:

#!/bin/bash
# Check if the system files exist and are readable
echo "Checking if the required system files exist:"
[ -f "/proc/net/sockstat" ] && echo "✓ /proc/net/sockstat exists" || echo "✗ /proc/net/sockstat does not exist"
[ -f "/proc/sys/net/core/wmem_max" ] && echo "✓ /proc/sys/net/core/wmem_max exists" || echo "✗ /proc/sys/net/core/wmem_max does not exist"
[ -f "/proc/sys/net/ipv4/tcp_mem" ] && echo "✓ /proc/sys/net/ipv4/tcp_mem exists" || echo "✗ /proc/sys/net/ipv4/tcp_mem does not exist"

# Check page size (should be 4KB on most systems)
echo -e "\nVerifying page size:"
getconf PAGE_SIZE

Length of output: 664


Verify Linux-specific dependencies and page size assumptions

This service reads from Linux-only files and hardcodes a 4 KB page size, which may not exist or hold true on all target systems. Please confirm on your deployment environments that:

  • /proc/net/sockstat is present and readable
  • /proc/sys/net/core/wmem_max is present and readable
  • /proc/sys/net/ipv4/tcp_mem is present and readable
  • The system page size is 4096 bytes (or adjust calculations accordingly)

If any of these aren’t guaranteed, consider adding feature detection or configurable defaults.

apps/webapp/app/services/runsReplicationGlobal.server.ts (2)

3-4: Good use of symbols for global state management

Using Symbol.for creates unique global registry keys, reducing the risk of naming collisions.


11-12: Cross-environment compatibility with globalThis fallback

Good practice to handle both browser and Node.js environments by falling back to global if globalThis is unavailable.

internal-packages/clickhouse/src/client/client.ts (4)

4-4: Extended ClickhouseConfig with new configuration options

The addition of maxOpenConnections and compression options aligns with the changes in index.ts.

Also applies to: 34-39


56-57: Properly configuring underlying ClickHouse client

New options are correctly passed to the createClient function as compression and max_open_connections parameters.


63-65: Added logging configuration with proper log level mapping

Good implementation of log level configuration that integrates with the existing logging system.


303-320: Well-structured log level conversion utility

The helper function properly maps between application log levels and ClickHouse-specific log levels with appropriate fallbacks.

Comment on lines +74 to +76
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Avoid leaking internal error messages to API consumers

Returning raw error.message may expose stack traces, SQL, or infrastructure details. Log the full error server-side and send a generic message to the client.

-  } catch (error) {
-    return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
+  } catch (error) {
+    console.error("Failed to create runs replication service", error);
+    return json({ error: "Failed to create runs replication service" }, { status: 400 });
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
} catch (error) {
console.error("Failed to create runs replication service", error);
return json({ error: "Failed to create runs replication service" }, { status: 400 });
}

Comment on lines +65 to +69
const service = createRunReplicationService(params);

setRunsReplicationGlobal(service);

await service.start();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Move global registration after a successful start() to avoid stale singletons

If service.start() throws, setRunsReplicationGlobal(service) has already stored a half-initialised object. Subsequent calls will see “service already exists” even though nothing is running.

-    const service = createRunReplicationService(params);
-
-    setRunsReplicationGlobal(service);
-
-    await service.start();
+    const service = createRunReplicationService(params);
+
+    await service.start();          // ensure we’re fully up
+
+    setRunsReplicationGlobal(service);

Optionally wrap the start() in its own try/catch and call service.stop() on failure to guarantee cleanup.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const service = createRunReplicationService(params);
setRunsReplicationGlobal(service);
await service.start();
const service = createRunReplicationService(params);
await service.start(); // ensure we’re fully up
setRunsReplicationGlobal(service);

Comment on lines +100 to +108
redisOptions: {
keyPrefix: "runs-replication:",
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
enableAutoPipelining: true,
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Convert RUN_REPLICATION_REDIS_PORT to a number to satisfy redis client typings

Environment variables are always strings; passing a string where a number is expected can cause connection failures in some redis client versions.

-      port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
+      port:
+        env.RUN_REPLICATION_REDIS_PORT !== undefined
+          ? Number(env.RUN_REPLICATION_REDIS_PORT)
+          : undefined,

(Apply the same conversion wherever a numeric env var is forwarded.)

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
redisOptions: {
keyPrefix: "runs-replication:",
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
enableAutoPipelining: true,
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
redisOptions: {
keyPrefix: "runs-replication:",
port:
env.RUN_REPLICATION_REDIS_PORT !== undefined
? Number(env.RUN_REPLICATION_REDIS_PORT)
: undefined,
host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
enableAutoPipelining: true,
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},

Comment on lines +13 to +27
const CreateRunReplicationServiceParams = z.object({
name: z.string(),
keepAliveEnabled: z.boolean(),
keepAliveIdleSocketTtl: z.number(),
maxOpenConnections: z.number(),
maxFlushConcurrency: z.number(),
flushIntervalMs: z.number(),
flushBatchSize: z.number(),
leaderLockTimeoutMs: z.number(),
leaderLockExtendIntervalMs: z.number(),
leaderLockAcquireAdditionalTimeMs: z.number(),
leaderLockRetryIntervalMs: z.number(),
ackIntervalSeconds: z.number(),
waitForAsyncInsert: z.boolean(),
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Tighten parameter validation to reject nonsensical or dangerous input

Most numeric parameters (intervals, counts, batch sizes, TTL, etc.) must be positive integers.
z.number() currently accepts NaN, fractions, and negative values (as well as numbers far larger than what the service can realistically handle).

 const CreateRunReplicationServiceParams = z.object({
   name: z.string(),
-  keepAliveEnabled: z.boolean(),
-  keepAliveIdleSocketTtl: z.number(),
-  maxOpenConnections: z.number(),
-  maxFlushConcurrency: z.number(),
-  flushIntervalMs: z.number(),
-  flushBatchSize: z.number(),
-  leaderLockTimeoutMs: z.number(),
-  leaderLockExtendIntervalMs: z.number(),
-  leaderLockAcquireAdditionalTimeMs: z.number(),
-  leaderLockRetryIntervalMs: z.number(),
-  ackIntervalSeconds: z.number(),
+  keepAliveEnabled: z.boolean(),
+  keepAliveIdleSocketTtl: z.number().int().positive(),
+  maxOpenConnections: z.number().int().positive(),
+  maxFlushConcurrency: z.number().int().positive(),
+  flushIntervalMs: z.number().int().positive(),
+  flushBatchSize: z.number().int().positive(),
+  leaderLockTimeoutMs: z.number().int().positive(),
+  leaderLockExtendIntervalMs: z.number().int().positive(),
+  leaderLockAcquireAdditionalTimeMs: z.number().int().nonnegative(),
+  leaderLockRetryIntervalMs: z.number().int().positive(),
+  ackIntervalSeconds: z.number().int().positive(),
   waitForAsyncInsert: z.boolean(),
 });

Rejecting invalid values early prevents runaway timers, mis-configured pools, and DoS vectors.


🏁 Script executed:

#!/bin/bash
set -e
echo "Searching for CreateRunReplicationServiceParams definition and usage:"
rg -n "CreateRunReplicationServiceParams" apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts || true

echo
echo "Showing contents of the route file:"
sed -n '1,200p' apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts || true

echo
echo "Checking for existing z.number().int() usage across the repo:"
rg -n "z\.number\(\)\.int" -A3 -B1 || true

Length of output: 12923


Enforce strict integer and positivity constraints on replication parameters

All numeric fields in CreateRunReplicationServiceParams currently use z.number(), which permits negatives, fractions, and NaN. To prevent mis-configurations, resource exhaustion, and DoS scenarios, these should be constrained to integers—and where applicable, strictly positive or non-negative.

In apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts, update the schema as follows:

 const CreateRunReplicationServiceParams = z.object({
   name: z.string(),
   keepAliveEnabled: z.boolean(),
-  keepAliveIdleSocketTtl: z.number(),
+  keepAliveIdleSocketTtl: z.number().int().nonnegative(),        // ms; ≥ 0
-  maxOpenConnections: z.number(),
+  maxOpenConnections: z.number().int().positive(),             // ≥ 1
-  maxFlushConcurrency: z.number(),
+  maxFlushConcurrency: z.number().int().positive(),            // ≥ 1
-  flushIntervalMs: z.number(),
+  flushIntervalMs: z.number().int().positive(),                // ms; ≥ 1
-  flushBatchSize: z.number(),
+  flushBatchSize: z.number().int().positive(),                 // ≥ 1
-  leaderLockTimeoutMs: z.number(),
+  leaderLockTimeoutMs: z.number().int().positive(),            // ms; ≥ 1
-  leaderLockExtendIntervalMs: z.number(),
+  leaderLockExtendIntervalMs: z.number().int().positive(),     // ms; ≥ 1
-  leaderLockAcquireAdditionalTimeMs: z.number(),
+  leaderLockAcquireAdditionalTimeMs: z.number().int().nonnegative(), // ms; ≥ 0
-  leaderLockRetryIntervalMs: z.number(),
+  leaderLockRetryIntervalMs: z.number().int().positive(),       // ms; ≥ 1
-  ackIntervalSeconds: z.number(),
+  ackIntervalSeconds: z.number().int().positive(),             // s; ≥ 1
   waitForAsyncInsert: z.boolean(),
 });

This aligns with existing .int() usage elsewhere (e.g., batchTrigger and ClickHouse schemas) and ensures invalid values are rejected at parse time.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const CreateRunReplicationServiceParams = z.object({
name: z.string(),
keepAliveEnabled: z.boolean(),
keepAliveIdleSocketTtl: z.number(),
maxOpenConnections: z.number(),
maxFlushConcurrency: z.number(),
flushIntervalMs: z.number(),
flushBatchSize: z.number(),
leaderLockTimeoutMs: z.number(),
leaderLockExtendIntervalMs: z.number(),
leaderLockAcquireAdditionalTimeMs: z.number(),
leaderLockRetryIntervalMs: z.number(),
ackIntervalSeconds: z.number(),
waitForAsyncInsert: z.boolean(),
});
const CreateRunReplicationServiceParams = z.object({
name: z.string(),
keepAliveEnabled: z.boolean(),
keepAliveIdleSocketTtl: z.number().int().nonnegative(), // ms; ≥ 0
maxOpenConnections: z.number().int().positive(), // ≥ 1
maxFlushConcurrency: z.number().int().positive(), // ≥ 1
flushIntervalMs: z.number().int().positive(), // ms; ≥ 1
flushBatchSize: z.number().int().positive(), // ≥ 1
leaderLockTimeoutMs: z.number().int().positive(), // ms; ≥ 1
leaderLockExtendIntervalMs: z.number().int().positive(), // ms; ≥ 1
leaderLockAcquireAdditionalTimeMs: z.number().int().nonnegative(), // ms; ≥ 0
leaderLockRetryIntervalMs: z.number().int().positive(), // ms; ≥ 1
ackIntervalSeconds: z.number().int().positive(), // s; ≥ 1
waitForAsyncInsert: z.boolean(),
});

@ericallam ericallam merged commit d1e4064 into main May 14, 2025
12 checks passed
@ericallam ericallam deleted the runs-replication-experiments branch May 14, 2025 19:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants