diff --git a/apps/api/src/services/monitoring/email_recipients.test.ts b/apps/api/src/services/monitoring/email_recipients.test.ts index 256dc0900..8e7b60e2d 100644 --- a/apps/api/src/services/monitoring/email_recipients.test.ts +++ b/apps/api/src/services/monitoring/email_recipients.test.ts @@ -1,33 +1,43 @@ type QueryResult = { data: unknown; error: unknown }; type ChainOp = "select" | "insert" | "update" | "delete" | "unknown"; +type ClientKind = "primary" | "rr"; -const fromMock = jest.fn(); +type CallRecord = { client: ClientKind; op: ChainOp }; + +const fromPrimary = jest.fn(); +const fromRR = jest.fn(); +const calls: CallRecord[] = []; // Each test queues responses in the order the code under test will execute // queries. The fluent chain swallows any sequence of select/insert/update/ // eq/neq calls and resolves at .maybeSingle() / .single() with the next // queued result. Tests stay explicit about query order without coupling to // the precise method chain shape. -type QueuedResponse = (op: ChainOp) => Promise; +type QueuedResponse = ( + client: ClientKind, + op: ChainOp, +) => Promise; let queue: QueuedResponse[] = []; function queueResponses(responses: QueuedResponse[]): void { queue = [...responses]; } -function makeChain(): any { +function makeChain(client: ClientKind): any { let op: ChainOp | null = null; const setOp = (next: ChainOp) => { if (op === null) op = next; }; const resolve = (): Promise => { + const finalOp = op ?? "unknown"; + calls.push({ client, op: finalOp }); const next = queue.shift(); if (!next) { throw new Error( - `No queued Supabase response for op=${op ?? "unknown"} (queue exhausted)`, + `No queued Supabase response for client=${client} op=${finalOp} (queue exhausted)`, ); } - return next(op ?? "unknown"); + return next(client, finalOp); }; const builder: any = { select: () => { @@ -57,10 +67,10 @@ function makeChain(): any { jest.mock("../supabase", () => ({ supabase_service: { - from: (...args: unknown[]) => fromMock(...args), + from: (...args: unknown[]) => fromPrimary(...args), }, supabase_rr_service: { - from: (...args: unknown[]) => fromMock(...args), + from: (...args: unknown[]) => fromRR(...args), }, })); @@ -72,8 +82,11 @@ import { beforeEach(() => { queue = []; - fromMock.mockReset(); - fromMock.mockImplementation(() => makeChain()); + calls.length = 0; + fromPrimary.mockReset(); + fromRR.mockReset(); + fromPrimary.mockImplementation(() => makeChain("primary")); + fromRR.mockImplementation(() => makeChain("rr")); }); function recipientRow(overrides: Record = {}) { @@ -141,6 +154,13 @@ describe("ensureMonitorEmailRecipient", () => { const result = await ensureMonitorEmailRecipient(baseEnsureInput); expect(result).toEqual({ row: winnerRow, created: false }); + // The race-recovery re-fetch MUST hit the primary; the read replica may + // not yet have the row the concurrent writer just committed. + expect(calls.map(c => ({ client: c.client, op: c.op }))).toEqual([ + { client: "rr", op: "select" }, + { client: "primary", op: "insert" }, + { client: "primary", op: "select" }, + ]); }); it("rethrows non-unique insert errors", async () => { @@ -220,6 +240,12 @@ describe("confirmRecipientByToken", () => { expect(result).toEqual(unsubscribed); expect(result?.status).toBe("unsubscribed"); + // The post-UPDATE re-fetch MUST hit primary so we don't get a stale + // 'pending' from the read replica right after our conditional write. + expect(calls[calls.length - 1]).toEqual({ + client: "primary", + op: "select", + }); }); }); @@ -279,5 +305,9 @@ describe("unsubscribeRecipientByToken", () => { const result = await unsubscribeRecipientByToken("tok-1"); expect(result).toEqual(alreadyUnsub); + expect(calls[calls.length - 1]).toEqual({ + client: "primary", + op: "select", + }); }); }); diff --git a/apps/api/src/services/monitoring/email_recipients.ts b/apps/api/src/services/monitoring/email_recipients.ts index c29e48f6d..b46186b98 100644 --- a/apps/api/src/services/monitoring/email_recipients.ts +++ b/apps/api/src/services/monitoring/email_recipients.ts @@ -148,8 +148,25 @@ async function fetchRecipientByMonitorEmail( return (data ?? null) as MonitorEmailRecipientRow | null; } -// Reads from the write client to avoid read-replica lag when we need the -// authoritative current state right after a conditional UPDATE. +// Read replicas can lag behind a freshly-committed INSERT, which is exactly +// the moment this re-fetch runs (the concurrent writer just won the unique +// race). Reading from the primary guarantees we see their row. +async function fetchRecipientByMonitorEmailPrimary( + monitorId: string, + email: string, +): Promise { + const { data, error } = await supabase_service + .from("monitor_email_recipients") + .select("*") + .eq("monitor_id", monitorId) + .eq("email", email) + .maybeSingle(); + throwIfError(error, "Failed to look up monitor email recipient"); + return (data ?? null) as MonitorEmailRecipientRow | null; +} + +// Same rationale as ...Primary above: a conditional UPDATE just landed, so +// the replica may not have caught up yet when we re-read. async function fetchRecipientByIdPrimary( id: string, ): Promise { @@ -201,7 +218,7 @@ export async function ensureMonitorEmailRecipient(params: { if (error) { if ((error as { code?: string }).code === POSTGRES_UNIQUE_VIOLATION) { - const winner = await fetchRecipientByMonitorEmail( + const winner = await fetchRecipientByMonitorEmailPrimary( params.monitorId, email, );