Skip to content

Commit ab69b60

Browse files
authored
New metadata deployed task fix (#1588)
* Add more logging around new metadata system * Make the authenticated env optional when updating run metadata
1 parent d52e3d7 commit ab69b60

File tree

4 files changed

+47
-31
lines changed

4 files changed

+47
-31
lines changed

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ const { action } = createActionApiRoute(
1616
method: "PUT",
1717
},
1818
async ({ authentication, body, params }) => {
19-
const result = await updateMetadataService.call(authentication.environment, params.runId, body);
19+
const result = await updateMetadataService.call(params.runId, body, authentication.environment);
2020

2121
if (!result) {
2222
return json({ error: "Task Run not found" }, { status: 404 });

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -229,17 +229,21 @@ export class UpdateMetadataService extends BaseService {
229229
}
230230

231231
public async call(
232-
environment: AuthenticatedEnvironment,
233232
runId: string,
234-
body: UpdateMetadataRequestBody
233+
body: UpdateMetadataRequestBody,
234+
environment?: AuthenticatedEnvironment
235235
) {
236236
const runIdType = runId.startsWith("run_") ? "friendly" : "internal";
237237

238238
const taskRun = await this._prisma.taskRun.findFirst({
239-
where: {
240-
runtimeEnvironmentId: environment.id,
241-
...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }),
242-
},
239+
where: environment
240+
? {
241+
runtimeEnvironmentId: environment.id,
242+
...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }),
243+
}
244+
: {
245+
...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }),
246+
},
243247
select: {
244248
id: true,
245249
status: true,
@@ -351,6 +355,15 @@ export class UpdateMetadataService extends BaseService {
351355
});
352356

353357
if (result.count === 0) {
358+
if (this.flushLoggingEnabled) {
359+
logger.debug(
360+
`[UpdateMetadataService][updateRunMetadataWithOperations] Optimistic lock failed for run ${runId}`,
361+
{
362+
metadataVersion: run.metadataVersion,
363+
}
364+
);
365+
}
366+
354367
// If this was our last attempt, buffer the operations and return optimistically
355368
if (attempts === MAX_RETRIES) {
356369
this.#ingestRunOperations(runId, operations);
@@ -363,6 +376,15 @@ export class UpdateMetadataService extends BaseService {
363376
continue;
364377
}
365378

379+
if (this.flushLoggingEnabled) {
380+
logger.debug(
381+
`[UpdateMetadataService][updateRunMetadataWithOperations] Updated metadata for run ${runId}`,
382+
{
383+
metadata: applyResults.newMetadata,
384+
}
385+
);
386+
}
387+
366388
// Success! Return the new metadata
367389
return applyResults.newMetadata;
368390
}
@@ -383,10 +405,15 @@ export class UpdateMetadataService extends BaseService {
383405
metadataPacket.data !== "{}" ||
384406
(existingMetadata.data && metadataPacket.data !== existingMetadata.data)
385407
) {
386-
logger.debug(`Updating metadata directly for run`, {
387-
metadata: metadataPacket.data,
388-
runId,
389-
});
408+
if (this.flushLoggingEnabled) {
409+
logger.debug(
410+
`[UpdateMetadataService][updateRunMetadataDirectly] Updating metadata directly for run`,
411+
{
412+
metadata: metadataPacket.data,
413+
runId,
414+
}
415+
);
416+
}
390417

391418
// Update the metadata without version check
392419
await this._prisma.taskRun.update({
@@ -416,6 +443,13 @@ export class UpdateMetadataService extends BaseService {
416443
};
417444
});
418445

446+
if (this.flushLoggingEnabled) {
447+
logger.debug(`[UpdateMetadataService] Ingesting operations for run`, {
448+
runId,
449+
bufferedOperations,
450+
});
451+
}
452+
419453
const existingBufferedOperations = this._bufferedOperations.get(runId) ?? [];
420454

421455
this._bufferedOperations.set(runId, [...existingBufferedOperations, ...bufferedOperations]);

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ export class FinalizeTaskRunService extends BaseService {
7070
completedAt,
7171
});
7272

73-
if (env && metadata) {
73+
if (metadata) {
7474
try {
75-
await updateMetadataService.call(env, id, metadata);
75+
await updateMetadataService.call(id, metadata, env);
7676
} catch (e) {
7777
logger.error("[FinalizeTaskRunService] Failed to update metadata", {
7878
taskRun: id,

references/nextjs-realtime/src/trigger/csv.ts

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,6 @@ export const handleCSVUpload = schemaTask({
6262
const successfulRows = results.runs.filter((r) => r.ok);
6363
const failedRows = results.runs.filter((r) => !r.ok);
6464

65-
const firstSuccessfulRow = successfulRows[0];
66-
67-
if (firstSuccessfulRow) {
68-
const stream = await metadata.fetchStream<string>(firstSuccessfulRow.id);
69-
70-
for await (const value of stream) {
71-
logger.info(`Stream value from ${firstSuccessfulRow.id}`, { value });
72-
}
73-
}
74-
7565
return {
7666
file,
7767
rows,
@@ -93,14 +83,6 @@ export const handleCSVRow = schemaTask({
9383

9484
metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id);
9585

96-
await metadata.parent.stream(
97-
ctx.run.id,
98-
(async function* () {
99-
yield "hello";
100-
yield "world";
101-
})()
102-
);
103-
10486
return row;
10587
},
10688
});

0 commit comments

Comments
 (0)