Skip to content

Commit 772150c

Browse files
committed
Only consider the top N orgs when dequeuing, to help mitigate large spikes in queues (like around the hour and half hour marks)
1 parent 6a8db74 commit 772150c

File tree

4 files changed

+193
-1
lines changed

4 files changed

+193
-1
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ const EnvironmentSchema = z.object({
233233
MARQS_AVAILABLE_CAPACITY_BIAS: z.coerce.number().default(0.3),
234234
MARQS_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
235235
MARQS_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
236+
MARQS_MAXIMUM_ORG_COUNT: z.coerce.number().int().optional(),
236237

237238
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
238239

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export type FairDequeuingStrategyOptions = {
4545
*/
4646
biases?: FairDequeuingStrategyBiases;
4747
reuseSnapshotCount?: number;
48+
maximumOrgCount?: number;
4849
};
4950

5051
type FairQueueConcurrency = {
@@ -342,14 +343,26 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
342343

343344
const now = Date.now();
344345

345-
const queues = await this.#allChildQueuesByScore(parentQueue, consumerId, now);
346+
let queues = await this.#allChildQueuesByScore(parentQueue, consumerId, now);
346347

347348
span.setAttribute("parent_queue_count", queues.length);
348349

349350
if (queues.length === 0) {
350351
return emptyFairQueueSnapshot;
351352
}
352353

354+
// Apply org selection if maximumOrgCount is specified
355+
let selectedOrgIds: Set<string>;
356+
if (this.options.maximumOrgCount && this.options.maximumOrgCount > 0) {
357+
selectedOrgIds = this.#selectTopOrgs(queues, this.options.maximumOrgCount);
358+
// Filter queues to only include selected orgs
359+
queues = queues.filter((queue) => selectedOrgIds.has(queue.org));
360+
361+
span.setAttribute("selected_org_count", selectedOrgIds.size);
362+
}
363+
364+
span.setAttribute("selected_queue_count", queues.length);
365+
353366
const orgIds = new Set<string>();
354367
const envIds = new Set<string>();
355368
const envIdToOrgId = new Map<string, string>();
@@ -449,6 +462,52 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
449462
});
450463
}
451464

465+
#selectTopOrgs(queues: FairQueue[], maximumOrgCount: number): Set<string> {
466+
// Group queues by org
467+
const queuesByOrg = queues.reduce((acc, queue) => {
468+
if (!acc[queue.org]) {
469+
acc[queue.org] = [];
470+
}
471+
acc[queue.org].push(queue);
472+
return acc;
473+
}, {} as Record<string, FairQueue[]>);
474+
475+
// Calculate average age for each org
476+
const orgAverageAges = Object.entries(queuesByOrg).map(([orgId, orgQueues]) => {
477+
const averageAge = orgQueues.reduce((sum, q) => sum + q.age, 0) / orgQueues.length;
478+
return { orgId, averageAge };
479+
});
480+
481+
// Perform weighted shuffle based on average ages
482+
const maxAge = Math.max(...orgAverageAges.map((o) => o.averageAge));
483+
const weightedOrgs = orgAverageAges.map((org) => ({
484+
orgId: org.orgId,
485+
weight: org.averageAge / maxAge, // Normalize weights
486+
}));
487+
488+
// Select top N orgs using weighted shuffle
489+
const selectedOrgs = new Set<string>();
490+
let remainingOrgs = [...weightedOrgs];
491+
let totalWeight = remainingOrgs.reduce((sum, org) => sum + org.weight, 0);
492+
493+
while (selectedOrgs.size < maximumOrgCount && remainingOrgs.length > 0) {
494+
let random = this._rng() * totalWeight;
495+
let index = 0;
496+
497+
while (random > 0 && index < remainingOrgs.length) {
498+
random -= remainingOrgs[index].weight;
499+
index++;
500+
}
501+
index = Math.max(0, index - 1);
502+
503+
selectedOrgs.add(remainingOrgs[index].orgId);
504+
totalWeight -= remainingOrgs[index].weight;
505+
remainingOrgs.splice(index, 1);
506+
}
507+
508+
return selectedOrgs;
509+
}
510+
452511
async #getOrgConcurrency(orgId: string): Promise<FairQueueConcurrency> {
453512
return await startSpan(this.options.tracer, "getOrgConcurrency", async (span) => {
454513
span.setAttribute("org_id", orgId);

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1626,6 +1626,7 @@ function getMarQSClient() {
16261626
queueAgeRandomization: env.MARQS_QUEUE_AGE_RANDOMIZATION_BIAS,
16271627
},
16281628
reuseSnapshotCount: env.MARQS_REUSE_SNAPSHOT_COUNT,
1629+
maximumOrgCount: env.MARQS_MAXIMUM_ORG_COUNT,
16291630
}),
16301631
envQueuePriorityStrategy: new FairDequeuingStrategy({
16311632
tracer: tracer,

apps/webapp/test/fairDequeuingStrategy.test.ts

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,4 +779,135 @@ describe("FairDequeuingStrategy", () => {
779779
expect(mixed["queue-1"][0]).toBeGreaterThan(mixed["queue-3"][0]); // Older preferred
780780
expect(mixed["queue-3"][0]).toBeGreaterThan(0); // But newer still gets chances
781781
});
782+
783+
redisTest(
784+
"should respect maximumOrgCount and select orgs based on queue ages",
785+
async ({ redis }) => {
786+
const keyProducer = createKeyProducer("test");
787+
const strategy = new FairDequeuingStrategy({
788+
tracer,
789+
redis,
790+
keys: keyProducer,
791+
defaultOrgConcurrency: 10,
792+
defaultEnvConcurrency: 5,
793+
parentQueueLimit: 100,
794+
checkForDisabledOrgs: true,
795+
seed: "test-seed-max-orgs",
796+
maximumOrgCount: 2, // Only select top 2 orgs
797+
});
798+
799+
const now = Date.now();
800+
801+
// Setup 4 orgs with different queue age profiles
802+
const orgSetups = [
803+
{
804+
orgId: "org-1",
805+
queues: [
806+
{ age: 1000 }, // Average age: 1000
807+
],
808+
},
809+
{
810+
orgId: "org-2",
811+
queues: [
812+
{ age: 5000 }, // Average age: 5000
813+
{ age: 5000 },
814+
],
815+
},
816+
{
817+
orgId: "org-3",
818+
queues: [
819+
{ age: 2000 }, // Average age: 2000
820+
{ age: 2000 },
821+
],
822+
},
823+
{
824+
orgId: "org-4",
825+
queues: [
826+
{ age: 500 }, // Average age: 500
827+
{ age: 500 },
828+
],
829+
},
830+
];
831+
832+
// Setup queues and concurrency for each org
833+
for (const setup of orgSetups) {
834+
await setupConcurrency({
835+
redis,
836+
keyProducer,
837+
org: { id: setup.orgId, currentConcurrency: 0, limit: 10 },
838+
env: { id: "env-1", currentConcurrency: 0, limit: 5 },
839+
});
840+
841+
for (let i = 0; i < setup.queues.length; i++) {
842+
await setupQueue({
843+
redis,
844+
keyProducer,
845+
parentQueue: "parent-queue",
846+
score: now - setup.queues[i].age,
847+
queueId: `queue-${setup.orgId}-${i}`,
848+
orgId: setup.orgId,
849+
envId: "env-1",
850+
});
851+
}
852+
}
853+
854+
// Run multiple iterations to verify consistent behavior
855+
const iterations = 100;
856+
const selectedOrgCounts: Record<string, number> = {};
857+
858+
for (let i = 0; i < iterations; i++) {
859+
const result = await strategy.distributeFairQueuesFromParentQueue(
860+
"parent-queue",
861+
`consumer-${i}`
862+
);
863+
864+
// Track which orgs were included in the result
865+
const selectedOrgs = new Set(result.map((queueId) => keyProducer.orgIdFromQueue(queueId)));
866+
867+
// Verify we never get more than maximumOrgCount orgs
868+
expect(selectedOrgs.size).toBeLessThanOrEqual(2);
869+
870+
for (const orgId of selectedOrgs) {
871+
selectedOrgCounts[orgId] = (selectedOrgCounts[orgId] || 0) + 1;
872+
}
873+
}
874+
875+
console.log("Organization selection counts:", selectedOrgCounts);
876+
877+
// org-2 should be selected most often (highest average age)
878+
expect(selectedOrgCounts["org-2"]).toBeGreaterThan(selectedOrgCounts["org-4"] || 0);
879+
880+
// org-4 should be selected least often (lowest average age)
881+
const org4Count = selectedOrgCounts["org-4"] || 0;
882+
expect(org4Count).toBeLessThan(selectedOrgCounts["org-2"]);
883+
884+
// Verify that orgs with higher average queue age are selected more frequently
885+
const sortedOrgs = Object.entries(selectedOrgCounts).sort((a, b) => b[1] - a[1]);
886+
console.log("Sorted organization frequencies:", sortedOrgs);
887+
888+
// The top 2 most frequently selected orgs should be org-2 and org-3
889+
// as they have the highest average queue ages
890+
const topTwoOrgs = new Set([sortedOrgs[0][0], sortedOrgs[1][0]]);
891+
expect(topTwoOrgs).toContain("org-2"); // Highest average age
892+
expect(topTwoOrgs).toContain("org-3"); // Second highest average age
893+
894+
// Calculate selection percentages
895+
const totalSelections = Object.values(selectedOrgCounts).reduce((a, b) => a + b, 0);
896+
const selectionPercentages = Object.entries(selectedOrgCounts).reduce(
897+
(acc, [orgId, count]) => {
898+
acc[orgId] = (count / totalSelections) * 100;
899+
return acc;
900+
},
901+
{} as Record<string, number>
902+
);
903+
904+
console.log("Organization selection percentages:", selectionPercentages);
905+
906+
// Verify that org-2 (highest average age) gets selected in at least 40% of iterations
907+
expect(selectionPercentages["org-2"]).toBeGreaterThan(40);
908+
909+
// Verify that org-4 (lowest average age) gets selected in less than 20% of iterations
910+
expect(selectionPercentages["org-4"] || 0).toBeLessThan(20);
911+
}
912+
);
782913
});

0 commit comments

Comments
 (0)