Skip to content

Commit ca8b8a7

Browse files
committed
Improve metadata flushing efficiency by collapsing operations (fix #2104)
1 parent e84ede9 commit ca8b8a7

File tree

5 files changed

+935
-11
lines changed

5 files changed

+935
-11
lines changed

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ const { action } = createActionApiRoute(
1212
{
1313
params: ParamsSchema,
1414
body: UpdateMetadataRequestBody,
15-
maxContentLength: 1024 * 1024, // 1MB
15+
maxContentLength: 1024 * 1024 * 2, // 3MB
1616
method: "PUT",
1717
},
1818
async ({ authentication, body, params }) => {

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,7 @@ export class UpdateMetadataService extends BaseService {
381381
`[UpdateMetadataService][updateRunMetadataWithOperations] Updated metadata for run ${runId}`,
382382
{
383383
metadata: applyResults.newMetadata,
384+
operations: operations,
384385
}
385386
);
386387
}

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { ApiClient } from "../apiClient/index.js";
44
import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js";
55
import { ApiRequestOptions } from "../zodfetch.js";
66
import { MetadataStream } from "./metadataStream.js";
7-
import { applyMetadataOperations } from "./operations.js";
7+
import { applyMetadataOperations, collapseOperations } from "./operations.js";
88
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
99
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
1010

@@ -33,7 +33,7 @@ export class StandardMetadataManager implements RunMetadataManager {
3333
get parent(): RunMetadataUpdater {
3434
// Store a reference to 'this' to ensure proper context
3535
const self = this;
36-
36+
3737
// Create the updater object and store it in a local variable
3838
const parentUpdater: RunMetadataUpdater = {
3939
set: (key, value) => {
@@ -66,14 +66,14 @@ export class StandardMetadataManager implements RunMetadataManager {
6666
},
6767
stream: (key, value, signal) => self.doStream(key, value, "parent", parentUpdater, signal),
6868
};
69-
69+
7070
return parentUpdater;
7171
}
7272

7373
get root(): RunMetadataUpdater {
7474
// Store a reference to 'this' to ensure proper context
7575
const self = this;
76-
76+
7777
// Create the updater object and store it in a local variable
7878
const rootUpdater: RunMetadataUpdater = {
7979
set: (key, value) => {
@@ -106,7 +106,7 @@ export class StandardMetadataManager implements RunMetadataManager {
106106
},
107107
stream: (key, value, signal) => self.doStream(key, value, "root", rootUpdater, signal),
108108
};
109-
109+
110110
return rootUpdater;
111111
}
112112

@@ -353,9 +353,17 @@ export class StandardMetadataManager implements RunMetadataManager {
353353
this.queuedRootOperations.clear();
354354

355355
try {
356+
const collapsedOperations = collapseOperations(operations);
357+
const collapsedParentOperations = collapseOperations(parentOperations);
358+
const collapsedRootOperations = collapseOperations(rootOperations);
359+
356360
const response = await this.apiClient.updateRunMetadata(
357361
this.runId,
358-
{ operations, parentOperations, rootOperations },
362+
{
363+
operations: collapsedOperations,
364+
parentOperations: collapsedParentOperations,
365+
rootOperations: collapsedRootOperations,
366+
},
359367
requestOptions
360368
);
361369

@@ -406,10 +414,14 @@ export class StandardMetadataManager implements RunMetadataManager {
406414
return;
407415
}
408416

417+
const operations = Array.from(this.queuedOperations);
418+
const parentOperations = Array.from(this.queuedParentOperations);
419+
const rootOperations = Array.from(this.queuedRootOperations);
420+
409421
return {
410-
operations: Array.from(this.queuedOperations),
411-
parentOperations: Array.from(this.queuedParentOperations),
412-
rootOperations: Array.from(this.queuedRootOperations),
422+
operations: collapseOperations(operations),
423+
parentOperations: collapseOperations(parentOperations),
424+
rootOperations: collapseOperations(rootOperations),
413425
};
414426
}
415427

packages/core/src/v3/runMetadata/operations.ts

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,109 @@ export function applyMetadataOperations(
128128

129129
return { newMetadata, unappliedOperations };
130130
}
131+
132+
/**
133+
* Collapses metadata operations to reduce payload size and avoid 413 "Request Entity Too Large" errors.
134+
*
135+
* When there are many operations queued up (e.g., 10k increment operations), sending them all
136+
* individually can result in request payloads exceeding the server's 1MB limit. This function
137+
* intelligently combines operations where possible to reduce the payload size:
138+
*
139+
* - **Increment operations**: Multiple increments on the same key are summed into a single increment
140+
* - Example: increment("counter", 1) + increment("counter", 2) → increment("counter", 3)
141+
*
142+
* - **Set operations**: Multiple sets on the same key keep only the last one (since later sets override earlier ones)
143+
* - Example: set("status", "processing") + set("status", "done") → set("status", "done")
144+
*
145+
* - **Delete operations**: Multiple deletes on the same key keep only one (duplicates are redundant)
146+
* - Example: del("temp") + del("temp") → del("temp")
147+
*
148+
* - **Append, remove, and update operations**: Preserved as-is to maintain correctness since order matters
149+
*
150+
* @param operations Array of metadata change operations to collapse
151+
* @returns Collapsed array with fewer operations that produce the same final result
152+
*
153+
* @example
154+
* ```typescript
155+
* const operations = [
156+
* { type: "increment", key: "counter", value: 1 },
157+
* { type: "increment", key: "counter", value: 2 },
158+
* { type: "set", key: "status", value: "processing" },
159+
* { type: "set", key: "status", value: "done" }
160+
* ];
161+
*
162+
* const collapsed = collapseOperations(operations);
163+
* // Result: [
164+
* // { type: "increment", key: "counter", value: 3 },
165+
* // { type: "set", key: "status", value: "done" }
166+
* // ]
167+
* ```
168+
*/
169+
export function collapseOperations(
170+
operations: RunMetadataChangeOperation[]
171+
): RunMetadataChangeOperation[] {
172+
if (operations.length === 0) {
173+
return operations;
174+
}
175+
176+
// Maps to track collapsible operations
177+
const incrementsByKey = new Map<string, number>();
178+
const setsByKey = new Map<string, RunMetadataChangeOperation>();
179+
const deletesByKey = new Set<string>();
180+
const preservedOperations: RunMetadataChangeOperation[] = [];
181+
182+
// Process operations in order
183+
for (const operation of operations) {
184+
switch (operation.type) {
185+
case "increment":
186+
const currentIncrement = incrementsByKey.get(operation.key) || 0;
187+
incrementsByKey.set(operation.key, currentIncrement + operation.value);
188+
break;
189+
190+
case "set":
191+
// Keep only the last set operation for each key
192+
setsByKey.set(operation.key, operation);
193+
break;
194+
195+
case "delete":
196+
// Keep only one delete operation per key
197+
deletesByKey.add(operation.key);
198+
break;
199+
200+
case "append":
201+
case "remove":
202+
case "update":
203+
// Preserve these operations as-is to maintain correctness
204+
preservedOperations.push(operation);
205+
break;
206+
207+
default:
208+
// Handle any future operation types by preserving them
209+
preservedOperations.push(operation);
210+
break;
211+
}
212+
}
213+
214+
// Build the collapsed operations array
215+
const collapsedOperations: RunMetadataChangeOperation[] = [];
216+
217+
// Add collapsed increment operations
218+
for (const [key, value] of incrementsByKey) {
219+
collapsedOperations.push({ type: "increment", key, value });
220+
}
221+
222+
// Add collapsed set operations
223+
for (const operation of setsByKey.values()) {
224+
collapsedOperations.push(operation);
225+
}
226+
227+
// Add collapsed delete operations
228+
for (const key of deletesByKey) {
229+
collapsedOperations.push({ type: "delete", key });
230+
}
231+
232+
// Add preserved operations
233+
collapsedOperations.push(...preservedOperations);
234+
235+
return collapsedOperations;
236+
}

0 commit comments

Comments
 (0)