Skip to content

Commit 937d0f7

Browse files
committed
Automatically retry TriggerTaskService when hitting a unique constraint error on idempotency key
1 parent de31220 commit 937d0f7

File tree

7 files changed

+80
-41
lines changed

7 files changed

+80
-41
lines changed

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ const { action, loader } = createActionApiRoute(
7474

7575
const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL);
7676

77-
const run = await service.call(params.taskId, authentication.environment, body, {
77+
const result = await service.call(params.taskId, authentication.environment, body, {
7878
idempotencyKey: idempotencyKey ?? undefined,
7979
idempotencyKeyExpiresAt: idempotencyKeyExpiresAt,
8080
triggerVersion: triggerVersion ?? undefined,
@@ -83,19 +83,20 @@ const { action, loader } = createActionApiRoute(
8383
oneTimeUseToken,
8484
});
8585

86-
if (!run) {
86+
if (!result) {
8787
return json({ error: "Task not found" }, { status: 404 });
8888
}
8989

9090
const $responseHeaders = await responseHeaders(
91-
run,
91+
result.run,
9292
authentication.environment,
9393
triggerClient
9494
);
9595

9696
return json(
9797
{
98-
id: run.friendlyId,
98+
id: result.run.friendlyId,
99+
isCached: result.isCached,
99100
},
100101
{
101102
headers: $responseHeaders,

apps/webapp/app/v3/services/batchTriggerTask.server.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ export class BatchTriggerTaskService extends BaseService {
104104

105105
for (const item of body.items) {
106106
try {
107-
const run = await triggerTaskService.call(
107+
const result = await triggerTaskService.call(
108108
taskId,
109109
environment,
110110
{
@@ -123,16 +123,16 @@ export class BatchTriggerTaskService extends BaseService {
123123
}
124124
);
125125

126-
if (run) {
126+
if (result) {
127127
await this._prisma.batchTaskRunItem.create({
128128
data: {
129129
batchTaskRunId: batch.id,
130-
taskRunId: run.id,
131-
status: batchTaskRunItemStatusForRunStatus(run.status),
130+
taskRunId: result.run.id,
131+
status: batchTaskRunItemStatusForRunStatus(result.run.status),
132132
},
133133
});
134134

135-
runs.push(run.friendlyId);
135+
runs.push(result.run.friendlyId);
136136
}
137137

138138
index++;

apps/webapp/app/v3/services/batchTriggerV2.server.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,10 @@ export class BatchTriggerV2Service extends BaseService {
7272
environment,
7373
async (span) => {
7474
const existingBatch = options.idempotencyKey
75-
? await this._prisma.batchTaskRun.findUnique({
75+
? await this._prisma.batchTaskRun.findFirst({
7676
where: {
77-
runtimeEnvironmentId_idempotencyKey: {
78-
runtimeEnvironmentId: environment.id,
79-
idempotencyKey: options.idempotencyKey,
80-
},
77+
runtimeEnvironmentId: environment.id,
78+
idempotencyKey: options.idempotencyKey,
8179
},
8280
})
8381
: undefined;
@@ -769,7 +767,7 @@ export class BatchTriggerV2Service extends BaseService {
769767

770768
const triggerTaskService = new TriggerTaskService();
771769

772-
const run = await triggerTaskService.call(
770+
const result = await triggerTaskService.call(
773771
task.item.task,
774772
environment,
775773
{
@@ -790,15 +788,15 @@ export class BatchTriggerV2Service extends BaseService {
790788
}
791789
);
792790

793-
if (!run) {
791+
if (!result) {
794792
throw new Error(`Failed to trigger run ${task.runId} for batch ${batch.friendlyId}`);
795793
}
796794

797795
await this._prisma.batchTaskRunItem.create({
798796
data: {
799797
batchTaskRunId: batch.id,
800-
taskRunId: run.id,
801-
status: batchTaskRunItemStatusForRunStatus(run.status),
798+
taskRunId: result.run.id,
799+
status: batchTaskRunItemStatusForRunStatus(result.run.status),
802800
},
803801
});
804802
}

apps/webapp/app/v3/services/replayTaskRun.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ export class ReplayTaskRunService extends BaseService {
8787
});
8888

8989
const triggerTaskService = new TriggerTaskService();
90-
return await triggerTaskService.call(
90+
const result = await triggerTaskService.call(
9191
existingTaskRun.taskIdentifier,
9292
authenticatedEnvironment,
9393
{
@@ -113,6 +113,8 @@ export class ReplayTaskRunService extends BaseService {
113113
},
114114
}
115115
);
116+
117+
return result?.run;
116118
} catch (error) {
117119
if (error instanceof OutOfEntitlementError) {
118120
return;

apps/webapp/app/v3/services/testTask.server.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,19 @@ export class TestTaskService extends BaseService {
1515

1616
switch (data.triggerSource) {
1717
case "STANDARD":
18-
return await triggerTaskService.call(data.taskIdentifier, authenticatedEnvironment, {
19-
payload: data.payload,
20-
options: {
21-
test: true,
22-
metadata: data.metadata,
23-
},
24-
});
18+
const result = await triggerTaskService.call(
19+
data.taskIdentifier,
20+
authenticatedEnvironment,
21+
{
22+
payload: data.payload,
23+
options: {
24+
test: true,
25+
metadata: data.metadata,
26+
},
27+
}
28+
);
29+
30+
return result?.run;
2531
case "SCHEDULED": {
2632
const payload = {
2733
scheduleId: "sched_1234",
@@ -34,7 +40,7 @@ export class TestTaskService extends BaseService {
3440
};
3541
const payloadPacket = await stringifyIO(payload);
3642

37-
return await triggerTaskService.call(
43+
const result = await triggerTaskService.call(
3844
data.taskIdentifier,
3945
authenticatedEnvironment,
4046
{
@@ -43,6 +49,8 @@ export class TestTaskService extends BaseService {
4349
},
4450
{ customIcon: "scheduled" }
4551
);
52+
53+
return result?.run;
4654
}
4755
default:
4856
throw new Error("Invalid trigger source");

apps/webapp/app/v3/services/triggerScheduledTask.server.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,14 @@ export class TriggerScheduledTaskService extends BaseService {
131131
payloadPacket,
132132
});
133133

134-
const run = await triggerTask.call(
134+
const result = await triggerTask.call(
135135
instance.taskSchedule.taskIdentifier,
136136
instance.environment,
137137
{ payload: payloadPacket.data, options: { payloadType: payloadPacket.dataType } },
138138
{ customIcon: "scheduled" }
139139
);
140140

141-
if (!run) {
141+
if (!result) {
142142
logger.error("Failed to trigger task", {
143143
instanceId,
144144
scheduleId: instance.taskSchedule.friendlyId,
@@ -147,7 +147,7 @@ export class TriggerScheduledTaskService extends BaseService {
147147
} else {
148148
await this._prisma.taskRun.update({
149149
where: {
150-
id: run.id,
150+
id: result.run.id,
151151
},
152152
data: {
153153
scheduleId: instance.taskSchedule.id,

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
2626
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
2727
import { clampMaxDuration } from "../utils/maxDuration";
2828
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
29-
import { Prisma } from "@trigger.dev/database";
29+
import { Prisma, TaskRun } from "@trigger.dev/database";
3030
import { sanitizeQueueName } from "~/models/taskQueue.server";
3131

3232
export type TriggerTaskServiceOptions = {
@@ -49,15 +49,30 @@ export class OutOfEntitlementError extends Error {
4949
}
5050
}
5151

52+
export type TriggerTaskServiceResult = {
53+
run: TaskRun;
54+
isCached: boolean;
55+
};
56+
57+
const MAX_ATTEMPTS = 2;
58+
5259
export class TriggerTaskService extends BaseService {
5360
public async call(
5461
taskId: string,
5562
environment: AuthenticatedEnvironment,
5663
body: TriggerTaskRequestBody,
57-
options: TriggerTaskServiceOptions = {}
58-
) {
64+
options: TriggerTaskServiceOptions = {},
65+
attempt: number = 0
66+
): Promise<TriggerTaskServiceResult | undefined> {
5967
return await this.traceWithEnv("call()", environment, async (span) => {
6068
span.setAttribute("taskId", taskId);
69+
span.setAttribute("attempt", attempt);
70+
71+
if (attempt > MAX_ATTEMPTS) {
72+
throw new ServiceValidationError(
73+
`Failed to trigger ${taskId} after ${MAX_ATTEMPTS} attempts.`
74+
);
75+
}
6176

6277
// TODO: Add idempotency key expiring here
6378
const idempotencyKey = options.idempotencyKey ?? body.options?.idempotencyKey;
@@ -74,13 +89,11 @@ export class TriggerTaskService extends BaseService {
7489
: body.options?.ttl ?? (environment.type === "DEVELOPMENT" ? "10m" : undefined);
7590

7691
const existingRun = idempotencyKey
77-
? await this._prisma.taskRun.findUnique({
92+
? await this._prisma.taskRun.findFirst({
7893
where: {
79-
runtimeEnvironmentId_taskIdentifier_idempotencyKey: {
80-
runtimeEnvironmentId: environment.id,
81-
idempotencyKey,
82-
taskIdentifier: taskId,
83-
},
94+
runtimeEnvironmentId: environment.id,
95+
idempotencyKey,
96+
taskIdentifier: taskId,
8497
},
8598
})
8699
: undefined;
@@ -103,7 +116,7 @@ export class TriggerTaskService extends BaseService {
103116
} else {
104117
span.setAttribute("runId", existingRun.friendlyId);
105118

106-
return existingRun;
119+
return { run: existingRun, isCached: true };
107120
}
108121
}
109122

@@ -572,7 +585,7 @@ export class TriggerTaskService extends BaseService {
572585
);
573586
}
574587

575-
return run;
588+
return { run, isCached: false };
576589
}
577590
);
578591
} catch (error) {
@@ -608,6 +621,23 @@ export class TriggerTaskService extends BaseService {
608621
throw new Error(
609622
`Failed to trigger ${taskId} as the queue could not be created do to a unique constraint error, please try again.`
610623
);
624+
} else if (
625+
Array.isArray(target) &&
626+
target.length == 3 &&
627+
typeof target[0] === "string" &&
628+
typeof target[1] === "string" &&
629+
typeof target[2] === "string" &&
630+
target[0] == "runtimeEnvironmentId" &&
631+
target[1] == "taskIdentifier" &&
632+
target[2] == "idempotencyKey"
633+
) {
634+
logger.debug("TriggerTask: Idempotency key violation, retrying...", {
635+
taskId,
636+
environmentId: environment.id,
637+
idempotencyKey,
638+
});
639+
// We need to retry the task run creation as the idempotency key has been used
640+
return await this.call(taskId, environment, body, options, attempt + 1);
611641
} else {
612642
throw new ServiceValidationError(
613643
`Cannot trigger ${taskId} as it has already been triggered with the same idempotency key.`

0 commit comments

Comments
 (0)