Skip to content

Commit ad4334c

Browse files
committed
Rename batch stuff to v3 so it's not confusing
1 parent e3e4f2c commit ad4334c

File tree

8 files changed

+86
-16
lines changed

8 files changed

+86
-16
lines changed

apps/webapp/app/presenters/v3/BatchListPresenter.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ export class BatchListPresenter extends BasePresenter {
100100
status: BatchTaskRunStatus;
101101
createdAt: Date;
102102
updatedAt: Date;
103+
completedAt: Date | null;
103104
runCount: BigInt;
104105
batchVersion: string;
105106
}[]
@@ -111,6 +112,7 @@ export class BatchListPresenter extends BasePresenter {
111112
b.status,
112113
b."createdAt",
113114
b."updatedAt",
115+
b."completedAt",
114116
b."runCount",
115117
b."batchVersion"
116118
FROM
@@ -196,7 +198,11 @@ WHERE
196198
createdAt: batch.createdAt.toISOString(),
197199
updatedAt: batch.updatedAt.toISOString(),
198200
hasFinished,
199-
finishedAt: hasFinished ? batch.updatedAt.toISOString() : undefined,
201+
finishedAt: batch.completedAt
202+
? batch.completedAt.toISOString()
203+
: hasFinished
204+
? batch.updatedAt.toISOString()
205+
: undefined,
200206
status: batch.status,
201207
environment: displayableEnvironment(environment, userId),
202208
runCount: Number(batch.runCount),

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
1212
import { ServiceValidationError } from "~/v3/services/baseService.server";
1313
import {
1414
BatchProcessingStrategy,
15-
BatchTriggerV2Service,
16-
} from "~/v3/services/batchTriggerV2.server";
15+
BatchTriggerV3Service,
16+
} from "~/v3/services/batchTriggerV3.server";
1717
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
1818
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
1919

@@ -85,7 +85,7 @@ const { action, loader } = createActionApiRoute(
8585
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
8686
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);
8787

88-
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);
88+
const service = new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);
8989

9090
try {
9191
const batch = await service.call(authentication.environment, body, {

apps/webapp/app/services/worker.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import {
5555
CancelDevSessionRunsServiceOptions,
5656
} from "~/v3/services/cancelDevSessionRuns.server";
5757
import { logger } from "./logger.server";
58-
import { BatchProcessingOptions, BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
58+
import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
5959

6060
const workerCatalog = {
6161
indexEndpoint: z.object({
@@ -733,7 +733,7 @@ function getWorkerQueue() {
733733
priority: 0,
734734
maxAttempts: 5,
735735
handler: async (payload, job) => {
736-
const service = new BatchTriggerV2Service(payload.strategy);
736+
const service = new BatchTriggerV3Service(payload.strategy);
737737

738738
await service.processBatchTaskRun(payload);
739739
},

apps/webapp/app/v3/services/batchTriggerV2.server.ts renamed to apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,34 @@ type RunItemData = {
5757
taskIdentifier: string;
5858
};
5959

60-
export class BatchTriggerV2Service extends BaseService {
60+
/**
61+
* ### V3
62+
*
63+
* BatchTrigger v3 doesn't have any changes from v2, other than a different system for tracking if the
64+
* batch is completed.
65+
*
66+
* v3 BatchTaskRun's now must be "sealed" before they could be considered completed. Being "sealed" means
67+
* that all the items in the batch have been processed and the batch is ready to be considered completed.
68+
*
69+
* We also now track the expected count of items in the batch, and then as each BatchTaskRunItem is set to COMPLETED,
70+
* we increment the BatchTaskRun's completed count. Once the completed count is equal to the expected count, and the
71+
* batch is sealed, we can consider the batch completed.
72+
*
73+
* So now when the v3 batch is considered completed, we will enqueue the ResumeBatchRunService to resume the dependent
74+
* task attempt if there is one. This is in contrast to v2 batches where every time a task was completed, we would schedule
75+
* the ResumeBatchRunService to check if the batch was completed and set it to completed if it was.
76+
*
77+
* We've also introduced a new column "resumedAt" that will be set when the batch is resumed. Previously in v2 batches, the status == "COMPLETED" was overloaded
78+
* to mean that the batch was completed and resumed. Now we have a separate column to track when the batch was resumed (and to make sure it's only resumed once).
79+
*
80+
* ### V2
81+
*
82+
* Batch v2 added the ability to trigger more than 100 tasks in a single batch. This was done by offloading the payload to the object store and
83+
* then processing the batch in chunks of 50 tasks at a time in the background.
84+
*
85+
* The other main difference from v1 is that a single batch in v2 could trigger multiple different tasks, whereas in v1 a batch could only trigger a single task.
86+
*/
87+
export class BatchTriggerV3Service extends BaseService {
6188
private _batchProcessingStrategy: BatchProcessingStrategy;
6289

6390
constructor(

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
import { FlushedRunMetadata, sanitizeError, TaskRunError } from "@trigger.dev/core/v3";
22
import { type Prisma, type TaskRun } from "@trigger.dev/database";
3+
import { findQueueInEnvironment } from "~/models/taskQueue.server";
4+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
35
import { logger } from "~/services/logger.server";
6+
import { updateMetadataService } from "~/services/metadata/updateMetadata.server";
47
import { marqs } from "~/v3/marqs/index.server";
58
import { generateFriendlyId } from "../friendlyIdentifiers";
9+
import { socketIo } from "../handleSocketIo.server";
610
import {
711
FINAL_ATTEMPT_STATUSES,
812
isFailedRunStatus,
@@ -11,15 +15,10 @@ import {
1115
} from "../taskStatus";
1216
import { PerformTaskRunAlertsService } from "./alerts/performTaskRunAlerts.server";
1317
import { BaseService } from "./baseService.server";
14-
import { ResumeDependentParentsService } from "./resumeDependentParents.server";
18+
import { completeBatchTaskRunItemV3 } from "./batchTriggerV3.server";
1519
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
16-
import { socketIo } from "../handleSocketIo.server";
1720
import { ResumeBatchRunService } from "./resumeBatchRun.server";
18-
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
19-
import { updateMetadataService } from "~/services/metadata/updateMetadata.server";
20-
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
21-
import { $transaction } from "~/db.server";
22-
import { completeBatchTaskRunItemV3 } from "./batchTriggerV2.server";
21+
import { ResumeDependentParentsService } from "./resumeDependentParents.server";
2322

2423
type BaseInput = {
2524
id: string;

apps/webapp/app/v3/services/resumeDependentParents.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { BaseService } from "./baseService.server";
55
import { ResumeBatchRunService } from "./resumeBatchRun.server";
66
import { ResumeTaskDependencyService } from "./resumeTaskDependency.server";
77
import { $transaction } from "~/db.server";
8-
import { completeBatchTaskRunItemV3 } from "./batchTriggerV2.server";
8+
import { completeBatchTaskRunItemV3 } from "./batchTriggerV3.server";
99

1010
type Output =
1111
| {

apps/webapp/app/v3/services/resumeTaskRunDependencies.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { workerQueue } from "~/services/worker.server";
99
import { BaseService } from "./baseService.server";
1010
import { ResumeBatchRunService } from "./resumeBatchRun.server";
1111
import { ResumeTaskDependencyService } from "./resumeTaskDependency.server";
12-
import { completeBatchTaskRunItemV3 } from "./batchTriggerV2.server";
12+
import { completeBatchTaskRunItemV3 } from "./batchTriggerV3.server";
1313

1414
export class ResumeTaskRunDependenciesService extends BaseService {
1515
public async call(attemptId: string) {

references/v3-catalog/src/trigger/batch.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -792,3 +792,41 @@ export const batchAutoIdempotencyKeyChild = task({
792792
return payload;
793793
},
794794
});
795+
796+
export const batchTriggerIdempotencyKeyTest = task({
797+
id: "batch-trigger-idempotency-key-test",
798+
retry: {
799+
maxAttempts: 1,
800+
},
801+
run: async () => {
802+
// Trigger a batch of 2 items with the same idempotency key
803+
await batchV2TestChild.batchTrigger(
804+
[
805+
{ payload: { foo: "bar" }, options: { idempotencyKey: "test-idempotency-key" } },
806+
{ payload: { foo: "baz" }, options: { idempotencyKey: "test-idempotency-key" } },
807+
],
808+
{
809+
triggerSequentially: true,
810+
}
811+
);
812+
813+
// Now trigger a batch of 21 items, with just the last one having an idempotency key
814+
await batchV2TestChild.batchTrigger(
815+
Array.from({ length: 20 }, (_, i) => ({
816+
payload: { foo: `bar${i}` },
817+
options: {},
818+
})).concat([
819+
{
820+
payload: { foo: "baz" },
821+
options: { idempotencyKey: "test-idempotency-key-2" },
822+
},
823+
]),
824+
{
825+
triggerSequentially: true,
826+
}
827+
);
828+
829+
// Now while that batch is being processed in the background, lets trigger the same task with that idempotency key
830+
await batchV2TestChild.trigger({ foo: "baz" }, { idempotencyKey: "test-idempotency-key-2" });
831+
},
832+
});

0 commit comments

Comments
 (0)