Skip to content

Commit 9882d66

Browse files
authored
v3: pre-pull deployments for faster startups from the first run (#1236)
* pre-pull deployed images on all workers
* automatically clean up pre-pull resources
* rename pre-pull image to deployment
* add changeset
1 parent e27d5cd commit 9882d66

File tree

6 files changed

+225
-6
lines changed

6 files changed

+225
-6
lines changed

.changeset/brown-boats-bathe.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/core-apps": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Pre-pull deployment images for faster startups

apps/kubernetes-provider/src/index.ts

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
TaskOperations,
55
TaskOperationsCreateOptions,
66
TaskOperationsIndexOptions,
7+
TaskOperationsPrePullDeploymentOptions,
78
TaskOperationsRestoreOptions,
89
} from "@trigger.dev/core-apps/provider";
910
import { SimpleLogger } from "@trigger.dev/core-apps/logger";
@@ -49,6 +50,7 @@ class KubernetesTaskOperations implements TaskOperations {
4950
#k8sApi: {
5051
core: k8s.CoreV1Api;
5152
batch: k8s.BatchV1Api;
53+
apps: k8s.AppsV1Api;
5254
};
5355

5456
constructor(namespace = "default") {
@@ -313,6 +315,72 @@ class KubernetesTaskOperations implements TaskOperations {
313315
await this.#getPod(opts.runId, this.#namespace);
314316
}
315317

318+
/**
 * Creates a DaemonSet in the provider namespace whose pods reference the
 * deployment image, so the image is fetched before the first run needs it.
 *
 * Each pod runs an init container that uses `opts.imageRef` and executes
 * `/usr/bin/true`, followed by a small `pause` container (presumably there
 * only to keep the pod alive once the image has been pulled).
 *
 * @param opts - Image reference, short code (used for the resource name) and
 *   the identifiers that are turned into labels.
 */
async prePullDeployment(opts: TaskOperationsPrePullDeploymentOptions) {
  const daemonSetName = this.#getPrePullContainerName(opts.shortCode);

  // The same label set is applied to the DaemonSet and to its pod template.
  const labels = {
    ...this.#getSharedLabels(opts),
    app: "task-prepull",
    "app.kubernetes.io/part-of": "trigger-worker",
    "app.kubernetes.io/component": "prepull",
    deployment: opts.deploymentId,
    name: daemonSetName,
  } satisfies k8s.V1ObjectMeta["labels"];

  // Init container: references the deployment image and exits immediately.
  const prePullContainer: k8s.V1Container = {
    name: "prepull",
    image: opts.imageRef,
    command: ["/usr/bin/true"],
    resources: {
      limits: {
        cpu: "0.25",
        memory: "100Mi",
        "ephemeral-storage": "1Gi",
      },
    },
  };

  // Main container: minimal footprint.
  const pauseContainer: k8s.V1Container = {
    name: "pause",
    image: "registry.k8s.io/pause:3.9",
    resources: {
      limits: {
        cpu: "1m",
        memory: "12Mi",
      },
    },
  };

  const daemonSet: k8s.V1DaemonSet = {
    metadata: {
      name: daemonSetName,
      namespace: this.#namespace.metadata.name,
      labels,
    },
    spec: {
      // Pods are matched to this DaemonSet through the `name` label only.
      selector: {
        matchLabels: {
          name: daemonSetName,
        },
      },
      template: {
        metadata: {
          labels,
        },
        spec: {
          ...this.#defaultPodSpec,
          restartPolicy: "Always",
          initContainers: [prePullContainer],
          containers: [pauseContainer],
        },
      },
    },
  };

  await this.#createDaemonSet(daemonSet, this.#namespace);
}
383+
316384
#envTypeToLabelValue(type: EnvironmentType) {
317385
switch (type) {
318386
case "PRODUCTION":
@@ -402,7 +470,11 @@ class KubernetesTaskOperations implements TaskOperations {
402470
}
403471

404472
#getSharedLabels(
405-
opts: TaskOperationsIndexOptions | TaskOperationsCreateOptions | TaskOperationsRestoreOptions
473+
opts:
474+
| TaskOperationsIndexOptions
475+
| TaskOperationsCreateOptions
476+
| TaskOperationsRestoreOptions
477+
| TaskOperationsPrePullDeploymentOptions
406478
): Record<string, string> {
407479
return {
408480
env: opts.envId,
@@ -446,6 +518,10 @@ class KubernetesTaskOperations implements TaskOperations {
446518
return `task-run-${suffix}`;
447519
}
448520

521+
/**
 * Builds the resource name used for a pre-pull DaemonSet.
 *
 * @param suffix - Value appended after the `task-prepull-` prefix.
 * @returns The prefixed name.
 */
#getPrePullContainerName(suffix: string) {
  const prefix = "task-prepull-";
  return prefix + suffix;
}
524+
449525
#createK8sApi() {
450526
const kubeConfig = new k8s.KubeConfig();
451527

@@ -460,6 +536,7 @@ class KubernetesTaskOperations implements TaskOperations {
460536
return {
461537
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
462538
batch: kubeConfig.makeApiClient(k8s.BatchV1Api),
539+
apps: kubeConfig.makeApiClient(k8s.AppsV1Api),
463540
};
464541
}
465542

@@ -503,6 +580,18 @@ class KubernetesTaskOperations implements TaskOperations {
503580
}
504581
}
505582

583+
/**
 * Submits a DaemonSet manifest to the apps API in the given namespace and
 * debug-logs the response body. Any thrown error is handed to
 * `#handleK8sError`.
 *
 * @param daemonSet - Manifest to create.
 * @param namespace - Namespace object; its `metadata.name` is the target.
 */
async #createDaemonSet(daemonSet: k8s.V1DaemonSet, namespace: Namespace) {
  try {
    // The namespace name is read inside the try so that a failure here is
    // routed through the same error handler as an API failure.
    const { body } = await this.#k8sApi.apps.createNamespacedDaemonSet(
      namespace.metadata.name,
      daemonSet
    );
    logger.debug(body);
  } catch (cause: unknown) {
    this.#handleK8sError(cause);
  }
}
594+
506595
#throwUnlessRecord(candidate: unknown): asserts candidate is Record<string, unknown> {
507596
if (typeof candidate !== "object" || candidate === null) {
508597
throw candidate;

apps/kubernetes-provider/src/podCleaner.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ export class PodCleaner {
1515
private logger = new SimpleLogger("[PodCleaner]");
1616
private k8sClient: {
1717
core: k8s.CoreV1Api;
18+
apps: k8s.AppsV1Api;
1819
kubeConfig: k8s.KubeConfig;
1920
};
2021

@@ -43,6 +44,7 @@ export class PodCleaner {
4344

4445
return {
4546
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
47+
apps: kubeConfig.makeApiClient(k8s.AppsV1Api),
4648
kubeConfig: kubeConfig,
4749
};
4850
}
@@ -98,6 +100,25 @@ export class PodCleaner {
98100
.catch(this.#handleK8sError.bind(this));
99101
}
100102

103+
/**
 * Deletes a collection of DaemonSets in one namespace, optionally narrowed
 * by field and/or label selectors. Rejections are passed to
 * `#handleK8sError`, whose return value then becomes the result.
 *
 * @param opts.namespace - Namespace to delete from.
 * @param opts.dryRun - When truthy, the request is sent with dry-run mode "All".
 * @param opts.fieldSelector - Optional field selector.
 * @param opts.labelSelector - Optional label selector.
 */
async #deleteDaemonSets(opts: {
  namespace: string;
  dryRun?: boolean;
  fieldSelector?: string;
  labelSelector?: string;
}) {
  const { namespace, dryRun, fieldSelector, labelSelector } = opts;

  // Named placeholders for the positional arguments that are left unset.
  const pretty = undefined;
  const continueToken = undefined;
  const gracePeriodSeconds = undefined;
  const dryRunMode = dryRun ? "All" : undefined;

  return await this.k8sClient.apps
    .deleteCollectionNamespacedDaemonSet(
      namespace,
      pretty,
      continueToken,
      dryRunMode,
      fieldSelector,
      gracePeriodSeconds,
      labelSelector
    )
    .catch(this.#handleK8sError.bind(this));
}
121+
101122
async #deleteCompletedRuns() {
102123
this.logger.log("Deleting completed runs");
103124

@@ -152,6 +173,28 @@ export class PodCleaner {
152173
});
153174
}
154175

176+
/**
 * Removes pre-pull DaemonSets from the cleaner's namespace and logs how long
 * the call took and how many items the response reported.
 *
 * NOTE(review): the only filter is the label selector `app=task-prepull`; no
 * field selector or status check is applied here, so every matching
 * DaemonSet is targeted — confirm that this is the intended meaning of
 * "completed".
 */
async #deleteCompletedPrePulls() {
  this.logger.log("Deleting completed pre-pulls");

  const startedAt = Date.now();
  const deletion = await this.#deleteDaemonSets({
    namespace: this.namespace,
    labelSelector: "app=task-prepull",
  });
  const elapsedMs = Date.now() - startedAt;

  if (deletion) {
    // The response is loosely typed here; fall back to 0 when no item list is present.
    const total = (deletion.response as any)?.body?.items?.length ?? 0;
    this.logger.log("Deleting completed pre-pulls: Done", { total, elapsedMs });
    return;
  }

  this.logger.log("Deleting completed pre-pulls: No delete result", { elapsedMs });
}
197+
155198
async start() {
156199
this.enabled = true;
157200
this.logger.log("Starting");
@@ -186,6 +229,22 @@ export class PodCleaner {
186229
2 * this.intervalInSeconds * 1000
187230
);
188231

232+
const completedPrePullInterval = setInterval(
233+
async () => {
234+
if (!this.enabled) {
235+
clearInterval(completedPrePullInterval);
236+
return;
237+
}
238+
239+
try {
240+
await this.#deleteCompletedPrePulls();
241+
} catch (error) {
242+
this.logger.error("Error deleting completed pre-pulls", error);
243+
}
244+
},
245+
2 * this.intervalInSeconds * 1000
246+
);
247+
189248
// this.#launchTests();
190249
}
191250

apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { logger } from "~/services/logger.server";
1111
import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy";
1212
import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server";
1313
import { TimeoutDeploymentService } from "./timeoutDeployment.server";
14+
import { socketIo } from "../handleSocketIo.server";
1415

1516
export class CreateDeployedBackgroundWorkerService extends BaseService {
1617
public async call(
@@ -132,6 +133,20 @@ export class CreateDeployedBackgroundWorkerService extends BaseService {
132133
logger.error("Failed to publish WORKER_CREATED event", { err });
133134
}
134135

136+
if (deployment.imageReference) {
137+
socketIo.providerNamespace.emit("PRE_PULL_DEPLOYMENT", {
138+
version: "v1",
139+
imageRef: deployment.imageReference,
140+
shortCode: deployment.shortCode,
141+
// identifiers
142+
deploymentId: deployment.id,
143+
envId: environment.id,
144+
envType: environment.type,
145+
orgId: environment.organizationId,
146+
projectId: deployment.projectId,
147+
});
148+
}
149+
135150
await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma);
136151
await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma);
137152
await TimeoutDeploymentService.dequeue(deployment.id, this._prisma);

packages/core-apps/src/provider.ts

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,17 @@ export interface TaskOperationsRestoreOptions {
6464
checkpointId: string;
6565
}
6666

67+
/**
 * Options passed to `TaskOperations.prePullDeployment`.
 */
export interface TaskOperationsPrePullDeploymentOptions {
  /** Deployment short code. */
  shortCode: string;
  /** Reference of the deployment image to pull. */
  imageRef: string;

  // identifiers
  /** Environment ID. */
  envId: string;
  /** Environment type. */
  envType: EnvironmentType;
  /** Organization ID. */
  orgId: string;
  /** Project ID. */
  projectId: string;
  /** Deployment ID. */
  deploymentId: string;
}
77+
6778
export interface TaskOperations {
6879
init: () => Promise<any>;
6980

@@ -73,8 +84,10 @@ export interface TaskOperations {
7384
restore: (opts: TaskOperationsRestoreOptions) => Promise<any>;
7485

7586
// unimplemented
76-
delete: (...args: any[]) => Promise<any>;
77-
get: (...args: any[]) => Promise<any>;
87+
delete?: (...args: any[]) => Promise<any>;
88+
get?: (...args: any[]) => Promise<any>;
89+
90+
prePullDeployment?: (opts: TaskOperationsPrePullDeploymentOptions) => Promise<any>;
7891
}
7992

8093
type ProviderShellOptions = {
@@ -277,6 +290,27 @@ export class ProviderShell implements Provider {
277290
logger.error("restore failed", error);
278291
}
279292
},
293+
PRE_PULL_DEPLOYMENT: async (message) => {
294+
if (!this.tasks.prePullDeployment) {
295+
logger.debug("prePullDeployment not implemented", message);
296+
return;
297+
}
298+
299+
try {
300+
await this.tasks.prePullDeployment({
301+
shortCode: message.shortCode,
302+
imageRef: message.imageRef,
303+
// identifiers
304+
envId: message.envId,
305+
envType: message.envType,
306+
orgId: message.orgId,
307+
projectId: message.projectId,
308+
deploymentId: message.deploymentId,
309+
});
310+
} catch (error) {
311+
logger.error("prePullDeployment failed", error);
312+
}
313+
},
280314
},
281315
});
282316

@@ -306,9 +340,12 @@ export class ProviderShell implements Provider {
306340
case "/delete": {
307341
const body = await getTextBody(req);
308342

309-
await this.tasks.delete({ runId: body });
310-
311-
return reply.text(`sent delete request: ${body}`);
343+
if (this.tasks.delete) {
344+
await this.tasks.delete({ runId: body });
345+
return reply.text(`sent delete request: ${body}`);
346+
} else {
347+
return reply.text("delete not implemented", 501);
348+
}
312349
}
313350
default: {
314351
return reply.empty(404);

packages/core/src/v3/schemas/messages.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,19 @@ export const PlatformToProviderMessages = {
367367
runId: z.string(),
368368
}),
369369
},
370+
PRE_PULL_DEPLOYMENT: {
371+
message: z.object({
372+
version: z.literal("v1").default("v1"),
373+
imageRef: z.string(),
374+
shortCode: z.string(),
375+
// identifiers
376+
envId: z.string(),
377+
envType: EnvironmentType,
378+
orgId: z.string(),
379+
projectId: z.string(),
380+
deploymentId: z.string(),
381+
}),
382+
},
370383
};
371384

372385
const CreateWorkerMessage = z.object({

0 commit comments

Comments
 (0)