From cc3afa25780c60d22192f71532134e2fc7203400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Tue, 24 Jun 2025 18:32:22 +0200 Subject: [PATCH] fix(concurrency-limit): scan instead of taking jobs (#1708) --- apps/api/src/lib/concurrency-limit.ts | 121 +++++++++++++------------- 1 file changed, 60 insertions(+), 61 deletions(-) diff --git a/apps/api/src/lib/concurrency-limit.ts b/apps/api/src/lib/concurrency-limit.ts index 8eb7813ea..c1916755d 100644 --- a/apps/api/src/lib/concurrency-limit.ts +++ b/apps/api/src/lib/concurrency-limit.ts @@ -69,24 +69,6 @@ export async function takeConcurrencyLimitedJob( return JSON.parse(res[1][0][0]); } -async function takeConcurrencyLimitedJobAndTimeout( - team_id: string, -): Promise<{ - job: ConcurrencyLimitedJob; - timeout: number; -} | null> { - await redisEvictConnection.zremrangebyscore(constructQueueKey(team_id), -Infinity, Date.now()); - const res = await redisEvictConnection.zmpop(1, constructQueueKey(team_id), "MIN"); - if (res === null || res === undefined) { - return null; - } - - return { - job: JSON.parse(res[1][0][0]), - timeout: res[1][0][1] === "inf" ? Infinity : parseFloat(res[1][0][1]), - }; -} - export async function pushConcurrencyLimitedJob( team_id: string, job: ConcurrencyLimitedJob, @@ -161,62 +143,79 @@ async function getNextConcurrentJob(teamId: string): Promise<{ job: ConcurrencyLimitedJob; timeout: number; } | null> { - let ignoredJobs: { - job: ConcurrencyLimitedJob; - timeout: number; - }[] = []; - let finalJob: { job: ConcurrencyLimitedJob; + _member: string; timeout: number; } | null = null; const crawlCache = new Map(); + let cursor: string = "0"; - while (finalJob === null) { - const res = await takeConcurrencyLimitedJobAndTimeout(teamId); - if (res === null) { + while (true) { + const scanResult = await redisEvictConnection.zscan(constructQueueKey(teamId), cursor, "COUNT", 1); + cursor = scanResult[0]; + const results = scanResult[1]; + + for (let i = 0; i < results.length; i += 2) { + const res = { + job: JSON.parse(results[i]), + _member: results[i], + timeout: results[i + 1] === "inf" ? Infinity : parseFloat(results[i + 1]), + }; + + // If the job is associated with a crawl ID, we need to check if the crawl has a max concurrency limit + if (res.job.data.crawl_id) { + const sc = crawlCache.get(res.job.data.crawl_id) ?? await getCrawl(res.job.data.crawl_id); + if (sc !== null) { + crawlCache.set(res.job.data.crawl_id, sc); + } + + const maxCrawlConcurrency = sc === null + ? null + : (typeof sc.crawlerOptions?.delay === "number") + ? 1 + : sc.maxConcurrency ?? null; + + if (maxCrawlConcurrency !== null) { + // If the crawl has a max concurrency limit, we need to check if the crawl has reached the limit + const currentActiveConcurrency = (await getCrawlConcurrencyLimitActiveJobs(res.job.data.crawl_id)).length; + if (currentActiveConcurrency < maxCrawlConcurrency) { + // If we're under the max concurrency limit, we can run the job + finalJob = res; + } + } else { + // If the crawl has no max concurrency limit, we can run the job + finalJob = res; + } + } else { + // If the job is not associated with a crawl ID, we can run the job + finalJob = res; + } + + if (finalJob !== null) { + break; + } + } + + if (finalJob !== null) { break; } - // If the job is associated with a crawl ID, we need to check if the crawl has a max concurrency limit - if (res.job.data.crawl_id) { - const sc = crawlCache.get(res.job.data.crawl_id) ?? await getCrawl(res.job.data.crawl_id); - if (sc !== null) { - crawlCache.set(res.job.data.crawl_id, sc); - } - - const maxCrawlConcurrency = sc === null - ? null - : (typeof sc.crawlerOptions?.delay === "number") - ? 1 - : sc.maxConcurrency ?? null; - - if (maxCrawlConcurrency !== null) { - // If the crawl has a max concurrency limit, we need to check if the crawl has reached the limit - const currentActiveConcurrency = (await getCrawlConcurrencyLimitActiveJobs(res.job.data.crawl_id)).length; - if (currentActiveConcurrency < maxCrawlConcurrency) { - // If we're under the max concurrency limit, we can run the job - finalJob = res; - } else { - // If we're at the max concurrency limit, we need to ignore the job - ignoredJobs.push({ - job: res.job, - timeout: res.timeout, - }); - } - } else { - // If the crawl has no max concurrency limit, we can run the job - finalJob = res; - } - } else { - // If the job is not associated with a crawl ID, we can run the job - finalJob = res; + if (cursor === "0") { + break; } } - for (const ignoredJob of ignoredJobs) { - await pushConcurrencyLimitedJob(teamId, ignoredJob.job, ignoredJob.timeout); + if (finalJob !== null) { + const res = await redisEvictConnection.zrem(constructQueueKey(teamId), finalJob._member); + if (res === 0) { + logger.warn("Failed to remove job from concurrency limit queue", { + teamId, + jobId: finalJob.job.id, + }); + return await getNextConcurrentJob(teamId); + } } return finalJob;