diff --git a/apps/api/src/controllers/v0/admin/cclog.ts b/apps/api/src/controllers/v0/admin/cclog.ts index a2303bb51..50b8e6916 100644 --- a/apps/api/src/controllers/v0/admin/cclog.ts +++ b/apps/api/src/controllers/v0/admin/cclog.ts @@ -3,9 +3,15 @@ import { db } from "../../../db/connection"; import * as schema from "../../../db/schema"; import { logger as _logger } from "../../../lib/logger"; import { Request, Response } from "express"; -import { scrapeQueueFdb } from "../../../services/worker/nuq-fdb"; +import { + nuqFdbHealthCheck, + scrapeQueueFdb, + withFdbTimeout, +} from "../../../services/worker/nuq-fdb"; import { fdbQueueEnabled } from "../../../services/worker/nuq-router"; +const FDB_OPTIONAL_COUNT_TIMEOUT_MS = 500; + async function cclog() { const logger = _logger.child({ module: "cclog", @@ -41,12 +47,17 @@ async function cclog() { if (fdbQueueEnabled()) { try { - const fdbCounts = await scrapeQueueFdb.getTeamActiveCounts(); - for (const [teamId, concurrency] of fdbCounts) { - concurrencyByTeam.set( - teamId, - (concurrencyByTeam.get(teamId) ?? 0) + concurrency, + if (await nuqFdbHealthCheck(FDB_OPTIONAL_COUNT_TIMEOUT_MS)) { + const fdbCounts = await withFdbTimeout( + scrapeQueueFdb.getTeamActiveCounts(), + FDB_OPTIONAL_COUNT_TIMEOUT_MS, ); + for (const [teamId, concurrency] of fdbCounts) { + concurrencyByTeam.set( + teamId, + (concurrencyByTeam.get(teamId) ?? 0) + concurrency, + ); + } } } catch (e) { logger.warn("Error reading FDB concurrency", { error: e }); diff --git a/apps/api/src/controllers/v1/queue-status.ts b/apps/api/src/controllers/v1/queue-status.ts index e86f3202c..0f2b72316 100644 --- a/apps/api/src/controllers/v1/queue-status.ts +++ b/apps/api/src/controllers/v1/queue-status.ts @@ -4,7 +4,11 @@ import { AuthCreditUsageChunkFromTeam, RequestWithAuth } from "./types"; import { Response } from "express"; import { getRedisConnection } from "../../services/queue-service"; import { fdbQueueEnabled } from "../../services/worker/nuq-router"; -import { scrapeQueueFdb } from "../../services/worker/nuq-fdb"; +import { + nuqFdbHealthCheck, + scrapeQueueFdb, + withFdbTimeout, +} from "../../services/worker/nuq-fdb"; import { logger } from "../../lib/logger"; import { cleanOldConcurrencyLimitedJobs, @@ -22,6 +26,8 @@ type QueueStatusResponse = { mostRecentSuccess: string | null; }; +const FDB_OPTIONAL_COUNT_TIMEOUT_MS = 500; + export async function queueStatusController( req: RequestWithAuth<{}, undefined, QueueStatusResponse>, res: Response, @@ -53,12 +59,20 @@ export async function queueStatusController( // during the FDB migration a team can have load on both ledgers if (fdbQueueEnabled()) { try { - const [fdbActive, fdbPending] = await Promise.all([ - scrapeQueueFdb.getTeamActiveCount(req.auth.team_id), - scrapeQueueFdb.getTeamPendingCount(req.auth.team_id), - ]); - activeJobsOfTeam += fdbActive; - queuedJobsOfTeam += fdbPending; + if (await nuqFdbHealthCheck(FDB_OPTIONAL_COUNT_TIMEOUT_MS)) { + const [fdbActive, fdbPending] = await Promise.all([ + withFdbTimeout( + scrapeQueueFdb.getTeamActiveCount(req.auth.team_id), + FDB_OPTIONAL_COUNT_TIMEOUT_MS, + ), + withFdbTimeout( + scrapeQueueFdb.getTeamPendingCount(req.auth.team_id), + FDB_OPTIONAL_COUNT_TIMEOUT_MS, + ), + ]); + activeJobsOfTeam += fdbActive; + queuedJobsOfTeam += fdbPending; + } } catch (error) { logger.warn("Failed to read FDB queue counts, falling back to Redis", { module: "queue-status", diff --git a/apps/api/src/controllers/v2/queue-status.ts b/apps/api/src/controllers/v2/queue-status.ts index ef9d383ec..ea95e8343 100644 --- a/apps/api/src/controllers/v2/queue-status.ts +++ b/apps/api/src/controllers/v2/queue-status.ts @@ -5,7 +5,11 @@ import { AuthCreditUsageChunkFromTeam } from "../v1/types"; import { Response } from "express"; import { getRedisConnection } from "../../services/queue-service"; import { fdbQueueEnabled } from "../../services/worker/nuq-router"; -import { scrapeQueueFdb } from "../../services/worker/nuq-fdb"; +import { + nuqFdbHealthCheck, + scrapeQueueFdb, + withFdbTimeout, +} from "../../services/worker/nuq-fdb"; import { logger } from "../../lib/logger"; import { cleanOldConcurrencyLimitedJobs, @@ -23,6 +27,8 @@ type QueueStatusResponse = { mostRecentSuccess: string | null; }; +const FDB_OPTIONAL_COUNT_TIMEOUT_MS = 500; + export async function queueStatusController( req: RequestWithAuth<{}, undefined, QueueStatusResponse>, res: Response, @@ -54,12 +60,20 @@ export async function queueStatusController( // during the FDB migration a team can have load on both ledgers if (fdbQueueEnabled()) { try { - const [fdbActive, fdbPending] = await Promise.all([ - scrapeQueueFdb.getTeamActiveCount(req.auth.team_id), - scrapeQueueFdb.getTeamPendingCount(req.auth.team_id), - ]); - activeJobsOfTeam += fdbActive; - queuedJobsOfTeam += fdbPending; + if (await nuqFdbHealthCheck(FDB_OPTIONAL_COUNT_TIMEOUT_MS)) { + const [fdbActive, fdbPending] = await Promise.all([ + withFdbTimeout( + scrapeQueueFdb.getTeamActiveCount(req.auth.team_id), + FDB_OPTIONAL_COUNT_TIMEOUT_MS, + ), + withFdbTimeout( + scrapeQueueFdb.getTeamPendingCount(req.auth.team_id), + FDB_OPTIONAL_COUNT_TIMEOUT_MS, + ), + ]); + activeJobsOfTeam += fdbActive; + queuedJobsOfTeam += fdbPending; + } } catch (error) { logger.warn("Failed to read FDB queue counts, falling back to Redis", { module: "queue-status", diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 0e56ae531..60dcb0800 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -30,7 +30,11 @@ import { resolveJobBackend, scrapeQueue as routedScrapeQueue, } from "./worker/nuq-router"; -import { scrapeQueueFdb } from "./worker/nuq-fdb"; +import { + nuqFdbHealthCheck, + scrapeQueueFdb, + withFdbTimeout, +} from "./worker/nuq-fdb"; import { serializeTraceContext } from "../lib/otel-tracer"; import { isSelfHosted } from "../lib/deployment"; import { MONITOR_CHECK_STALE_TIMEOUT_MS } from "./monitoring/stale"; @@ -303,6 +307,8 @@ async function addScrapeJobFdb( } // parity with the PG path: notify when the backlog exceeds the team limit +const FDB_OPTIONAL_COUNT_TIMEOUT_MS = 500; + async function maybeSendConcurrencyNotificationFdb( teamId: string, teamLimit: number | null, @@ -310,7 +316,11 @@ async function maybeSendConcurrencyNotificationFdb( ) { if (teamLimit === null || crawlOrBatch) return; try { - const pending = await scrapeQueueFdb.getTeamPendingCount(teamId); + if (!(await nuqFdbHealthCheck(FDB_OPTIONAL_COUNT_TIMEOUT_MS))) return; + const pending = await withFdbTimeout( + scrapeQueueFdb.getTeamPendingCount(teamId), + FDB_OPTIONAL_COUNT_TIMEOUT_MS, + ); if (pending <= teamLimit) return; const shouldSendNotification = await shouldSendConcurrencyLimitNotification(teamId); diff --git a/apps/api/src/services/worker/nuq-router.ts b/apps/api/src/services/worker/nuq-router.ts index d7a1828d7..d736bb47d 100644 --- a/apps/api/src/services/worker/nuq-router.ts +++ b/apps/api/src/services/worker/nuq-router.ts @@ -55,7 +55,7 @@ function fdbForced(): boolean { } const fdbFallbackLastWarn = new Map(); -const FDB_OPTIONAL_DEQUEUE_TIMEOUT_MS = 500; +const FDB_OPTIONAL_OP_TIMEOUT_MS = 500; function logFdbFallback( logger: Logger, @@ -75,10 +75,10 @@ function logFdbFallback( async function optionalFdb(operation: () => Promise): Promise { if (fdbForced()) return operation(); - if (!(await nuqFdbHealthCheck(FDB_OPTIONAL_DEQUEUE_TIMEOUT_MS))) { - throw new Error("FDB health check failed before optional dequeue"); + if (!(await nuqFdbHealthCheck(FDB_OPTIONAL_OP_TIMEOUT_MS))) { + throw new Error("FDB health check failed before optional operation"); } - return await withFdbTimeout(operation(), FDB_OPTIONAL_DEQUEUE_TIMEOUT_MS); + return await withFdbTimeout(operation(), FDB_OPTIONAL_OP_TIMEOUT_MS); } // Whether NEW work for this team should go to FDB. Existing crawls follow @@ -152,10 +152,17 @@ export async function mirrorExternalSlotAcquire( ttlMs: number, ): Promise { if (await isFdbTeam(teamId)) { - await externalSlotsFdb.acquire(teamId, holderId, ttlMs); - } else { - await pushConcurrencyLimitActiveJob(teamId, holderId, ttlMs); + try { + await optionalFdb(() => + externalSlotsFdb.acquire(teamId, holderId, ttlMs), + ); + return; + } catch (error) { + if (fdbForced()) throw error; + logFdbFallback(_logger, "mirrorExternalSlotAcquire", error); + } } + await pushConcurrencyLimitActiveJob(teamId, holderId, ttlMs); } export async function mirrorExternalSlotRelease( @@ -163,10 +170,15 @@ export async function mirrorExternalSlotRelease( holderId: string, ): Promise { if (await isFdbTeam(teamId)) { - await externalSlotsFdb.release(teamId, holderId); - } else { - await removeConcurrencyLimitActiveJob(teamId, holderId); + try { + await optionalFdb(() => externalSlotsFdb.release(teamId, holderId)); + return; + } catch (error) { + if (fdbForced()) throw error; + logFdbFallback(_logger, "mirrorExternalSlotRelease", error); + } } + await removeConcurrencyLimitActiveJob(teamId, holderId); } // Active count across both ledgers; a migrating team has load on both while @@ -177,7 +189,10 @@ export async function getCombinedTeamActiveCount( const redisCount = await getConcurrencyLimitActiveJobsCount(teamId); if (!fdbQueueEnabled()) return redisCount; try { - return redisCount + (await scrapeQueueFdb.getTeamActiveCount(teamId)); + return ( + redisCount + + (await optionalFdb(() => scrapeQueueFdb.getTeamActiveCount(teamId))) + ); } catch (error) { if (fdbForced()) throw error; logFdbFallback(_logger, "getCombinedTeamActiveCount", error); @@ -218,23 +233,25 @@ export async function fdbEnqueueScrapeJobs( const queueCap = teamLimit === null ? Number.MAX_SAFE_INTEGER : getTeamQueueLimit(teamLimit); - const results = await scrapeQueueFdb.addJobs( - jobs.map(j => ({ - id: j.jobId, - data: j.data, - options: { - priority: j.priority, - listenable: j.listenable ?? false, - ownerId: j.data.team_id ?? undefined, - groupId: j.data.crawl_id ?? undefined, - bypassGate: - options?.bypassGate || - j.data.mode === "kickoff" || - j.data.mode === "kickoff_sitemap", - timesOutAt: new Date(Date.now() + j.backlogTimeoutMs), - }, - })), - { teamLimit, queueCap }, + const results = await optionalFdb(() => + scrapeQueueFdb.addJobs( + jobs.map(j => ({ + id: j.jobId, + data: j.data, + options: { + priority: j.priority, + listenable: j.listenable ?? false, + ownerId: j.data.team_id ?? undefined, + groupId: j.data.crawl_id ?? undefined, + bypassGate: + options?.bypassGate || + j.data.mode === "kickoff" || + j.data.mode === "kickoff_sitemap", + timesOutAt: new Date(Date.now() + j.backlogTimeoutMs), + }, + })), + { teamLimit, queueCap }, + ), ); const tagged = results.map(r => tagFdbJob(r as NuQJob)); @@ -283,7 +300,15 @@ class RoutedScrapeQueue { logger: Logger = _logger, ): Promise { if (this.backendFor(id) === "fdb") { - return scrapeQueueFdb.renewLock(id, lock, logger); + try { + return await optionalFdb(() => + scrapeQueueFdb.renewLock(id, lock, logger), + ); + } catch (error) { + if (fdbForced()) throw error; + logFdbFallback(logger, "scrape.renewLock", error); + return false; + } } return scrapeQueuePg.renewLock(id, lock, logger); } @@ -297,7 +322,15 @@ class RoutedScrapeQueue { const backend = this.backendFor(id); this.inflightBackend.delete(id); if (backend === "fdb") { - return scrapeQueueFdb.jobFinish(id, lock, returnvalue, logger); + try { + return await optionalFdb(() => + scrapeQueueFdb.jobFinish(id, lock, returnvalue, logger), + ); + } catch (error) { + if (fdbForced()) throw error; + logFdbFallback(logger, "scrape.jobFinish", error); + return false; + } } return scrapeQueuePg.jobFinish(id, lock, returnvalue, logger); } @@ -311,7 +344,15 @@ class RoutedScrapeQueue { const backend = this.backendFor(id); this.inflightBackend.delete(id); if (backend === "fdb") { - return scrapeQueueFdb.jobFail(id, lock, failedReason, logger); + try { + return await optionalFdb(() => + scrapeQueueFdb.jobFail(id, lock, failedReason, logger), + ); + } catch (error) { + if (fdbForced()) throw error; + logFdbFallback(logger, "scrape.jobFail", error); + return false; + } } return scrapeQueuePg.jobFail(id, lock, failedReason, logger); } @@ -322,7 +363,7 @@ class RoutedScrapeQueue { ): Promise | null> { if (fdbQueueEnabled()) { try { - const job = await scrapeQueueFdb.getJob(id, logger); + const job = await optionalFdb(() => scrapeQueueFdb.getJob(id, logger)); if (job) return tagFdbJob(job as NuQJob); if (fdbForced()) return null; } catch (error) { @@ -340,7 +381,7 @@ class RoutedScrapeQueue { if (!fdbQueueEnabled()) return scrapeQueuePg.getJobs(ids, logger); let fdbJobs: NuQFdbJob[] = []; try { - fdbJobs = await scrapeQueueFdb.getJobs(ids, logger); + fdbJobs = await optionalFdb(() => scrapeQueueFdb.getJobs(ids, logger)); } catch (error) { if (fdbForced()) throw error; logFdbFallback(logger, "scrape.getJobs", error); @@ -381,7 +422,9 @@ class RoutedScrapeQueue { private async isFdbGroup(groupId: string): Promise { if (!fdbQueueEnabled()) return false; try { - return (await crawlGroupFdb.getGroup(groupId)) !== null; + return ( + (await optionalFdb(() => crawlGroupFdb.getGroup(groupId))) !== null + ); } catch (error) { if (fdbForced()) throw error; logFdbFallback(_logger, "scrape.isFdbGroup", error); @@ -395,7 +438,9 @@ class RoutedScrapeQueue { logger: Logger = _logger, ): Promise | null> { if (await this.isFdbGroup(groupId)) { - const job = await scrapeQueueFdb.getGroupAnyJob(groupId, ownerId, logger); + const job = await optionalFdb(() => + scrapeQueueFdb.getGroupAnyJob(groupId, ownerId, logger), + ); return job ? tagFdbJob(job as NuQJob) : null; } return scrapeQueuePg.getGroupAnyJob(groupId, ownerId); @@ -406,9 +451,9 @@ class RoutedScrapeQueue { logger: Logger = _logger, ): Promise> { if (await this.isFdbGroup(groupId)) { - return scrapeQueueFdb.getGroupNumericStats(groupId, logger) as Promise< - Record - >; + return (await optionalFdb(() => + scrapeQueueFdb.getGroupNumericStats(groupId, logger), + )) as Record; } return scrapeQueuePg.getGroupNumericStats(groupId, logger); } @@ -420,11 +465,8 @@ class RoutedScrapeQueue { logger: Logger = _logger, ): Promise[]> { if (await this.isFdbGroup(groupId)) { - const jobs = await scrapeQueueFdb.getCrawlJobsForListing( - groupId, - limit, - offset, - logger, + const jobs = await optionalFdb(() => + scrapeQueueFdb.getCrawlJobsForListing(groupId, limit, offset, logger), ); return jobs.map(j => tagFdbJob(j as NuQJob)); } @@ -434,8 +476,8 @@ class RoutedScrapeQueue { public async removeJob(id: string, logger: Logger = _logger): Promise { if (fdbQueueEnabled()) { try { - if (await scrapeQueueFdb.hasJob(id)) { - await scrapeQueueFdb.removeJob(id, logger); + if (await optionalFdb(() => scrapeQueueFdb.hasJob(id))) { + await optionalFdb(() => scrapeQueueFdb.removeJob(id, logger)); return; } } catch (error) { @@ -463,8 +505,10 @@ class RoutedScrapeQueue { ): Promise { if (fdbQueueEnabled()) { try { - if (await scrapeQueueFdb.hasJob(id)) { - return scrapeQueueFdb.waitForJob(id, timeout, logger); + if (await optionalFdb(() => scrapeQueueFdb.hasJob(id))) { + return optionalFdb(() => + scrapeQueueFdb.waitForJob(id, timeout, logger), + ); } } catch (error) { if (fdbForced()) throw error; @@ -513,7 +557,15 @@ class RoutedCrawlFinishedQueue { logger: Logger = _logger, ): Promise { if (this.inflightBackend.get(id) === "fdb") { - return crawlFinishedQueueFdb.renewLock(id, lock, logger); + try { + return await optionalFdb(() => + crawlFinishedQueueFdb.renewLock(id, lock, logger), + ); + } catch (error) { + if (fdbForced()) throw error; + logFdbFallback(logger, "crawlFinished.renewLock", error); + return false; + } } return crawlFinishedQueuePg.renewLock(id, lock, logger); } @@ -527,7 +579,15 @@ class RoutedCrawlFinishedQueue { const backend = this.inflightBackend.get(id) ?? "pg"; this.inflightBackend.delete(id); if (backend === "fdb") { - return crawlFinishedQueueFdb.jobFinish(id, lock, returnvalue, logger); + try { + return await optionalFdb(() => + crawlFinishedQueueFdb.jobFinish(id, lock, returnvalue, logger), + ); + } catch (error) { + if (fdbForced()) throw error; + logFdbFallback(logger, "crawlFinished.jobFinish", error); + return false; + } } return crawlFinishedQueuePg.jobFinish(id, lock, returnvalue, logger); } @@ -541,7 +601,15 @@ class RoutedCrawlFinishedQueue { const backend = this.inflightBackend.get(id) ?? "pg"; this.inflightBackend.delete(id); if (backend === "fdb") { - return crawlFinishedQueueFdb.jobFail(id, lock, failedReason, logger); + try { + return await optionalFdb(() => + crawlFinishedQueueFdb.jobFail(id, lock, failedReason, logger), + ); + } catch (error) { + if (fdbForced()) throw error; + logFdbFallback(logger, "crawlFinished.jobFail", error); + return false; + } } return crawlFinishedQueuePg.jobFail(id, lock, failedReason, logger); } @@ -552,7 +620,9 @@ class RoutedCrawlFinishedQueue { ): Promise | null> { if (fdbQueueEnabled()) { try { - const job = await crawlFinishedQueueFdb.getJob(id, logger); + const job = await optionalFdb(() => + crawlFinishedQueueFdb.getJob(id, logger), + ); if (job) return tagFdbJob(job as NuQJob); if (fdbForced()) return null; } catch (error) { @@ -579,15 +649,17 @@ class RoutedCrawlGroup { logger: Logger = _logger, ): Promise { if (opts?.backend === "fdb") { - const g = await crawlGroupFdb.addGroup( - id, - ownerId, - ttl, - { - maxConcurrency: opts.maxConcurrency, - delaySeconds: opts.delaySeconds, - }, - logger, + const g = await optionalFdb(() => + crawlGroupFdb.addGroup( + id, + ownerId, + ttl, + { + maxConcurrency: opts.maxConcurrency, + delaySeconds: opts.delaySeconds, + }, + logger, + ), ); return g as NuQJobGroupInstance; } @@ -600,7 +672,7 @@ class RoutedCrawlGroup { ): Promise { if (fdbQueueEnabled()) { try { - const g = await crawlGroupFdb.getGroup(id, logger); + const g = await optionalFdb(() => crawlGroupFdb.getGroup(id, logger)); if (g) return g as NuQJobGroupInstance; if (fdbForced()) return null; } catch (error) { @@ -620,9 +692,8 @@ class RoutedCrawlGroup { } let fdb: NuQJobGroupInstance[] = []; try { - fdb = (await crawlGroupFdb.getOngoingByOwner( - ownerId, - logger, + fdb = (await optionalFdb(() => + crawlGroupFdb.getOngoingByOwner(ownerId, logger), )) as NuQJobGroupInstance[]; } catch (error) { if (fdbForced()) throw error; @@ -646,7 +717,7 @@ class RoutedCrawlGroup { ): Promise { if (!fdbQueueEnabled()) return false; try { - return crawlGroupFdb.cancelGroup(id, logger); + return await optionalFdb(() => crawlGroupFdb.cancelGroup(id, logger)); } catch (error) { if (fdbForced()) throw error; logFdbFallback(logger, "crawlGroup.cancelGroup", error); diff --git a/apps/api/src/services/worker/team-semaphore.ts b/apps/api/src/services/worker/team-semaphore.ts index 7b3c9954f..b9ebbd9a5 100644 --- a/apps/api/src/services/worker/team-semaphore.ts +++ b/apps/api/src/services/worker/team-semaphore.ts @@ -2,8 +2,9 @@ import { pushConcurrencyLimitActiveJob, removeConcurrencyLimitActiveJob, } from "../../lib/concurrency-limit"; +import { config } from "../../config"; import { isFdbTeam } from "./nuq-router"; -import { externalSlotsFdb } from "./nuq-fdb"; +import { externalSlotsFdb, nuqFdbHealthCheck, withFdbTimeout } from "./nuq-fdb"; import { isSelfHosted } from "../../lib/deployment"; import { ScrapeJobTimeoutError, TransportableError } from "../../lib/error"; import { logger as _logger } from "../../lib/logger"; @@ -32,9 +33,22 @@ const semaphoreHoldDuration = new Histogram({ const { scripts, runScript, ensure } = nuqRedis; const SEMAPHORE_TTL = 30 * 1000; +const FDB_OPTIONAL_SLOT_TIMEOUT_MS = 500; type MirrorBackend = "pg" | "fdb"; type MirrorState = { backend?: MirrorBackend; touched: Set }; +function fdbForced(): boolean { + return config.NUQ_BACKEND === "fdb"; +} + +async function optionalFdbSlot(operation: () => Promise): Promise { + if (fdbForced()) return operation(); + if (!(await nuqFdbHealthCheck(FDB_OPTIONAL_SLOT_TIMEOUT_MS))) { + throw new Error("FDB health check failed before optional slot operation"); + } + return await withFdbTimeout(operation(), FDB_OPTIONAL_SLOT_TIMEOUT_MS); +} + async function acquire( teamId: string, holderId: string, @@ -206,7 +220,9 @@ function startHeartbeat( // PG-backed teams mirror into the Redis ZSET; FDB-backed teams consume an // external slot on the FDB ledger. async function resolveMirrorBackend(teamId: string): Promise { - return (await isFdbTeam(teamId)) ? "fdb" : "pg"; + if (!(await isFdbTeam(teamId))) return "pg"; + if (fdbForced()) return "fdb"; + return (await nuqFdbHealthCheck(FDB_OPTIONAL_SLOT_TIMEOUT_MS)) ? "fdb" : "pg"; } async function releaseMirrorBackend( @@ -215,7 +231,7 @@ async function releaseMirrorBackend( backend: MirrorBackend, ): Promise { if (backend === "fdb") { - await externalSlotsFdb.release(teamId, holderId); + await optionalFdbSlot(() => externalSlotsFdb.release(teamId, holderId)); } else { await removeConcurrencyLimitActiveJob(teamId, holderId); } @@ -228,7 +244,9 @@ async function mirrorSlotAcquire( ): Promise { const backend = await resolveMirrorBackend(teamId); if (backend === "fdb") { - await externalSlotsFdb.acquire(teamId, holderId, 60 * 1000); + await optionalFdbSlot(() => + externalSlotsFdb.acquire(teamId, holderId, 60 * 1000), + ); } else { await pushConcurrencyLimitActiveJob(teamId, holderId, 60 * 1000); }