Skip to content

Commit 7bc2ded

Browse files
committed
Added action stats, audited all actions and improved reasons
1 parent 754a014 commit 7bc2ded

File tree

1 file changed

+47
-25
lines changed

1 file changed

+47
-25
lines changed

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ import {
5858
isFinalAttemptStatus,
5959
isFinalRunStatus,
6060
} from "../taskStatus";
61-
import { SEMINTATTRS_FORCE_RECORDING, tracer } from "../tracer.server";
61+
import { tracer } from "../tracer.server";
6262
import { getMaxDuration } from "../utils/maxDuration";
6363
import { MessagePayload } from "./types";
6464

@@ -120,25 +120,26 @@ export type SharedQueueConsumerOptions = {
120120
interval?: number;
121121
};
122122

123+
type HandleMessageAction = "ack_and_do_more_work" | "nack" | "nack_and_do_more_work" | "noop";
124+
123125
type DoWorkInternalResult =
124126
| {
125127
reason: string;
126128
attrs?: Record<string, string | number | boolean | undefined>;
127129
error?: Error | string;
128130
interval?: number;
131+
action?: HandleMessageAction;
129132
}
130133
| undefined;
131134

132-
type HandleMessageResult =
133-
| {
134-
action: "ack_and_do_more_work" | "nack" | "nack_and_do_more_work" | "noop";
135-
interval?: number;
136-
retryInMs?: number;
137-
reason?: string;
138-
attrs?: Record<string, string | number | boolean | undefined>;
139-
error?: Error | string;
140-
}
141-
| undefined;
135+
type HandleMessageResult = {
136+
action: HandleMessageAction;
137+
interval?: number;
138+
retryInMs?: number;
139+
reason?: string;
140+
attrs?: Record<string, string | number | boolean | undefined>;
141+
error?: Error | string;
142+
};
142143

143144
export class SharedQueueConsumer {
144145
private _backgroundWorkers: Map<string, BackgroundWorkerWithTasks> = new Map();
@@ -149,6 +150,7 @@ export class SharedQueueConsumer {
149150
private _traceStartedAt: Date | undefined;
150151
private _currentSpanContext: Context | undefined;
151152
private _reasonStats: Record<string, number> = {};
153+
private _actionStats: Record<string, number> = {};
152154
private _currentSpan: Span | undefined;
153155
private _endSpanInNextIteration = false;
154156
private _tasks = sharedQueueTasks;
@@ -243,14 +245,19 @@ export class SharedQueueConsumer {
243245
this._perTraceCountdown = this._options.maximumItemsPerTrace;
244246
this._traceStartedAt = new Date();
245247
this._reasonStats = {};
248+
this._actionStats = {};
246249

247250
this.#doWork().finally(() => {});
248251
}
249252

250253
#endCurrentSpan() {
251254
if (this._currentSpan) {
252255
for (const [reason, count] of Object.entries(this._reasonStats)) {
253-
this._currentSpan.setAttribute(`reasons.${reason}`, count);
256+
this._currentSpan.setAttribute(`reasons_${reason}`, count);
257+
}
258+
259+
for (const [action, count] of Object.entries(this._actionStats)) {
260+
this._currentSpan.setAttribute(`actions_${action}`, count);
254261
}
255262

256263
this._currentSpan.end();
@@ -310,6 +317,7 @@ export class SharedQueueConsumer {
310317
this._perTraceCountdown = this._options.maximumItemsPerTrace;
311318
this._traceStartedAt = new Date();
312319
this._reasonStats = {};
320+
this._actionStats = {};
313321
this._iterationsCount = 0;
314322
this._runningDurationInMs = 0;
315323
this._endSpanInNextIteration = false;
@@ -333,6 +341,10 @@ export class SharedQueueConsumer {
333341
if (result) {
334342
this._reasonStats[result.reason] = (this._reasonStats[result.reason] ?? 0) + 1;
335343

344+
if (result.action) {
345+
this._actionStats[result.action] = (this._actionStats[result.action] ?? 0) + 1;
346+
}
347+
336348
span.setAttribute("reason", result.reason);
337349

338350
if (result.attrs) {
@@ -357,6 +369,7 @@ export class SharedQueueConsumer {
357369
span.setAttribute("reason", "no_result");
358370

359371
this._reasonStats["no_result"] = (this._reasonStats["no_result"] ?? 0) + 1;
372+
this._actionStats["no_result"] = (this._actionStats["no_result"] ?? 0) + 1;
360373
}
361374
} catch (error) {
362375
if (error instanceof Error) {
@@ -436,20 +449,14 @@ export class SharedQueueConsumer {
436449

437450
const messageResult = await this.#handleMessage(message, messageBody.data);
438451

439-
if (!messageResult) {
440-
return {
441-
reason: "no_message_result",
442-
attrs: hydrateAttributes({}),
443-
};
444-
}
445-
446452
switch (messageResult.action) {
447453
case "noop": {
448454
return {
449455
reason: messageResult.reason ?? "none_specified",
450456
attrs: hydrateAttributes(messageResult.attrs ?? {}),
451457
error: messageResult.error,
452458
interval: messageResult.interval,
459+
action: "noop",
453460
};
454461
}
455462
case "ack_and_do_more_work": {
@@ -460,6 +467,7 @@ export class SharedQueueConsumer {
460467
attrs: hydrateAttributes(messageResult.attrs ?? {}),
461468
error: messageResult.error,
462469
interval: messageResult.interval,
470+
action: "ack_and_do_more_work",
463471
};
464472
}
465473
case "nack_and_do_more_work": {
@@ -470,6 +478,7 @@ export class SharedQueueConsumer {
470478
attrs: hydrateAttributes(messageResult.attrs ?? {}),
471479
error: messageResult.error,
472480
interval: messageResult.interval,
481+
action: "nack_and_do_more_work",
473482
};
474483
}
475484
case "nack": {
@@ -479,6 +488,7 @@ export class SharedQueueConsumer {
479488
reason: messageResult.reason ?? "none_specified",
480489
attrs: hydrateAttributes(messageResult.attrs ?? {}),
481490
error: messageResult.error,
491+
action: "nack",
482492
};
483493
}
484494
}
@@ -609,7 +619,6 @@ export class SharedQueueConsumer {
609619
action: "ack_and_do_more_work",
610620
reason: "missing_image_reference",
611621
attrs: {
612-
run_id: existingTaskRun.id,
613622
deployment_id: deployment.id,
614623
},
615624
};
@@ -710,7 +719,7 @@ export class SharedQueueConsumer {
710719
});
711720

712721
return {
713-
action: "nack_and_do_more_work",
722+
action: "ack_and_do_more_work",
714723
reason: "failed_to_lock_task_run",
715724
attrs: {
716725
run_id: existingTaskRun.id,
@@ -791,11 +800,18 @@ export class SharedQueueConsumer {
791800
attrs: {
792801
run_status: lockedTaskRun.status,
793802
is_retry: isRetry,
803+
checkpoint_event_id: data.checkpointEventId,
794804
},
795805
};
796806
}
797807

798-
return;
808+
return {
809+
action: "noop",
810+
reason: "restored_checkpoint",
811+
attrs: {
812+
checkpoint_event_id: data.checkpointEventId,
813+
},
814+
};
799815
}
800816

801817
if (!worker.supportsLazyAttempts) {
@@ -806,7 +822,7 @@ export class SharedQueueConsumer {
806822
setToExecuting: false,
807823
});
808824
} catch (error) {
809-
logger.error("Failed to create task run attempt for outdate worker", {
825+
logger.error("Failed to create task run attempt for outdated worker", {
810826
error,
811827
taskRun: lockedTaskRun.id,
812828
});
@@ -818,7 +834,7 @@ export class SharedQueueConsumer {
818834

819835
return {
820836
action: "ack_and_do_more_work",
821-
reason: "failed_to_create_attempt",
837+
reason: "failed_to_create_attempt_for_outdated_worker",
822838
attrs: {
823839
message_id: message.messageId,
824840
run_id: lockedTaskRun.id,
@@ -931,6 +947,9 @@ export class SharedQueueConsumer {
931947
return {
932948
action: "noop",
933949
reason: "restored_checkpoint",
950+
attrs: {
951+
checkpoint_event_id: data.checkpointEventId,
952+
},
934953
};
935954
} catch (e) {
936955
return {
@@ -992,7 +1011,7 @@ export class SharedQueueConsumer {
9921011

9931012
return {
9941013
action: "ack_and_do_more_work",
995-
reason: "attempt_not_found",
1014+
reason: "resumable_attempt_not_found",
9961015
attrs: {
9971016
attempt_id: data.resumableAttemptId,
9981017
},
@@ -1215,6 +1234,9 @@ export class SharedQueueConsumer {
12151234
return {
12161235
action: "noop",
12171236
reason: "restored_checkpoint",
1237+
attrs: {
1238+
checkpoint_event_id: data.checkpointEventId,
1239+
},
12181240
};
12191241
} else {
12201242
logger.debug(

0 commit comments

Comments
 (0)