Skip to content

Commit 1d7c2ad

Browse files
committed
Fix new internal package typecheck issues and start adding telemetry to the replication service
1 parent ae14fa2 commit 1d7c2ad

File tree

6 files changed

+142
-71
lines changed

6 files changed

+142
-71
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import EventEmitter from "node:events";
1010
import pLimit from "p-limit";
1111
import { Counter, Gauge } from "prom-client";
1212
import type { MetricsRegister } from "~/metrics.server";
13+
import { Span, type Tracer, recordSpanError, trace } from "@internal/tracing";
1314

1415
export type RunsReplicationServiceOptions = {
1516
clickhouse: ClickHouse;
@@ -27,6 +28,7 @@ export type RunsReplicationServiceOptions = {
2728
leaderLockExtendIntervalMs?: number;
2829
ackIntervalSeconds?: number;
2930
logger?: Logger;
31+
tracer?: Tracer;
3032
};
3133

3234
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
@@ -54,12 +56,15 @@ export class RunsReplicationService {
5456
private _insertStrategy: "streaming" | "batching";
5557
private _isShuttingDown = false;
5658
private _isShutDownComplete = false;
59+
private _tracer: Tracer;
60+
private _currentSpan: Span | null = null;
5761

5862
public readonly events: EventEmitter<RunsReplicationServiceEvents>;
5963

6064
constructor(private readonly options: RunsReplicationServiceOptions) {
6165
this.logger = options.logger ?? new Logger("RunsReplicationService", "debug");
6266
this.events = new EventEmitter();
67+
this._tracer = options.tracer ?? trace.getTracer("runs-replication-service");
6368

6469
this._insertStrategy = options.insertStrategy ?? "streaming";
6570

@@ -206,6 +211,13 @@ export class RunsReplicationService {
206211
xid: message.xid,
207212
events: [],
208213
};
214+
215+
this._currentSpan = this._tracer.startSpan("handle_transaction", {
216+
attributes: {
217+
"transaction.xid": message.xid,
218+
},
219+
});
220+
209221
break;
210222
}
211223
case "insert": {
@@ -269,6 +281,8 @@ export class RunsReplicationService {
269281
// We need to immediately acknowledge the transaction
270282
// And then try and handle this transaction
271283
if (transaction.commitEndLsn) {
284+
this._currentSpan?.setAttribute("transaction.shutdown", true);
285+
272286
await this._replicationClient.acknowledge(transaction.commitEndLsn);
273287
alreadyAcknowledged = true;
274288
}
@@ -279,12 +293,23 @@ export class RunsReplicationService {
279293

280294
this._lastReplicationLagMs = transaction.replicationLagMs;
281295

296+
this._currentSpan?.setAttribute("transaction.replication_lag_ms", transaction.replicationLagMs);
297+
this._currentSpan?.setAttribute("transaction.xid", transaction.xid);
298+
299+
if (transaction.commitEndLsn) {
300+
this._currentSpan?.setAttribute("transaction.commit_end_lsn", transaction.commitEndLsn);
301+
}
302+
303+
this._currentSpan?.setAttribute("transaction.events", transaction.events.length);
304+
282305
// If there are no events, do nothing
283306
if (transaction.events.length === 0) {
284307
if (transaction.commitEndLsn && !alreadyAcknowledged) {
285308
await this._replicationClient.acknowledge(transaction.commitEndLsn);
286309
}
287310

311+
this._currentSpan?.end();
312+
288313
return;
289314
}
290315

@@ -293,6 +318,8 @@ export class RunsReplicationService {
293318
transaction,
294319
});
295320

321+
this._currentSpan?.end();
322+
296323
return;
297324
}
298325

@@ -301,12 +328,19 @@ export class RunsReplicationService {
301328
alreadyAcknowledged,
302329
});
303330

331+
const lsnToUInt64Start = process.hrtime.bigint();
332+
304333
// If there are events, we need to handle them
305334
const _version = lsnToUInt64(transaction.commitEndLsn);
306335

336+
this._currentSpan?.setAttribute(
337+
"transaction.lsn_to_uint64_ms",
338+
Number(process.hrtime.bigint() - lsnToUInt64Start) / 1_000_000
339+
);
340+
307341
this._transactionCounter?.inc();
308342

309-
if (this._insertStrategy === "streaming") {
343+
if (this._insertStrategy === "batching") {
310344
this._concurrentFlushScheduler
311345
.addToBatch(
312346
transaction.events.map((event) => ({
@@ -336,12 +370,23 @@ export class RunsReplicationService {
336370
this.logger.error("Error flushing batch", {
337371
error: flushError,
338372
});
373+
374+
if (this._currentSpan) {
375+
recordSpanError(this._currentSpan, flushError);
376+
}
339377
}
340378
}
341379

342380
if (!alreadyAcknowledged) {
381+
const acknowledgeStart = process.hrtime.bigint();
343382
await this._replicationClient.acknowledge(transaction.commitEndLsn);
383+
this._currentSpan?.setAttribute(
384+
"transaction.acknowledge_ms",
385+
Number(process.hrtime.bigint() - acknowledgeStart) / 1_000_000
386+
);
344387
}
388+
389+
this._currentSpan?.end();
345390
}
346391

347392
async #flushBatch(flushId: string, batch: Array<TaskRunInsert>) {

apps/webapp/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
"@trigger.dev/otlp-importer": "workspace:*",
106106
"@trigger.dev/platform": "1.0.14",
107107
"@trigger.dev/sdk": "workspace:*",
108+
"@internal/tracing": "workspace:*",
108109
"@types/pg": "8.6.6",
109110
"@uiw/react-codemirror": "^4.19.5",
110111
"@unkey/cache": "^1.5.0",

internal-packages/clickhouse/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
},
2121
"scripts": {
2222
"clean": "rimraf dist",
23-
"typecheck": "tsc --noEmit",
23+
"typecheck": "tsc --noEmit -p tsconfig.build.json",
2424
"build": "pnpm run clean && tsc -p tsconfig.build.json",
2525
"dev": "tsc --watch -p tsconfig.build.json",
2626
"db:migrate": "docker compose -p triggerdotdev-docker -f ../../docker/docker-compose.yml up clickhouse_migrator --build",

internal-packages/replication/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
},
2222
"scripts": {
2323
"clean": "rimraf dist",
24-
"typecheck": "tsc --noEmit",
24+
"typecheck": "tsc --noEmit -p tsconfig.build.json",
2525
"build": "pnpm run clean && tsc -p tsconfig.build.json",
2626
"dev": "tsc --watch -p tsconfig.build.json",
2727
"test": "vitest --sequence.concurrent=false --no-file-parallelism",

internal-packages/replication/src/client.ts

Lines changed: 90 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { createRedisClient } from "@internal/redis";
77
import { Logger } from "@trigger.dev/core/logger";
88
import { LogicalReplicationClientError } from "./errors.js";
99
import { PgoutputMessage, PgoutputParser, getPgoutputStartReplicationSQL } from "./pgoutput.js";
10+
import { startSpan, trace, Tracer } from "@internal/tracing";
1011

1112
export interface LogicalReplicationClientOptions {
1213
/**
@@ -70,6 +71,8 @@ export interface LogicalReplicationClientOptions {
7071
* The actions to publish to the publication.
7172
*/
7273
publicationActions?: Array<"insert" | "update" | "delete" | "truncate">;
74+
75+
tracer?: Tracer;
7376
}
7477

7578
export type LogicalReplicationClientEvents = {
@@ -101,6 +104,7 @@ export class LogicalReplicationClient {
101104
private lastAckTimestamp: number = 0;
102105
private ackIntervalTimer: NodeJS.Timeout | null = null;
103106
private _isStopped: boolean = false;
107+
private _tracer: Tracer;
104108

105109
public get lastLsn(): string {
106110
return this.lastAcknowledgedLsn ?? "0/00000000";
@@ -113,6 +117,7 @@ export class LogicalReplicationClient {
113117
constructor(options: LogicalReplicationClientOptions) {
114118
this.options = options;
115119
this.logger = options.logger ?? new Logger("LogicalReplicationClient", "info");
120+
this._tracer = options.tracer ?? trace.getTracer("logical-replication-client");
116121

117122
this.autoAcknowledge =
118123
typeof options.autoAcknowledge === "boolean" ? options.autoAcknowledge : true;
@@ -145,54 +150,62 @@ export class LogicalReplicationClient {
145150
}
146151

147152
public async stop(): Promise<this> {
148-
if (this._isStopped) return this;
149-
this._isStopped = true;
150-
// Clean up leader lock heartbeat
151-
if (this.leaderLockHeartbeatTimer) {
152-
clearInterval(this.leaderLockHeartbeatTimer);
153-
this.leaderLockHeartbeatTimer = null;
154-
}
155-
// Clean up ack interval
156-
if (this.ackIntervalTimer) {
157-
clearInterval(this.ackIntervalTimer);
158-
this.ackIntervalTimer = null;
159-
}
160-
// Release leader lock if held
161-
await this.#releaseLeaderLock();
153+
return await startSpan(this._tracer, "logical_replication_client.stop", async (span) => {
154+
if (this._isStopped) return this;
155+
156+
span.setAttribute("replication_client.name", this.options.name);
157+
span.setAttribute("replication_client.table", this.options.table);
158+
span.setAttribute("replication_client.slot_name", this.options.slotName);
159+
span.setAttribute("replication_client.publication_name", this.options.publicationName);
160+
161+
this._isStopped = true;
162+
// Clean up leader lock heartbeat
163+
if (this.leaderLockHeartbeatTimer) {
164+
clearInterval(this.leaderLockHeartbeatTimer);
165+
this.leaderLockHeartbeatTimer = null;
166+
}
167+
// Clean up ack interval
168+
if (this.ackIntervalTimer) {
169+
clearInterval(this.ackIntervalTimer);
170+
this.ackIntervalTimer = null;
171+
}
172+
// Release leader lock if held
173+
await this.#releaseLeaderLock();
162174

163-
this.connection?.removeAllListeners();
164-
this.connection = null;
175+
this.connection?.removeAllListeners();
176+
this.connection = null;
165177

166-
if (this.client) {
167-
this.client.removeAllListeners();
178+
if (this.client) {
179+
this.client.removeAllListeners();
168180

169-
const [endError] = await tryCatch(this.client.end());
181+
const [endError] = await tryCatch(this.client.end());
170182

171-
if (endError) {
172-
this.logger.error("Failed to end client", {
173-
name: this.options.name,
174-
error: endError,
175-
});
176-
} else {
177-
this.logger.info("Ended client", {
178-
name: this.options.name,
179-
});
183+
if (endError) {
184+
this.logger.error("Failed to end client", {
185+
name: this.options.name,
186+
error: endError,
187+
});
188+
} else {
189+
this.logger.info("Ended client", {
190+
name: this.options.name,
191+
});
192+
}
193+
this.client = null;
180194
}
181-
this.client = null;
182-
}
183195

184-
// clear any intervals
185-
if (this.leaderLockHeartbeatTimer) {
186-
clearInterval(this.leaderLockHeartbeatTimer);
187-
this.leaderLockHeartbeatTimer = null;
188-
}
196+
// clear any intervals
197+
if (this.leaderLockHeartbeatTimer) {
198+
clearInterval(this.leaderLockHeartbeatTimer);
199+
this.leaderLockHeartbeatTimer = null;
200+
}
189201

190-
if (this.ackIntervalTimer) {
191-
clearInterval(this.ackIntervalTimer);
192-
this.ackIntervalTimer = null;
193-
}
202+
if (this.ackIntervalTimer) {
203+
clearInterval(this.ackIntervalTimer);
204+
this.ackIntervalTimer = null;
205+
}
194206

195-
return this;
207+
return this;
208+
});
196209
}
197210

198211
public async teardown(): Promise<boolean> {
@@ -523,34 +536,43 @@ export class LogicalReplicationClient {
523536
public async acknowledge(lsn: string): Promise<boolean> {
524537
if (this._isStopped) return false;
525538
if (!this.connection) return false;
526-
// WAL LSN split
527-
const slice = lsn.split("/");
528-
let [upperWAL, lowerWAL]: [number, number] = [parseInt(slice[0], 16), parseInt(slice[1], 16)];
529-
// Timestamp as microseconds since midnight 2000-01-01
530-
const now = Date.now() - 946080000000;
531-
const upperTimestamp = Math.floor(now / 4294967.296);
532-
const lowerTimestamp = Math.floor(now - upperTimestamp * 4294967.296);
533-
if (lowerWAL === 4294967295) {
534-
upperWAL = upperWAL + 1;
535-
lowerWAL = 0;
536-
} else {
537-
lowerWAL = lowerWAL + 1;
538-
}
539-
const response = Buffer.alloc(34);
540-
response.fill(0x72); // 'r'
541-
response.writeUInt32BE(upperWAL, 1);
542-
response.writeUInt32BE(lowerWAL, 5);
543-
response.writeUInt32BE(upperWAL, 9);
544-
response.writeUInt32BE(lowerWAL, 13);
545-
response.writeUInt32BE(upperWAL, 17);
546-
response.writeUInt32BE(lowerWAL, 21);
547-
response.writeUInt32BE(upperTimestamp, 25);
548-
response.writeUInt32BE(lowerTimestamp, 29);
549-
response.writeInt8(0, 33);
550-
// @ts-ignore
551-
this.connection.sendCopyFromChunk(response);
552-
this.lastAckTimestamp = Date.now();
553-
return true;
539+
540+
return await startSpan(this._tracer, "logical_replication_client.acknowledge", async (span) => {
541+
span.setAttribute("replication_client.lsn", lsn);
542+
span.setAttribute("replication_client.name", this.options.name);
543+
span.setAttribute("replication_client.table", this.options.table);
544+
span.setAttribute("replication_client.slot_name", this.options.slotName);
545+
span.setAttribute("replication_client.publication_name", this.options.publicationName);
546+
547+
// WAL LSN split
548+
const slice = lsn.split("/");
549+
let [upperWAL, lowerWAL]: [number, number] = [parseInt(slice[0], 16), parseInt(slice[1], 16)];
550+
// Timestamp as microseconds since midnight 2000-01-01
551+
const now = Date.now() - 946080000000;
552+
const upperTimestamp = Math.floor(now / 4294967.296);
553+
const lowerTimestamp = Math.floor(now - upperTimestamp * 4294967.296);
554+
if (lowerWAL === 4294967295) {
555+
upperWAL = upperWAL + 1;
556+
lowerWAL = 0;
557+
} else {
558+
lowerWAL = lowerWAL + 1;
559+
}
560+
const response = Buffer.alloc(34);
561+
response.fill(0x72); // 'r'
562+
response.writeUInt32BE(upperWAL, 1);
563+
response.writeUInt32BE(lowerWAL, 5);
564+
response.writeUInt32BE(upperWAL, 9);
565+
response.writeUInt32BE(lowerWAL, 13);
566+
response.writeUInt32BE(upperWAL, 17);
567+
response.writeUInt32BE(lowerWAL, 21);
568+
response.writeUInt32BE(upperTimestamp, 25);
569+
response.writeUInt32BE(lowerTimestamp, 29);
570+
response.writeInt8(0, 33);
571+
// @ts-ignore
572+
this.connection.sendCopyFromChunk(response);
573+
this.lastAckTimestamp = Date.now();
574+
return true;
575+
});
554576
}
555577

556578
async #acquireLeaderLock(): Promise<boolean> {

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)