diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts index 1ebf43bc3d..ce9222a18a 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts @@ -16,7 +16,7 @@ const { action } = createActionApiRoute( method: "PUT", }, async ({ authentication, body, params }) => { - const result = await updateMetadataService.call(authentication.environment, params.runId, body); + const result = await updateMetadataService.call(params.runId, body, authentication.environment); if (!result) { return json({ error: "Task Run not found" }, { status: 404 }); diff --git a/apps/webapp/app/services/metadata/updateMetadata.server.ts b/apps/webapp/app/services/metadata/updateMetadata.server.ts index ffab7d4807..47a9a8f5cb 100644 --- a/apps/webapp/app/services/metadata/updateMetadata.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadata.server.ts @@ -229,17 +229,21 @@ export class UpdateMetadataService extends BaseService { } public async call( - environment: AuthenticatedEnvironment, runId: string, - body: UpdateMetadataRequestBody + body: UpdateMetadataRequestBody, + environment?: AuthenticatedEnvironment ) { const runIdType = runId.startsWith("run_") ? "friendly" : "internal"; const taskRun = await this._prisma.taskRun.findFirst({ - where: { - runtimeEnvironmentId: environment.id, - ...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }), - }, + where: environment + ? { + runtimeEnvironmentId: environment.id, + ...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }), + } + : { + ...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }), + }, select: { id: true, status: true, @@ -351,6 +355,15 @@ export class UpdateMetadataService extends BaseService { }); if (result.count === 0) { + if (this.flushLoggingEnabled) { + logger.debug( + `[UpdateMetadataService][updateRunMetadataWithOperations] Optimistic lock failed for run ${runId}`, + { + metadataVersion: run.metadataVersion, + } + ); + } + // If this was our last attempt, buffer the operations and return optimistically if (attempts === MAX_RETRIES) { this.#ingestRunOperations(runId, operations); @@ -363,6 +376,15 @@ export class UpdateMetadataService extends BaseService { continue; } + if (this.flushLoggingEnabled) { + logger.debug( + `[UpdateMetadataService][updateRunMetadataWithOperations] Updated metadata for run ${runId}`, + { + metadata: applyResults.newMetadata, + } + ); + } + // Success! Return the new metadata return applyResults.newMetadata; } @@ -383,10 +405,15 @@ export class UpdateMetadataService extends BaseService { metadataPacket.data !== "{}" || (existingMetadata.data && metadataPacket.data !== existingMetadata.data) ) { - logger.debug(`Updating metadata directly for run`, { - metadata: metadataPacket.data, - runId, - }); + if (this.flushLoggingEnabled) { + logger.debug( + `[UpdateMetadataService][updateRunMetadataDirectly] Updating metadata directly for run`, + { + metadata: metadataPacket.data, + runId, + } + ); + } // Update the metadata without version check await this._prisma.taskRun.update({ @@ -416,6 +443,13 @@ export class UpdateMetadataService extends BaseService { }; }); + if (this.flushLoggingEnabled) { + logger.debug(`[UpdateMetadataService] Ingesting operations for run`, { + runId, + bufferedOperations, + }); + } + const existingBufferedOperations = this._bufferedOperations.get(runId) ?? []; this._bufferedOperations.set(runId, [...existingBufferedOperations, ...bufferedOperations]); diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index c31486021b..3c87dd5f41 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -70,9 +70,9 @@ export class FinalizeTaskRunService extends BaseService { completedAt, }); - if (env && metadata) { + if (metadata) { try { - await updateMetadataService.call(env, id, metadata); + await updateMetadataService.call(id, metadata, env); } catch (e) { logger.error("[FinalizeTaskRunService] Failed to update metadata", { taskRun: id, diff --git a/references/nextjs-realtime/src/trigger/csv.ts b/references/nextjs-realtime/src/trigger/csv.ts index 3acdddd4d4..50167b484c 100644 --- a/references/nextjs-realtime/src/trigger/csv.ts +++ b/references/nextjs-realtime/src/trigger/csv.ts @@ -62,16 +62,6 @@ export const handleCSVUpload = schemaTask({ const successfulRows = results.runs.filter((r) => r.ok); const failedRows = results.runs.filter((r) => !r.ok); - const firstSuccessfulRow = successfulRows[0]; - - if (firstSuccessfulRow) { - const stream = await metadata.fetchStream(firstSuccessfulRow.id); - - for await (const value of stream) { - logger.info(`Stream value from ${firstSuccessfulRow.id}`, { value }); - } - } - return { file, rows, @@ -93,14 +83,6 @@ export const handleCSVRow = schemaTask({ metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id); - await metadata.parent.stream( - ctx.run.id, - (async function* () { - yield "hello"; - yield "world"; - })() - ); - return row; }, });