mirror of
https://github.com/firecrawl/firecrawl.git
synced 2026-06-16 02:50:28 +03:00
fix(api): prevent optional fdb probes from hanging
This commit is contained in:
@@ -3,9 +3,15 @@ import { db } from "../../../db/connection";
|
||||
import * as schema from "../../../db/schema";
|
||||
import { logger as _logger } from "../../../lib/logger";
|
||||
import { Request, Response } from "express";
|
||||
import { scrapeQueueFdb } from "../../../services/worker/nuq-fdb";
|
||||
import {
|
||||
nuqFdbHealthCheck,
|
||||
scrapeQueueFdb,
|
||||
withFdbTimeout,
|
||||
} from "../../../services/worker/nuq-fdb";
|
||||
import { fdbQueueEnabled } from "../../../services/worker/nuq-router";
|
||||
|
||||
const FDB_OPTIONAL_COUNT_TIMEOUT_MS = 500;
|
||||
|
||||
async function cclog() {
|
||||
const logger = _logger.child({
|
||||
module: "cclog",
|
||||
@@ -41,12 +47,17 @@ async function cclog() {
|
||||
|
||||
if (fdbQueueEnabled()) {
|
||||
try {
|
||||
const fdbCounts = await scrapeQueueFdb.getTeamActiveCounts();
|
||||
for (const [teamId, concurrency] of fdbCounts) {
|
||||
concurrencyByTeam.set(
|
||||
teamId,
|
||||
(concurrencyByTeam.get(teamId) ?? 0) + concurrency,
|
||||
if (await nuqFdbHealthCheck(FDB_OPTIONAL_COUNT_TIMEOUT_MS)) {
|
||||
const fdbCounts = await withFdbTimeout(
|
||||
scrapeQueueFdb.getTeamActiveCounts(),
|
||||
FDB_OPTIONAL_COUNT_TIMEOUT_MS,
|
||||
);
|
||||
for (const [teamId, concurrency] of fdbCounts) {
|
||||
concurrencyByTeam.set(
|
||||
teamId,
|
||||
(concurrencyByTeam.get(teamId) ?? 0) + concurrency,
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
logger.warn("Error reading FDB concurrency", { error: e });
|
||||
|
||||
@@ -4,7 +4,11 @@ import { AuthCreditUsageChunkFromTeam, RequestWithAuth } from "./types";
|
||||
import { Response } from "express";
|
||||
import { getRedisConnection } from "../../services/queue-service";
|
||||
import { fdbQueueEnabled } from "../../services/worker/nuq-router";
|
||||
import { scrapeQueueFdb } from "../../services/worker/nuq-fdb";
|
||||
import {
|
||||
nuqFdbHealthCheck,
|
||||
scrapeQueueFdb,
|
||||
withFdbTimeout,
|
||||
} from "../../services/worker/nuq-fdb";
|
||||
import { logger } from "../../lib/logger";
|
||||
import {
|
||||
cleanOldConcurrencyLimitedJobs,
|
||||
@@ -22,6 +26,8 @@ type QueueStatusResponse = {
|
||||
mostRecentSuccess: string | null;
|
||||
};
|
||||
|
||||
const FDB_OPTIONAL_COUNT_TIMEOUT_MS = 500;
|
||||
|
||||
export async function queueStatusController(
|
||||
req: RequestWithAuth<{}, undefined, QueueStatusResponse>,
|
||||
res: Response<QueueStatusResponse>,
|
||||
@@ -53,12 +59,20 @@ export async function queueStatusController(
|
||||
// during the FDB migration a team can have load on both ledgers
|
||||
if (fdbQueueEnabled()) {
|
||||
try {
|
||||
const [fdbActive, fdbPending] = await Promise.all([
|
||||
scrapeQueueFdb.getTeamActiveCount(req.auth.team_id),
|
||||
scrapeQueueFdb.getTeamPendingCount(req.auth.team_id),
|
||||
]);
|
||||
activeJobsOfTeam += fdbActive;
|
||||
queuedJobsOfTeam += fdbPending;
|
||||
if (await nuqFdbHealthCheck(FDB_OPTIONAL_COUNT_TIMEOUT_MS)) {
|
||||
const [fdbActive, fdbPending] = await Promise.all([
|
||||
withFdbTimeout(
|
||||
scrapeQueueFdb.getTeamActiveCount(req.auth.team_id),
|
||||
FDB_OPTIONAL_COUNT_TIMEOUT_MS,
|
||||
),
|
||||
withFdbTimeout(
|
||||
scrapeQueueFdb.getTeamPendingCount(req.auth.team_id),
|
||||
FDB_OPTIONAL_COUNT_TIMEOUT_MS,
|
||||
),
|
||||
]);
|
||||
activeJobsOfTeam += fdbActive;
|
||||
queuedJobsOfTeam += fdbPending;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn("Failed to read FDB queue counts, falling back to Redis", {
|
||||
module: "queue-status",
|
||||
|
||||
@@ -5,7 +5,11 @@ import { AuthCreditUsageChunkFromTeam } from "../v1/types";
|
||||
import { Response } from "express";
|
||||
import { getRedisConnection } from "../../services/queue-service";
|
||||
import { fdbQueueEnabled } from "../../services/worker/nuq-router";
|
||||
import { scrapeQueueFdb } from "../../services/worker/nuq-fdb";
|
||||
import {
|
||||
nuqFdbHealthCheck,
|
||||
scrapeQueueFdb,
|
||||
withFdbTimeout,
|
||||
} from "../../services/worker/nuq-fdb";
|
||||
import { logger } from "../../lib/logger";
|
||||
import {
|
||||
cleanOldConcurrencyLimitedJobs,
|
||||
@@ -23,6 +27,8 @@ type QueueStatusResponse = {
|
||||
mostRecentSuccess: string | null;
|
||||
};
|
||||
|
||||
const FDB_OPTIONAL_COUNT_TIMEOUT_MS = 500;
|
||||
|
||||
export async function queueStatusController(
|
||||
req: RequestWithAuth<{}, undefined, QueueStatusResponse>,
|
||||
res: Response<QueueStatusResponse>,
|
||||
@@ -54,12 +60,20 @@ export async function queueStatusController(
|
||||
// during the FDB migration a team can have load on both ledgers
|
||||
if (fdbQueueEnabled()) {
|
||||
try {
|
||||
const [fdbActive, fdbPending] = await Promise.all([
|
||||
scrapeQueueFdb.getTeamActiveCount(req.auth.team_id),
|
||||
scrapeQueueFdb.getTeamPendingCount(req.auth.team_id),
|
||||
]);
|
||||
activeJobsOfTeam += fdbActive;
|
||||
queuedJobsOfTeam += fdbPending;
|
||||
if (await nuqFdbHealthCheck(FDB_OPTIONAL_COUNT_TIMEOUT_MS)) {
|
||||
const [fdbActive, fdbPending] = await Promise.all([
|
||||
withFdbTimeout(
|
||||
scrapeQueueFdb.getTeamActiveCount(req.auth.team_id),
|
||||
FDB_OPTIONAL_COUNT_TIMEOUT_MS,
|
||||
),
|
||||
withFdbTimeout(
|
||||
scrapeQueueFdb.getTeamPendingCount(req.auth.team_id),
|
||||
FDB_OPTIONAL_COUNT_TIMEOUT_MS,
|
||||
),
|
||||
]);
|
||||
activeJobsOfTeam += fdbActive;
|
||||
queuedJobsOfTeam += fdbPending;
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn("Failed to read FDB queue counts, falling back to Redis", {
|
||||
module: "queue-status",
|
||||
|
||||
@@ -30,7 +30,11 @@ import {
|
||||
resolveJobBackend,
|
||||
scrapeQueue as routedScrapeQueue,
|
||||
} from "./worker/nuq-router";
|
||||
import { scrapeQueueFdb } from "./worker/nuq-fdb";
|
||||
import {
|
||||
nuqFdbHealthCheck,
|
||||
scrapeQueueFdb,
|
||||
withFdbTimeout,
|
||||
} from "./worker/nuq-fdb";
|
||||
import { serializeTraceContext } from "../lib/otel-tracer";
|
||||
import { isSelfHosted } from "../lib/deployment";
|
||||
import { MONITOR_CHECK_STALE_TIMEOUT_MS } from "./monitoring/stale";
|
||||
@@ -303,6 +307,8 @@ async function addScrapeJobFdb(
|
||||
}
|
||||
|
||||
// parity with the PG path: notify when the backlog exceeds the team limit
|
||||
const FDB_OPTIONAL_COUNT_TIMEOUT_MS = 500;
|
||||
|
||||
async function maybeSendConcurrencyNotificationFdb(
|
||||
teamId: string,
|
||||
teamLimit: number | null,
|
||||
@@ -310,7 +316,11 @@ async function maybeSendConcurrencyNotificationFdb(
|
||||
) {
|
||||
if (teamLimit === null || crawlOrBatch) return;
|
||||
try {
|
||||
const pending = await scrapeQueueFdb.getTeamPendingCount(teamId);
|
||||
if (!(await nuqFdbHealthCheck(FDB_OPTIONAL_COUNT_TIMEOUT_MS))) return;
|
||||
const pending = await withFdbTimeout(
|
||||
scrapeQueueFdb.getTeamPendingCount(teamId),
|
||||
FDB_OPTIONAL_COUNT_TIMEOUT_MS,
|
||||
);
|
||||
if (pending <= teamLimit) return;
|
||||
const shouldSendNotification =
|
||||
await shouldSendConcurrencyLimitNotification(teamId);
|
||||
|
||||
@@ -55,7 +55,7 @@ function fdbForced(): boolean {
|
||||
}
|
||||
|
||||
const fdbFallbackLastWarn = new Map<string, number>();
|
||||
const FDB_OPTIONAL_DEQUEUE_TIMEOUT_MS = 500;
|
||||
const FDB_OPTIONAL_OP_TIMEOUT_MS = 500;
|
||||
|
||||
function logFdbFallback(
|
||||
logger: Logger,
|
||||
@@ -75,10 +75,10 @@ function logFdbFallback(
|
||||
|
||||
async function optionalFdb<T>(operation: () => Promise<T>): Promise<T> {
|
||||
if (fdbForced()) return operation();
|
||||
if (!(await nuqFdbHealthCheck(FDB_OPTIONAL_DEQUEUE_TIMEOUT_MS))) {
|
||||
throw new Error("FDB health check failed before optional dequeue");
|
||||
if (!(await nuqFdbHealthCheck(FDB_OPTIONAL_OP_TIMEOUT_MS))) {
|
||||
throw new Error("FDB health check failed before optional operation");
|
||||
}
|
||||
return await withFdbTimeout(operation(), FDB_OPTIONAL_DEQUEUE_TIMEOUT_MS);
|
||||
return await withFdbTimeout(operation(), FDB_OPTIONAL_OP_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
// Whether NEW work for this team should go to FDB. Existing crawls follow
|
||||
@@ -152,10 +152,17 @@ export async function mirrorExternalSlotAcquire(
|
||||
ttlMs: number,
|
||||
): Promise<void> {
|
||||
if (await isFdbTeam(teamId)) {
|
||||
await externalSlotsFdb.acquire(teamId, holderId, ttlMs);
|
||||
} else {
|
||||
await pushConcurrencyLimitActiveJob(teamId, holderId, ttlMs);
|
||||
try {
|
||||
await optionalFdb(() =>
|
||||
externalSlotsFdb.acquire(teamId, holderId, ttlMs),
|
||||
);
|
||||
return;
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(_logger, "mirrorExternalSlotAcquire", error);
|
||||
}
|
||||
}
|
||||
await pushConcurrencyLimitActiveJob(teamId, holderId, ttlMs);
|
||||
}
|
||||
|
||||
export async function mirrorExternalSlotRelease(
|
||||
@@ -163,10 +170,15 @@ export async function mirrorExternalSlotRelease(
|
||||
holderId: string,
|
||||
): Promise<void> {
|
||||
if (await isFdbTeam(teamId)) {
|
||||
await externalSlotsFdb.release(teamId, holderId);
|
||||
} else {
|
||||
await removeConcurrencyLimitActiveJob(teamId, holderId);
|
||||
try {
|
||||
await optionalFdb(() => externalSlotsFdb.release(teamId, holderId));
|
||||
return;
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(_logger, "mirrorExternalSlotRelease", error);
|
||||
}
|
||||
}
|
||||
await removeConcurrencyLimitActiveJob(teamId, holderId);
|
||||
}
|
||||
|
||||
// Active count across both ledgers; a migrating team has load on both while
|
||||
@@ -177,7 +189,10 @@ export async function getCombinedTeamActiveCount(
|
||||
const redisCount = await getConcurrencyLimitActiveJobsCount(teamId);
|
||||
if (!fdbQueueEnabled()) return redisCount;
|
||||
try {
|
||||
return redisCount + (await scrapeQueueFdb.getTeamActiveCount(teamId));
|
||||
return (
|
||||
redisCount +
|
||||
(await optionalFdb(() => scrapeQueueFdb.getTeamActiveCount(teamId)))
|
||||
);
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(_logger, "getCombinedTeamActiveCount", error);
|
||||
@@ -218,23 +233,25 @@ export async function fdbEnqueueScrapeJobs(
|
||||
const queueCap =
|
||||
teamLimit === null ? Number.MAX_SAFE_INTEGER : getTeamQueueLimit(teamLimit);
|
||||
|
||||
const results = await scrapeQueueFdb.addJobs(
|
||||
jobs.map(j => ({
|
||||
id: j.jobId,
|
||||
data: j.data,
|
||||
options: {
|
||||
priority: j.priority,
|
||||
listenable: j.listenable ?? false,
|
||||
ownerId: j.data.team_id ?? undefined,
|
||||
groupId: j.data.crawl_id ?? undefined,
|
||||
bypassGate:
|
||||
options?.bypassGate ||
|
||||
j.data.mode === "kickoff" ||
|
||||
j.data.mode === "kickoff_sitemap",
|
||||
timesOutAt: new Date(Date.now() + j.backlogTimeoutMs),
|
||||
},
|
||||
})),
|
||||
{ teamLimit, queueCap },
|
||||
const results = await optionalFdb(() =>
|
||||
scrapeQueueFdb.addJobs(
|
||||
jobs.map(j => ({
|
||||
id: j.jobId,
|
||||
data: j.data,
|
||||
options: {
|
||||
priority: j.priority,
|
||||
listenable: j.listenable ?? false,
|
||||
ownerId: j.data.team_id ?? undefined,
|
||||
groupId: j.data.crawl_id ?? undefined,
|
||||
bypassGate:
|
||||
options?.bypassGate ||
|
||||
j.data.mode === "kickoff" ||
|
||||
j.data.mode === "kickoff_sitemap",
|
||||
timesOutAt: new Date(Date.now() + j.backlogTimeoutMs),
|
||||
},
|
||||
})),
|
||||
{ teamLimit, queueCap },
|
||||
),
|
||||
);
|
||||
|
||||
const tagged = results.map(r => tagFdbJob(r as NuQJob<ScrapeJobData>));
|
||||
@@ -283,7 +300,15 @@ class RoutedScrapeQueue {
|
||||
logger: Logger = _logger,
|
||||
): Promise<boolean> {
|
||||
if (this.backendFor(id) === "fdb") {
|
||||
return scrapeQueueFdb.renewLock(id, lock, logger);
|
||||
try {
|
||||
return await optionalFdb(() =>
|
||||
scrapeQueueFdb.renewLock(id, lock, logger),
|
||||
);
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(logger, "scrape.renewLock", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return scrapeQueuePg.renewLock(id, lock, logger);
|
||||
}
|
||||
@@ -297,7 +322,15 @@ class RoutedScrapeQueue {
|
||||
const backend = this.backendFor(id);
|
||||
this.inflightBackend.delete(id);
|
||||
if (backend === "fdb") {
|
||||
return scrapeQueueFdb.jobFinish(id, lock, returnvalue, logger);
|
||||
try {
|
||||
return await optionalFdb(() =>
|
||||
scrapeQueueFdb.jobFinish(id, lock, returnvalue, logger),
|
||||
);
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(logger, "scrape.jobFinish", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return scrapeQueuePg.jobFinish(id, lock, returnvalue, logger);
|
||||
}
|
||||
@@ -311,7 +344,15 @@ class RoutedScrapeQueue {
|
||||
const backend = this.backendFor(id);
|
||||
this.inflightBackend.delete(id);
|
||||
if (backend === "fdb") {
|
||||
return scrapeQueueFdb.jobFail(id, lock, failedReason, logger);
|
||||
try {
|
||||
return await optionalFdb(() =>
|
||||
scrapeQueueFdb.jobFail(id, lock, failedReason, logger),
|
||||
);
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(logger, "scrape.jobFail", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return scrapeQueuePg.jobFail(id, lock, failedReason, logger);
|
||||
}
|
||||
@@ -322,7 +363,7 @@ class RoutedScrapeQueue {
|
||||
): Promise<NuQJob<ScrapeJobData> | null> {
|
||||
if (fdbQueueEnabled()) {
|
||||
try {
|
||||
const job = await scrapeQueueFdb.getJob(id, logger);
|
||||
const job = await optionalFdb(() => scrapeQueueFdb.getJob(id, logger));
|
||||
if (job) return tagFdbJob(job as NuQJob<ScrapeJobData>);
|
||||
if (fdbForced()) return null;
|
||||
} catch (error) {
|
||||
@@ -340,7 +381,7 @@ class RoutedScrapeQueue {
|
||||
if (!fdbQueueEnabled()) return scrapeQueuePg.getJobs(ids, logger);
|
||||
let fdbJobs: NuQFdbJob<ScrapeJobData>[] = [];
|
||||
try {
|
||||
fdbJobs = await scrapeQueueFdb.getJobs(ids, logger);
|
||||
fdbJobs = await optionalFdb(() => scrapeQueueFdb.getJobs(ids, logger));
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(logger, "scrape.getJobs", error);
|
||||
@@ -381,7 +422,9 @@ class RoutedScrapeQueue {
|
||||
private async isFdbGroup(groupId: string): Promise<boolean> {
|
||||
if (!fdbQueueEnabled()) return false;
|
||||
try {
|
||||
return (await crawlGroupFdb.getGroup(groupId)) !== null;
|
||||
return (
|
||||
(await optionalFdb(() => crawlGroupFdb.getGroup(groupId))) !== null
|
||||
);
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(_logger, "scrape.isFdbGroup", error);
|
||||
@@ -395,7 +438,9 @@ class RoutedScrapeQueue {
|
||||
logger: Logger = _logger,
|
||||
): Promise<NuQJob<ScrapeJobData> | null> {
|
||||
if (await this.isFdbGroup(groupId)) {
|
||||
const job = await scrapeQueueFdb.getGroupAnyJob(groupId, ownerId, logger);
|
||||
const job = await optionalFdb(() =>
|
||||
scrapeQueueFdb.getGroupAnyJob(groupId, ownerId, logger),
|
||||
);
|
||||
return job ? tagFdbJob(job as NuQJob<ScrapeJobData>) : null;
|
||||
}
|
||||
return scrapeQueuePg.getGroupAnyJob(groupId, ownerId);
|
||||
@@ -406,9 +451,9 @@ class RoutedScrapeQueue {
|
||||
logger: Logger = _logger,
|
||||
): Promise<Record<NuQJobStatus, number>> {
|
||||
if (await this.isFdbGroup(groupId)) {
|
||||
return scrapeQueueFdb.getGroupNumericStats(groupId, logger) as Promise<
|
||||
Record<NuQJobStatus, number>
|
||||
>;
|
||||
return (await optionalFdb(() =>
|
||||
scrapeQueueFdb.getGroupNumericStats(groupId, logger),
|
||||
)) as Record<NuQJobStatus, number>;
|
||||
}
|
||||
return scrapeQueuePg.getGroupNumericStats(groupId, logger);
|
||||
}
|
||||
@@ -420,11 +465,8 @@ class RoutedScrapeQueue {
|
||||
logger: Logger = _logger,
|
||||
): Promise<NuQJob<ScrapeJobData>[]> {
|
||||
if (await this.isFdbGroup(groupId)) {
|
||||
const jobs = await scrapeQueueFdb.getCrawlJobsForListing(
|
||||
groupId,
|
||||
limit,
|
||||
offset,
|
||||
logger,
|
||||
const jobs = await optionalFdb(() =>
|
||||
scrapeQueueFdb.getCrawlJobsForListing(groupId, limit, offset, logger),
|
||||
);
|
||||
return jobs.map(j => tagFdbJob(j as NuQJob<ScrapeJobData>));
|
||||
}
|
||||
@@ -434,8 +476,8 @@ class RoutedScrapeQueue {
|
||||
public async removeJob(id: string, logger: Logger = _logger): Promise<void> {
|
||||
if (fdbQueueEnabled()) {
|
||||
try {
|
||||
if (await scrapeQueueFdb.hasJob(id)) {
|
||||
await scrapeQueueFdb.removeJob(id, logger);
|
||||
if (await optionalFdb(() => scrapeQueueFdb.hasJob(id))) {
|
||||
await optionalFdb(() => scrapeQueueFdb.removeJob(id, logger));
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -463,8 +505,10 @@ class RoutedScrapeQueue {
|
||||
): Promise<T> {
|
||||
if (fdbQueueEnabled()) {
|
||||
try {
|
||||
if (await scrapeQueueFdb.hasJob(id)) {
|
||||
return scrapeQueueFdb.waitForJob(id, timeout, logger);
|
||||
if (await optionalFdb(() => scrapeQueueFdb.hasJob(id))) {
|
||||
return optionalFdb(() =>
|
||||
scrapeQueueFdb.waitForJob(id, timeout, logger),
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
@@ -513,7 +557,15 @@ class RoutedCrawlFinishedQueue {
|
||||
logger: Logger = _logger,
|
||||
): Promise<boolean> {
|
||||
if (this.inflightBackend.get(id) === "fdb") {
|
||||
return crawlFinishedQueueFdb.renewLock(id, lock, logger);
|
||||
try {
|
||||
return await optionalFdb(() =>
|
||||
crawlFinishedQueueFdb.renewLock(id, lock, logger),
|
||||
);
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(logger, "crawlFinished.renewLock", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return crawlFinishedQueuePg.renewLock(id, lock, logger);
|
||||
}
|
||||
@@ -527,7 +579,15 @@ class RoutedCrawlFinishedQueue {
|
||||
const backend = this.inflightBackend.get(id) ?? "pg";
|
||||
this.inflightBackend.delete(id);
|
||||
if (backend === "fdb") {
|
||||
return crawlFinishedQueueFdb.jobFinish(id, lock, returnvalue, logger);
|
||||
try {
|
||||
return await optionalFdb(() =>
|
||||
crawlFinishedQueueFdb.jobFinish(id, lock, returnvalue, logger),
|
||||
);
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(logger, "crawlFinished.jobFinish", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return crawlFinishedQueuePg.jobFinish(id, lock, returnvalue, logger);
|
||||
}
|
||||
@@ -541,7 +601,15 @@ class RoutedCrawlFinishedQueue {
|
||||
const backend = this.inflightBackend.get(id) ?? "pg";
|
||||
this.inflightBackend.delete(id);
|
||||
if (backend === "fdb") {
|
||||
return crawlFinishedQueueFdb.jobFail(id, lock, failedReason, logger);
|
||||
try {
|
||||
return await optionalFdb(() =>
|
||||
crawlFinishedQueueFdb.jobFail(id, lock, failedReason, logger),
|
||||
);
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(logger, "crawlFinished.jobFail", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return crawlFinishedQueuePg.jobFail(id, lock, failedReason, logger);
|
||||
}
|
||||
@@ -552,7 +620,9 @@ class RoutedCrawlFinishedQueue {
|
||||
): Promise<NuQJob<any> | null> {
|
||||
if (fdbQueueEnabled()) {
|
||||
try {
|
||||
const job = await crawlFinishedQueueFdb.getJob(id, logger);
|
||||
const job = await optionalFdb(() =>
|
||||
crawlFinishedQueueFdb.getJob(id, logger),
|
||||
);
|
||||
if (job) return tagFdbJob(job as NuQJob<any>);
|
||||
if (fdbForced()) return null;
|
||||
} catch (error) {
|
||||
@@ -579,15 +649,17 @@ class RoutedCrawlGroup {
|
||||
logger: Logger = _logger,
|
||||
): Promise<NuQJobGroupInstance> {
|
||||
if (opts?.backend === "fdb") {
|
||||
const g = await crawlGroupFdb.addGroup(
|
||||
id,
|
||||
ownerId,
|
||||
ttl,
|
||||
{
|
||||
maxConcurrency: opts.maxConcurrency,
|
||||
delaySeconds: opts.delaySeconds,
|
||||
},
|
||||
logger,
|
||||
const g = await optionalFdb(() =>
|
||||
crawlGroupFdb.addGroup(
|
||||
id,
|
||||
ownerId,
|
||||
ttl,
|
||||
{
|
||||
maxConcurrency: opts.maxConcurrency,
|
||||
delaySeconds: opts.delaySeconds,
|
||||
},
|
||||
logger,
|
||||
),
|
||||
);
|
||||
return g as NuQJobGroupInstance;
|
||||
}
|
||||
@@ -600,7 +672,7 @@ class RoutedCrawlGroup {
|
||||
): Promise<NuQJobGroupInstance | null> {
|
||||
if (fdbQueueEnabled()) {
|
||||
try {
|
||||
const g = await crawlGroupFdb.getGroup(id, logger);
|
||||
const g = await optionalFdb(() => crawlGroupFdb.getGroup(id, logger));
|
||||
if (g) return g as NuQJobGroupInstance;
|
||||
if (fdbForced()) return null;
|
||||
} catch (error) {
|
||||
@@ -620,9 +692,8 @@ class RoutedCrawlGroup {
|
||||
}
|
||||
let fdb: NuQJobGroupInstance[] = [];
|
||||
try {
|
||||
fdb = (await crawlGroupFdb.getOngoingByOwner(
|
||||
ownerId,
|
||||
logger,
|
||||
fdb = (await optionalFdb(() =>
|
||||
crawlGroupFdb.getOngoingByOwner(ownerId, logger),
|
||||
)) as NuQJobGroupInstance[];
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
@@ -646,7 +717,7 @@ class RoutedCrawlGroup {
|
||||
): Promise<boolean> {
|
||||
if (!fdbQueueEnabled()) return false;
|
||||
try {
|
||||
return crawlGroupFdb.cancelGroup(id, logger);
|
||||
return await optionalFdb(() => crawlGroupFdb.cancelGroup(id, logger));
|
||||
} catch (error) {
|
||||
if (fdbForced()) throw error;
|
||||
logFdbFallback(logger, "crawlGroup.cancelGroup", error);
|
||||
|
||||
@@ -2,8 +2,9 @@ import {
|
||||
pushConcurrencyLimitActiveJob,
|
||||
removeConcurrencyLimitActiveJob,
|
||||
} from "../../lib/concurrency-limit";
|
||||
import { config } from "../../config";
|
||||
import { isFdbTeam } from "./nuq-router";
|
||||
import { externalSlotsFdb } from "./nuq-fdb";
|
||||
import { externalSlotsFdb, nuqFdbHealthCheck, withFdbTimeout } from "./nuq-fdb";
|
||||
import { isSelfHosted } from "../../lib/deployment";
|
||||
import { ScrapeJobTimeoutError, TransportableError } from "../../lib/error";
|
||||
import { logger as _logger } from "../../lib/logger";
|
||||
@@ -32,9 +33,22 @@ const semaphoreHoldDuration = new Histogram({
|
||||
const { scripts, runScript, ensure } = nuqRedis;
|
||||
|
||||
const SEMAPHORE_TTL = 30 * 1000;
|
||||
const FDB_OPTIONAL_SLOT_TIMEOUT_MS = 500;
|
||||
type MirrorBackend = "pg" | "fdb";
|
||||
type MirrorState = { backend?: MirrorBackend; touched: Set<MirrorBackend> };
|
||||
|
||||
function fdbForced(): boolean {
|
||||
return config.NUQ_BACKEND === "fdb";
|
||||
}
|
||||
|
||||
async function optionalFdbSlot<T>(operation: () => Promise<T>): Promise<T> {
|
||||
if (fdbForced()) return operation();
|
||||
if (!(await nuqFdbHealthCheck(FDB_OPTIONAL_SLOT_TIMEOUT_MS))) {
|
||||
throw new Error("FDB health check failed before optional slot operation");
|
||||
}
|
||||
return await withFdbTimeout(operation(), FDB_OPTIONAL_SLOT_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
async function acquire(
|
||||
teamId: string,
|
||||
holderId: string,
|
||||
@@ -206,7 +220,9 @@ function startHeartbeat(
|
||||
// PG-backed teams mirror into the Redis ZSET; FDB-backed teams consume an
|
||||
// external slot on the FDB ledger.
|
||||
async function resolveMirrorBackend(teamId: string): Promise<MirrorBackend> {
|
||||
return (await isFdbTeam(teamId)) ? "fdb" : "pg";
|
||||
if (!(await isFdbTeam(teamId))) return "pg";
|
||||
if (fdbForced()) return "fdb";
|
||||
return (await nuqFdbHealthCheck(FDB_OPTIONAL_SLOT_TIMEOUT_MS)) ? "fdb" : "pg";
|
||||
}
|
||||
|
||||
async function releaseMirrorBackend(
|
||||
@@ -215,7 +231,7 @@ async function releaseMirrorBackend(
|
||||
backend: MirrorBackend,
|
||||
): Promise<void> {
|
||||
if (backend === "fdb") {
|
||||
await externalSlotsFdb.release(teamId, holderId);
|
||||
await optionalFdbSlot(() => externalSlotsFdb.release(teamId, holderId));
|
||||
} else {
|
||||
await removeConcurrencyLimitActiveJob(teamId, holderId);
|
||||
}
|
||||
@@ -228,7 +244,9 @@ async function mirrorSlotAcquire(
|
||||
): Promise<void> {
|
||||
const backend = await resolveMirrorBackend(teamId);
|
||||
if (backend === "fdb") {
|
||||
await externalSlotsFdb.acquire(teamId, holderId, 60 * 1000);
|
||||
await optionalFdbSlot(() =>
|
||||
externalSlotsFdb.acquire(teamId, holderId, 60 * 1000),
|
||||
);
|
||||
} else {
|
||||
await pushConcurrencyLimitActiveJob(teamId, holderId, 60 * 1000);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user