diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 75980947b5..ae833240b5 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -232,6 +232,8 @@ const EnvironmentSchema = z.object({ MARQS_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75), MARQS_AVAILABLE_CAPACITY_BIAS: z.coerce.number().default(0.3), MARQS_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25), + MARQS_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0), + MARQS_MAXIMUM_ORG_COUNT: z.coerce.number().int().optional(), PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(), diff --git a/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts b/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts index 4a3cd066eb..5b0769e50f 100644 --- a/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts +++ b/apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts @@ -44,6 +44,8 @@ export type FairDequeuingStrategyOptions = { * If not provided, no biasing will be applied (completely random shuffling) */ biases?: FairDequeuingStrategyBiases; + reuseSnapshotCount?: number; + maximumOrgCount?: number; }; type FairQueueConcurrency = { @@ -90,6 +92,10 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { }>; private _rng: seedrandom.PRNG; + private _reusedSnapshotForConsumer: Map< + string, + { snapshot: FairQueueSnapshot; reuseCount: number } + > = new Map(); constructor(private options: FairDequeuingStrategyOptions) { const ctx = new DefaultStatefulContext(); @@ -310,9 +316,34 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { span.setAttribute("consumer_id", consumerId); span.setAttribute("parent_queue", parentQueue); + if ( + typeof this.options.reuseSnapshotCount === "number" && + this.options.reuseSnapshotCount > 0 + ) { + const key = `${parentQueue}:${consumerId}`; + const reusedSnapshot = this._reusedSnapshotForConsumer.get(key); + + if (reusedSnapshot) { + if 
(reusedSnapshot.reuseCount < this.options.reuseSnapshotCount) { + span.setAttribute("reused_snapshot", true); + + this._reusedSnapshotForConsumer.set(key, { + snapshot: reusedSnapshot.snapshot, + reuseCount: reusedSnapshot.reuseCount + 1, + }); + + return reusedSnapshot.snapshot; + } else { + this._reusedSnapshotForConsumer.delete(key); + } + } + } + + span.setAttribute("reused_snapshot", false); + const now = Date.now(); - const queues = await this.#allChildQueuesByScore(parentQueue, consumerId, now); + let queues = await this.#allChildQueuesByScore(parentQueue, consumerId, now); span.setAttribute("parent_queue_count", queues.length); @@ -320,6 +351,18 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { return emptyFairQueueSnapshot; } + // Apply org selection if maximumOrgCount is specified + let selectedOrgIds: Set; + if (this.options.maximumOrgCount && this.options.maximumOrgCount > 0) { + selectedOrgIds = this.#selectTopOrgs(queues, this.options.maximumOrgCount); + // Filter queues to only include selected orgs + queues = queues.filter((queue) => selectedOrgIds.has(queue.org)); + + span.setAttribute("selected_org_count", selectedOrgIds.size); + } + + span.setAttribute("selected_queue_count", queues.length); + const orgIds = new Set(); const envIds = new Set(); const envIdToOrgId = new Map(); @@ -341,10 +384,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { (org) => org.concurrency.current >= org.concurrency.limit ); - span.setAttributes({ - ...flattenAttributes(orgsAtFullConcurrency, "orgs_at_full_concurrency"), - }); - const orgIdsAtFullConcurrency = new Set(orgsAtFullConcurrency.map((org) => org.id)); const orgsSnapshot = orgs.reduce((acc, org) => { @@ -355,6 +394,12 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { return acc; }, {} as Record); + span.setAttributes({ + org_count: orgs.length, + orgs_at_full_concurrency_count: orgsAtFullConcurrency.length, + orgs_snapshot_count: 
Object.keys(orgsSnapshot).length, + }); + if (Object.keys(orgsSnapshot).length === 0) { return emptyFairQueueSnapshot; } @@ -376,10 +421,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { (env) => env.concurrency.current >= env.concurrency.limit ); - span.setAttributes({ - ...flattenAttributes(envsAtFullConcurrency, "envs_at_full_concurrency"), - }); - const envIdsAtFullConcurrency = new Set(envsAtFullConcurrency.map((env) => env.id)); const envsSnapshot = envs.reduce((acc, env) => { @@ -390,6 +431,11 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { return acc; }, {} as Record); + span.setAttributes({ + env_count: envs.length, + envs_at_full_concurrency_count: envsAtFullConcurrency.length, + }); + const queuesSnapshot = queues.filter( (queue) => !orgIdsAtFullConcurrency.has(queue.org) && !envIdsAtFullConcurrency.has(queue.env) @@ -402,10 +448,66 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy { queues: queuesSnapshot, }; + if ( + typeof this.options.reuseSnapshotCount === "number" && + this.options.reuseSnapshotCount > 0 + ) { + this._reusedSnapshotForConsumer.set(`${parentQueue}:${consumerId}`, { + snapshot, + reuseCount: 0, + }); + } + return snapshot; }); } + #selectTopOrgs(queues: FairQueue[], maximumOrgCount: number): Set { + // Group queues by org + const queuesByOrg = queues.reduce((acc, queue) => { + if (!acc[queue.org]) { + acc[queue.org] = []; + } + acc[queue.org].push(queue); + return acc; + }, {} as Record); + + // Calculate average age for each org + const orgAverageAges = Object.entries(queuesByOrg).map(([orgId, orgQueues]) => { + const averageAge = orgQueues.reduce((sum, q) => sum + q.age, 0) / orgQueues.length; + return { orgId, averageAge }; + }); + + // Perform weighted shuffle based on average ages + const maxAge = Math.max(...orgAverageAges.map((o) => o.averageAge)); + const weightedOrgs = orgAverageAges.map((org) => ({ + orgId: org.orgId, + weight: org.averageAge / 
maxAge, // Normalize weights + })); + + // Select top N orgs using weighted shuffle + const selectedOrgs = new Set(); + let remainingOrgs = [...weightedOrgs]; + let totalWeight = remainingOrgs.reduce((sum, org) => sum + org.weight, 0); + + while (selectedOrgs.size < maximumOrgCount && remainingOrgs.length > 0) { + let random = this._rng() * totalWeight; + let index = 0; + + while (random > 0 && index < remainingOrgs.length) { + random -= remainingOrgs[index].weight; + index++; + } + index = Math.max(0, index - 1); + + selectedOrgs.add(remainingOrgs[index].orgId); + totalWeight -= remainingOrgs[index].weight; + remainingOrgs.splice(index, 1); + } + + return selectedOrgs; + } + async #getOrgConcurrency(orgId: string): Promise { return await startSpan(this.options.tracer, "getOrgConcurrency", async (span) => { span.setAttribute("org_id", orgId); diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 0fd91fa8f3..7b151f9692 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -1625,6 +1625,8 @@ function getMarQSClient() { availableCapacityBias: env.MARQS_AVAILABLE_CAPACITY_BIAS, queueAgeRandomization: env.MARQS_QUEUE_AGE_RANDOMIZATION_BIAS, }, + reuseSnapshotCount: env.MARQS_REUSE_SNAPSHOT_COUNT, + maximumOrgCount: env.MARQS_MAXIMUM_ORG_COUNT, }), envQueuePriorityStrategy: new FairDequeuingStrategy({ tracer: tracer, diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 3d77595194..0ab0675a7e 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -1122,15 +1122,6 @@ export class SharedQueueConsumer { "emitResumeAfterDependencyWithAck", async (span) => { try { - const sockets = await this.#startActiveSpan("getCoordinatorSockets", async (span) => { - const sockets = await socketIo.coordinatorNamespace.fetchSockets(); - - 
span.setAttribute("socket_count", sockets.length); - - return sockets; - }); - - span.setAttribute("socket_count", sockets.length); span.setAttribute("attempt_id", resumableAttempt.id); span.setAttribute( "timeout_in_ms", diff --git a/apps/webapp/test/fairDequeuingStrategy.test.ts b/apps/webapp/test/fairDequeuingStrategy.test.ts index 466c466d0a..df851377bc 100644 --- a/apps/webapp/test/fairDequeuingStrategy.test.ts +++ b/apps/webapp/test/fairDequeuingStrategy.test.ts @@ -203,6 +203,96 @@ describe("FairDequeuingStrategy", () => { expect(result).toEqual([queue1, queue2]); }); + redisTest("should reuse snapshots across calls for the same consumer", async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 10, + checkForDisabledOrgs: true, + seed: "test-seed-reuse-1", + reuseSnapshotCount: 1, + }); + + const now = Date.now(); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 3000, + queueId: "queue-1", + orgId: "org-1", + envId: "env-1", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 2000, + queueId: "queue-2", + orgId: "org-2", + envId: "env-2", + }); + + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - 1000, + queueId: "queue-3", + orgId: "org-3", + envId: "env-3", + }); + + const startDistribute1 = performance.now(); + + const result = await strategy.distributeFairQueuesFromParentQueue("parent-queue", "consumer-1"); + + const distribute1Duration = performance.now() - startDistribute1; + + console.log("First distribution took", distribute1Duration, "ms"); + + expect(result).toHaveLength(3); + // Should get all three queues, in the order determined by the seeded shuffle + const queue1 = keyProducer.queueKey("org-1", "env-1", "queue-1"); + const queue2 = keyProducer.queueKey("org-2", "env-2",
"queue-2"); + const queue3 = keyProducer.queueKey("org-3", "env-3", "queue-3"); + expect(result).toEqual([queue2, queue1, queue3]); + + const startDistribute2 = performance.now(); + + const result2 = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + "consumer-1" + ); + + const distribute2Duration = performance.now() - startDistribute2; + + console.log("Second distribution took", distribute2Duration, "ms"); + + // Make sure the second call is more than 10 times faster than the first + expect(distribute2Duration).toBeLessThan(distribute1Duration / 10); + + const startDistribute3 = performance.now(); + + const result3 = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + "consumer-1" + ); + + const distribute3Duration = performance.now() - startDistribute3; + + console.log("Third distribution took", distribute3Duration, "ms"); + + // Make sure the third call is more than 4 times the second + expect(distribute3Duration).toBeGreaterThan(distribute2Duration * 4); + }); + redisTest("should fairly distribute queues across environments over time", async ({ redis }) => { const keyProducer = createKeyProducer("test"); const strategy = new FairDequeuingStrategy({ @@ -689,4 +779,135 @@ describe("FairDequeuingStrategy", () => { expect(mixed["queue-1"][0]).toBeGreaterThan(mixed["queue-3"][0]); // Older preferred expect(mixed["queue-3"][0]).toBeGreaterThan(0); // But newer still gets chances }); + + redisTest( + "should respect maximumOrgCount and select orgs based on queue ages", + async ({ redis }) => { + const keyProducer = createKeyProducer("test"); + const strategy = new FairDequeuingStrategy({ + tracer, + redis, + keys: keyProducer, + defaultOrgConcurrency: 10, + defaultEnvConcurrency: 5, + parentQueueLimit: 100, + checkForDisabledOrgs: true, + seed: "test-seed-max-orgs", + maximumOrgCount: 2, // Only select top 2 orgs + }); + + const now = Date.now(); + + // Setup 4 orgs with different queue age profiles + const orgSetups = [ + { + 
orgId: "org-1", + queues: [ + { age: 1000 }, // Average age: 1000 + ], + }, + { + orgId: "org-2", + queues: [ + { age: 5000 }, // Average age: 5000 + { age: 5000 }, + ], + }, + { + orgId: "org-3", + queues: [ + { age: 2000 }, // Average age: 2000 + { age: 2000 }, + ], + }, + { + orgId: "org-4", + queues: [ + { age: 500 }, // Average age: 500 + { age: 500 }, + ], + }, + ]; + + // Setup queues and concurrency for each org + for (const setup of orgSetups) { + await setupConcurrency({ + redis, + keyProducer, + org: { id: setup.orgId, currentConcurrency: 0, limit: 10 }, + env: { id: "env-1", currentConcurrency: 0, limit: 5 }, + }); + + for (let i = 0; i < setup.queues.length; i++) { + await setupQueue({ + redis, + keyProducer, + parentQueue: "parent-queue", + score: now - setup.queues[i].age, + queueId: `queue-${setup.orgId}-${i}`, + orgId: setup.orgId, + envId: "env-1", + }); + } + } + + // Run multiple iterations to verify consistent behavior + const iterations = 100; + const selectedOrgCounts: Record = {}; + + for (let i = 0; i < iterations; i++) { + const result = await strategy.distributeFairQueuesFromParentQueue( + "parent-queue", + `consumer-${i}` + ); + + // Track which orgs were included in the result + const selectedOrgs = new Set(result.map((queueId) => keyProducer.orgIdFromQueue(queueId))); + + // Verify we never get more than maximumOrgCount orgs + expect(selectedOrgs.size).toBeLessThanOrEqual(2); + + for (const orgId of selectedOrgs) { + selectedOrgCounts[orgId] = (selectedOrgCounts[orgId] || 0) + 1; + } + } + + console.log("Organization selection counts:", selectedOrgCounts); + + // org-2 should be selected most often (highest average age) + expect(selectedOrgCounts["org-2"]).toBeGreaterThan(selectedOrgCounts["org-4"] || 0); + + // org-4 should be selected least often (lowest average age) + const org4Count = selectedOrgCounts["org-4"] || 0; + expect(org4Count).toBeLessThan(selectedOrgCounts["org-2"]); + + // Verify that orgs with higher average queue age 
are selected more frequently + const sortedOrgs = Object.entries(selectedOrgCounts).sort((a, b) => b[1] - a[1]); + console.log("Sorted organization frequencies:", sortedOrgs); + + // The top 2 most frequently selected orgs should be org-2 and org-3 + // as they have the highest average queue ages + const topTwoOrgs = new Set([sortedOrgs[0][0], sortedOrgs[1][0]]); + expect(topTwoOrgs).toContain("org-2"); // Highest average age + expect(topTwoOrgs).toContain("org-3"); // Second highest average age + + // Calculate each org's share of all selections (not of iterations) + const totalSelections = Object.values(selectedOrgCounts).reduce((a, b) => a + b, 0); + const selectionPercentages = Object.entries(selectedOrgCounts).reduce( + (acc, [orgId, count]) => { + acc[orgId] = (count / totalSelections) * 100; + return acc; + }, + {} as Record + ); + + console.log("Organization selection percentages:", selectionPercentages); + + // Verify that org-2 (highest average age) accounts for more than 40% of all org selections + expect(selectionPercentages["org-2"]).toBeGreaterThan(40); + + // Verify that org-4 (lowest average age) accounts for less than 20% of all org selections + expect(selectionPercentages["org-4"] || 0).toBeLessThan(20); + } + ); });