This commit is contained in:
Nicolas
2026-05-27 19:00:02 -07:00
parent 9f2550088b
commit 91d92312e6
2 changed files with 59 additions and 12 deletions

View File

@@ -1,33 +1,43 @@
type QueryResult = { data: unknown; error: unknown };
type ChainOp = "select" | "insert" | "update" | "delete" | "unknown";
type ClientKind = "primary" | "rr";
const fromMock = jest.fn();
type CallRecord = { client: ClientKind; op: ChainOp };
const fromPrimary = jest.fn();
const fromRR = jest.fn();
const calls: CallRecord[] = [];
// Each test queues responses in the order the code under test will execute
// queries. The fluent chain swallows any sequence of select/insert/update/
// eq/neq calls and resolves at .maybeSingle() / .single() with the next
// queued result. Tests stay explicit about query order without coupling to
// the precise method chain shape.
type QueuedResponse = (op: ChainOp) => Promise<QueryResult>;
type QueuedResponse = (
client: ClientKind,
op: ChainOp,
) => Promise<QueryResult>;
let queue: QueuedResponse[] = [];
function queueResponses(responses: QueuedResponse[]): void {
queue = [...responses];
}
function makeChain(): any {
function makeChain(client: ClientKind): any {
let op: ChainOp | null = null;
const setOp = (next: ChainOp) => {
if (op === null) op = next;
};
const resolve = (): Promise<QueryResult> => {
const finalOp = op ?? "unknown";
calls.push({ client, op: finalOp });
const next = queue.shift();
if (!next) {
throw new Error(
`No queued Supabase response for op=${op ?? "unknown"} (queue exhausted)`,
`No queued Supabase response for client=${client} op=${finalOp} (queue exhausted)`,
);
}
return next(op ?? "unknown");
return next(client, finalOp);
};
const builder: any = {
select: () => {
@@ -57,10 +67,10 @@ function makeChain(): any {
jest.mock("../supabase", () => ({
supabase_service: {
from: (...args: unknown[]) => fromMock(...args),
from: (...args: unknown[]) => fromPrimary(...args),
},
supabase_rr_service: {
from: (...args: unknown[]) => fromMock(...args),
from: (...args: unknown[]) => fromRR(...args),
},
}));
@@ -72,8 +82,11 @@ import {
beforeEach(() => {
queue = [];
fromMock.mockReset();
fromMock.mockImplementation(() => makeChain());
calls.length = 0;
fromPrimary.mockReset();
fromRR.mockReset();
fromPrimary.mockImplementation(() => makeChain("primary"));
fromRR.mockImplementation(() => makeChain("rr"));
});
function recipientRow(overrides: Record<string, unknown> = {}) {
@@ -141,6 +154,13 @@ describe("ensureMonitorEmailRecipient", () => {
const result = await ensureMonitorEmailRecipient(baseEnsureInput);
expect(result).toEqual({ row: winnerRow, created: false });
// The race-recovery re-fetch MUST hit the primary; the read replica may
// not yet have the row the concurrent writer just committed.
expect(calls.map(c => ({ client: c.client, op: c.op }))).toEqual([
{ client: "rr", op: "select" },
{ client: "primary", op: "insert" },
{ client: "primary", op: "select" },
]);
});
it("rethrows non-unique insert errors", async () => {
@@ -220,6 +240,12 @@ describe("confirmRecipientByToken", () => {
expect(result).toEqual(unsubscribed);
expect(result?.status).toBe("unsubscribed");
// The post-UPDATE re-fetch MUST hit primary so we don't get a stale
// 'pending' from the read replica right after our conditional write.
expect(calls[calls.length - 1]).toEqual({
client: "primary",
op: "select",
});
});
});
@@ -279,5 +305,9 @@ describe("unsubscribeRecipientByToken", () => {
const result = await unsubscribeRecipientByToken("tok-1");
expect(result).toEqual(alreadyUnsub);
expect(calls[calls.length - 1]).toEqual({
client: "primary",
op: "select",
});
});
});

View File

@@ -148,8 +148,25 @@ async function fetchRecipientByMonitorEmail(
return (data ?? null) as MonitorEmailRecipientRow | null;
}
// Reads from the write client to avoid read-replica lag when we need the
// authoritative current state right after a conditional UPDATE.
// Read replicas can lag behind a freshly-committed INSERT, which is exactly
// the moment this re-fetch runs (the concurrent writer just won the unique
// race). Reading from the primary guarantees we see their row.
async function fetchRecipientByMonitorEmailPrimary(
monitorId: string,
email: string,
): Promise<MonitorEmailRecipientRow | null> {
const { data, error } = await supabase_service
.from("monitor_email_recipients")
.select("*")
.eq("monitor_id", monitorId)
.eq("email", email)
.maybeSingle();
throwIfError(error, "Failed to look up monitor email recipient");
return (data ?? null) as MonitorEmailRecipientRow | null;
}
// Same rationale as ...Primary above: a conditional UPDATE just landed, so
// the replica may not have caught up yet when we re-read.
async function fetchRecipientByIdPrimary(
id: string,
): Promise<MonitorEmailRecipientRow | null> {
@@ -201,7 +218,7 @@ export async function ensureMonitorEmailRecipient(params: {
if (error) {
if ((error as { code?: string }).code === POSTGRES_UNIQUE_VIOLATION) {
const winner = await fetchRecipientByMonitorEmail(
const winner = await fetchRecipientByMonitorEmailPrimary(
params.monitorId,
email,
);