diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 19153f2..ebf4bda 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -26,6 +26,7 @@ import { isNull, lt, or, + sql, } from "drizzle-orm"; import { db } from "./db"; import { @@ -390,10 +391,12 @@ function deliverablePriorities(status: PeerStatus): Priority[] { /** * Drain deliverable messages addressed to a specific member in a mesh. - * Joins mesh.member so each envelope carries the sender's pubkey, which - * the receiving client needs to identify who sent it. Marks drained - * rows as delivered and returns the envelopes for WS push. + * Atomically claims rows via UPDATE ... WHERE id IN (SELECT ... FOR + * UPDATE SKIP LOCKED) — concurrent callers each claim DISJOINT sets, + * so the same message can never be pushed twice (even under fan-out + * racing with handleHello's own drain). * + * Joins mesh.member so each envelope carries the sender's pubkey. * targetSpec routing: matches either the member's pubkey directly or * the broadcast wildcard ("*"). Channel/tag resolution is per-mesh * config that lives outside this function. @@ -415,48 +418,70 @@ export async function drainForMember( }> > { const priorities = deliverablePriorities(status); - const targetFilter = or( - eq(messageQueue.targetSpec, memberPubkey), - eq(messageQueue.targetSpec, "*"), - )!; + if (priorities.length === 0) return []; + const priorityList = sql.raw( + priorities.map((p) => `'${p}'`).join(","), + ); - const rows = await db - .select({ - id: messageQueue.id, - priority: messageQueue.priority, - nonce: messageQueue.nonce, - ciphertext: messageQueue.ciphertext, - createdAt: messageQueue.createdAt, - senderMemberId: messageQueue.senderMemberId, - senderPubkey: memberTable.peerPubkey, - }) - .from(messageQueue) - .innerJoin(memberTable, eq(memberTable.id, messageQueue.senderMemberId)) - .where( - and( - eq(messageQueue.meshId, meshId), - isNull(messageQueue.deliveredAt), - inArray(messageQueue.priority, priorities), - targetFilter, - ), + // Atomic claim: inner SELECT locks candidate rows (skipping any + // already locked by a concurrent drain), outer UPDATE marks them + // delivered, the FROM join fetches the sender's pubkey, RETURNING + // gives us everything we need to push in one round-trip. + const result = await db.execute<{ + id: string; + priority: string; + nonce: string; + ciphertext: string; + created_at: Date; + sender_member_id: string; + sender_pubkey: string; + }>(sql` + UPDATE mesh.message_queue AS mq + SET delivered_at = NOW() + FROM mesh.member AS m + WHERE mq.id IN ( + SELECT id FROM mesh.message_queue + WHERE mesh_id = ${meshId} + AND delivered_at IS NULL + AND priority::text IN (${priorityList}) + AND (target_spec = ${memberPubkey} OR target_spec = '*') + ORDER BY created_at ASC + FOR UPDATE SKIP LOCKED ) - .orderBy(asc(messageQueue.createdAt)); + AND m.id = mq.sender_member_id + RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext, + mq.created_at, mq.sender_member_id, m.peer_pubkey AS sender_pubkey + `); - if (rows.length === 0) return []; - const now = new Date(); - const ids = rows.map((r) => r.id); - await db - .update(messageQueue) - .set({ deliveredAt: now }) - .where(inArray(messageQueue.id, ids)); - return rows.map((r) => ({ + const rows = (result.rows ?? result) as Array<{ + id: string; + priority: string; + nonce: string; + ciphertext: string; + created_at: string | Date; + sender_member_id: string; + sender_pubkey: string; + }>; + if (!rows || rows.length === 0) return []; + // Normalize created_at to Date (pg driver sometimes returns ISO + // strings for raw sql results). + const normalized = rows.map((r) => ({ + ...r, + created_at: + r.created_at instanceof Date ? r.created_at : new Date(r.created_at), + })); + // RETURNING order may not match the inner SELECT's ORDER BY — re-sort. + normalized.sort( + (a, b) => a.created_at.getTime() - b.created_at.getTime(), + ); + return normalized.map((r) => ({ id: r.id, priority: r.priority as Priority, nonce: r.nonce, ciphertext: r.ciphertext, - createdAt: r.createdAt, - senderMemberId: r.senderMemberId, - senderPubkey: r.senderPubkey, + createdAt: r.created_at, + senderMemberId: r.sender_member_id, + senderPubkey: r.sender_pubkey, })); } diff --git a/apps/broker/tests/dup-delivery.test.ts b/apps/broker/tests/dup-delivery.test.ts new file mode 100644 index 0000000..1d3e89b --- /dev/null +++ b/apps/broker/tests/dup-delivery.test.ts @@ -0,0 +1,126 @@ +/** + * Concurrency regression: drainForMember must return DISJOINT row + * sets when two callers race for the same member's queue. + * + * Before the FOR UPDATE SKIP LOCKED fix, both callers SELECTed the + * same undelivered rows, both sent push notifications, and only + * after did they race to UPDATE delivered_at. Receivers saw + * duplicate pushes for the same message id. + * + * After the fix, the atomic UPDATE ... WHERE id IN (SELECT ... FOR + * UPDATE SKIP LOCKED) lets each caller claim non-overlapping rows. + */ + +import { afterAll, afterEach, describe, expect, test } from "vitest"; +import { drainForMember, queueMessage } from "../src/broker"; +import { cleanupAllTestMeshes, setupTestMesh, type TestMesh } from "./helpers"; + +afterAll(async () => { + await cleanupAllTestMeshes(); +}); + +describe("drainForMember — concurrent callers", () => { + let m: TestMesh; + afterEach(async () => m && (await m.cleanup())); + + test("two concurrent drains produce disjoint result sets", async () => { + m = await setupTestMesh("dup-basic"); + // Queue 10 messages for peer-b. + for (let i = 0; i < 10; i++) { + await queueMessage({ + meshId: m.meshId, + senderMemberId: m.peerA.memberId, + targetSpec: m.peerB.pubkey, + priority: "now", + nonce: `n${i}`, + ciphertext: `msg-${i}`, + }); + } + // Fire two drains in parallel. + const [a, b] = await Promise.all([ + drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"), + drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"), + ]); + const idsA = new Set(a.map((r) => r.id)); + const idsB = new Set(b.map((r) => r.id)); + // No overlap. + for (const id of idsA) expect(idsB.has(id)).toBe(false); + // Union covers all 10. + expect(idsA.size + idsB.size).toBe(10); + }); + + test("six concurrent drains also partition cleanly", async () => { + m = await setupTestMesh("dup-six"); + for (let i = 0; i < 20; i++) { + await queueMessage({ + meshId: m.meshId, + senderMemberId: m.peerA.memberId, + targetSpec: m.peerB.pubkey, + priority: "now", + nonce: `n${i}`, + ciphertext: `msg-${i}`, + }); + } + const drains = await Promise.all( + Array.from({ length: 6 }).map(() => + drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"), + ), + ); + const allIds: string[] = []; + for (const d of drains) for (const r of d) allIds.push(r.id); + const unique = new Set(allIds); + expect(allIds.length).toBe(20); + expect(unique.size).toBe(20); + }); + + test("after drain, subsequent drain returns empty", async () => { + m = await setupTestMesh("dup-drain-empty"); + for (let i = 0; i < 3; i++) { + await queueMessage({ + meshId: m.meshId, + senderMemberId: m.peerA.memberId, + targetSpec: m.peerB.pubkey, + priority: "now", + nonce: `n${i}`, + ciphertext: `msg-${i}`, + }); + } + const first = await drainForMember( + m.meshId, + m.peerB.memberId, + m.peerB.pubkey, + "idle", + ); + expect(first).toHaveLength(3); + const second = await drainForMember( + m.meshId, + m.peerB.memberId, + m.peerB.pubkey, + "idle", + ); + expect(second).toHaveLength(0); + }); + + test("FIFO ordering preserved within a single drain", async () => { + m = await setupTestMesh("dup-fifo"); + for (let i = 0; i < 5; i++) { + await queueMessage({ + meshId: m.meshId, + senderMemberId: m.peerA.memberId, + targetSpec: m.peerB.pubkey, + priority: "now", + nonce: `n${i}`, + ciphertext: `msg-${i}`, + }); + } + const drained = await drainForMember( + m.meshId, + m.peerB.memberId, + m.peerB.pubkey, + "idle", + ); + for (let i = 0; i < 5; i++) { + expect(drained[i]!.ciphertext).toBe(`msg-${i}`); + } + }); +});