fix(broker): atomic message claim to prevent duplicate delivery

drainForMember previously ran SELECT undelivered rows, THEN UPDATE
delivered_at. Two concurrent callers (e.g. WS fan-out on send +
handleHello's own drain for the target) could both SELECT the same
row before either UPDATEd, pushing the same envelope twice.

now: single atomic UPDATE ... FROM member ... WHERE id IN (
  SELECT id ... FOR UPDATE SKIP LOCKED
) RETURNING mq.*, m.peer_pubkey AS sender_pubkey.

FOR UPDATE SKIP LOCKED is the key primitive — concurrent callers
each claim DISJOINT sets, so a message can never be drained twice.
Union of all concurrent drains still covers every eligible row.

re-sorts RETURNING rows by created_at client-side (Postgres makes no
FIFO guarantee on the RETURNING clause's output order), and normalizes
created_at to Date since raw-sql results can come back as ISO strings.

regression: tests/dup-delivery.test.ts (4 tests)
- two concurrent drains produce disjoint result sets
- six concurrent drains partition cleanly (20 messages, each drained once)
- subsequent drain after success returns empty
- FIFO ordering preserved within a single drain

48/48 tests pass. Live round-trip no longer logs the double-push.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-04 22:39:48 +01:00
parent 758ea0e42c
commit cd389c6bdd
2 changed files with 189 additions and 38 deletions

View File

@@ -26,6 +26,7 @@ import {
isNull,
lt,
or,
sql,
} from "drizzle-orm";
import { db } from "./db";
import {
@@ -390,10 +391,12 @@ function deliverablePriorities(status: PeerStatus): Priority[] {
/**
* Drain deliverable messages addressed to a specific member in a mesh.
* Joins mesh.member so each envelope carries the sender's pubkey, which
* the receiving client needs to identify who sent it. Marks drained
* rows as delivered and returns the envelopes for WS push.
* Atomically claims rows via UPDATE ... WHERE id IN (SELECT ... FOR
* UPDATE SKIP LOCKED) — concurrent callers each claim DISJOINT sets,
* so the same message can never be pushed twice (even under fan-out
* racing with handleHello's own drain).
*
* Joins mesh.member so each envelope carries the sender's pubkey.
* targetSpec routing: matches either the member's pubkey directly or
* the broadcast wildcard ("*"). Channel/tag resolution is per-mesh
* config that lives outside this function.
@@ -415,48 +418,70 @@ export async function drainForMember(
}>
> {
const priorities = deliverablePriorities(status);
const targetFilter = or(
eq(messageQueue.targetSpec, memberPubkey),
eq(messageQueue.targetSpec, "*"),
)!;
if (priorities.length === 0) return [];
const priorityList = sql.raw(
priorities.map((p) => `'${p}'`).join(","),
);
const rows = await db
.select({
id: messageQueue.id,
priority: messageQueue.priority,
nonce: messageQueue.nonce,
ciphertext: messageQueue.ciphertext,
createdAt: messageQueue.createdAt,
senderMemberId: messageQueue.senderMemberId,
senderPubkey: memberTable.peerPubkey,
})
.from(messageQueue)
.innerJoin(memberTable, eq(memberTable.id, messageQueue.senderMemberId))
.where(
and(
eq(messageQueue.meshId, meshId),
isNull(messageQueue.deliveredAt),
inArray(messageQueue.priority, priorities),
targetFilter,
),
// Atomic claim: inner SELECT locks candidate rows (skipping any
// already locked by a concurrent drain), outer UPDATE marks them
// delivered, the FROM join fetches the sender's pubkey, RETURNING
// gives us everything we need to push in one round-trip.
const result = await db.execute<{
id: string;
priority: string;
nonce: string;
ciphertext: string;
created_at: Date;
sender_member_id: string;
sender_pubkey: string;
}>(sql`
UPDATE mesh.message_queue AS mq
SET delivered_at = NOW()
FROM mesh.member AS m
WHERE mq.id IN (
SELECT id FROM mesh.message_queue
WHERE mesh_id = ${meshId}
AND delivered_at IS NULL
AND priority::text IN (${priorityList})
AND (target_spec = ${memberPubkey} OR target_spec = '*')
ORDER BY created_at ASC
FOR UPDATE SKIP LOCKED
)
.orderBy(asc(messageQueue.createdAt));
AND m.id = mq.sender_member_id
RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext,
mq.created_at, mq.sender_member_id, m.peer_pubkey AS sender_pubkey
`);
if (rows.length === 0) return [];
const now = new Date();
const ids = rows.map((r) => r.id);
await db
.update(messageQueue)
.set({ deliveredAt: now })
.where(inArray(messageQueue.id, ids));
return rows.map((r) => ({
const rows = (result.rows ?? result) as Array<{
id: string;
priority: string;
nonce: string;
ciphertext: string;
created_at: string | Date;
sender_member_id: string;
sender_pubkey: string;
}>;
if (!rows || rows.length === 0) return [];
// Normalize created_at to Date (pg driver sometimes returns ISO
// strings for raw sql results).
const normalized = rows.map((r) => ({
...r,
created_at:
r.created_at instanceof Date ? r.created_at : new Date(r.created_at),
}));
// RETURNING order may not match the inner SELECT's ORDER BY — re-sort.
normalized.sort(
(a, b) => a.created_at.getTime() - b.created_at.getTime(),
);
return normalized.map((r) => ({
id: r.id,
priority: r.priority as Priority,
nonce: r.nonce,
ciphertext: r.ciphertext,
createdAt: r.createdAt,
senderMemberId: r.senderMemberId,
senderPubkey: r.senderPubkey,
createdAt: r.created_at,
senderMemberId: r.sender_member_id,
senderPubkey: r.sender_pubkey,
}));
}

View File

@@ -0,0 +1,126 @@
/**
* Concurrency regression: drainForMember must return DISJOINT row
* sets when two callers race for the same member's queue.
*
* Before the FOR UPDATE SKIP LOCKED fix, both callers SELECTed the
* same undelivered rows, both sent push notifications, and only
* after did they race to UPDATE delivered_at. Receivers saw
* duplicate pushes for the same message id.
*
* After the fix, the atomic UPDATE ... WHERE id IN (SELECT ... FOR
* UPDATE SKIP LOCKED) lets each caller claim non-overlapping rows.
*/
import { afterAll, afterEach, describe, expect, test } from "vitest";
import { drainForMember, queueMessage } from "../src/broker";
import { cleanupAllTestMeshes, setupTestMesh, type TestMesh } from "./helpers";
afterAll(async () => {
await cleanupAllTestMeshes();
});
describe("drainForMember — concurrent callers", () => {
let m: TestMesh;
afterEach(async () => m && (await m.cleanup()));
test("two concurrent drains produce disjoint result sets", async () => {
m = await setupTestMesh("dup-basic");
// Queue 10 messages for peer-b.
for (let i = 0; i < 10; i++) {
await queueMessage({
meshId: m.meshId,
senderMemberId: m.peerA.memberId,
targetSpec: m.peerB.pubkey,
priority: "now",
nonce: `n${i}`,
ciphertext: `msg-${i}`,
});
}
// Fire two drains in parallel.
const [a, b] = await Promise.all([
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
]);
const idsA = new Set(a.map((r) => r.id));
const idsB = new Set(b.map((r) => r.id));
// No overlap.
for (const id of idsA) expect(idsB.has(id)).toBe(false);
// Union covers all 10.
expect(idsA.size + idsB.size).toBe(10);
});
test("six concurrent drains also partition cleanly", async () => {
m = await setupTestMesh("dup-six");
for (let i = 0; i < 20; i++) {
await queueMessage({
meshId: m.meshId,
senderMemberId: m.peerA.memberId,
targetSpec: m.peerB.pubkey,
priority: "now",
nonce: `n${i}`,
ciphertext: `msg-${i}`,
});
}
const drains = await Promise.all(
Array.from({ length: 6 }).map(() =>
drainForMember(m.meshId, m.peerB.memberId, m.peerB.pubkey, "idle"),
),
);
const allIds: string[] = [];
for (const d of drains) for (const r of d) allIds.push(r.id);
const unique = new Set(allIds);
expect(allIds.length).toBe(20);
expect(unique.size).toBe(20);
});
test("after drain, subsequent drain returns empty", async () => {
m = await setupTestMesh("dup-drain-empty");
for (let i = 0; i < 3; i++) {
await queueMessage({
meshId: m.meshId,
senderMemberId: m.peerA.memberId,
targetSpec: m.peerB.pubkey,
priority: "now",
nonce: `n${i}`,
ciphertext: `msg-${i}`,
});
}
const first = await drainForMember(
m.meshId,
m.peerB.memberId,
m.peerB.pubkey,
"idle",
);
expect(first).toHaveLength(3);
const second = await drainForMember(
m.meshId,
m.peerB.memberId,
m.peerB.pubkey,
"idle",
);
expect(second).toHaveLength(0);
});
test("FIFO ordering preserved within a single drain", async () => {
m = await setupTestMesh("dup-fifo");
for (let i = 0; i < 5; i++) {
await queueMessage({
meshId: m.meshId,
senderMemberId: m.peerA.memberId,
targetSpec: m.peerB.pubkey,
priority: "now",
nonce: `n${i}`,
ciphertext: `msg-${i}`,
});
}
const drained = await drainForMember(
m.meshId,
m.peerB.memberId,
m.peerB.pubkey,
"idle",
);
for (let i = 0; i < 5; i++) {
expect(drained[i]!.ciphertext).toBe(`msg-${i}`);
}
});
});