Files
claudemesh/apps/broker/tests/dup-delivery.test.ts
Alejandro Gutiérrez cd389c6bdd fix(broker): atomic message claim to prevent duplicate delivery
drainForMember previously ran SELECT undelivered rows, THEN UPDATE
delivered_at. Two concurrent callers (e.g. WS fan-out on send +
handleHello's own drain for the target) could both SELECT the same
row before either UPDATEd, pushing the same envelope twice.

now: single atomic UPDATE ... FROM member ... WHERE id IN (
  SELECT id ... FOR UPDATE SKIP LOCKED
) RETURNING mq.*, m.peer_pubkey AS sender_pubkey.

FOR UPDATE SKIP LOCKED is the key primitive — concurrent callers
each claim DISJOINT sets, so a message can never be drained twice.
Union of all concurrent drains still covers every eligible row.

drainForMember now re-sorts the RETURNING rows by created_at in
application code (Postgres makes no guarantee about the output order of
a RETURNING clause), and normalizes created_at to Date since raw-sql
results can come back as ISO strings.

regression: tests/dup-delivery.test.ts (4 tests)
- two concurrent drains produce disjoint result sets
- six concurrent drains partition cleanly (20 messages, each drained once)
- subsequent drain after success returns empty
- FIFO ordering preserved within a single drain

48/48 tests pass. Live round-trip no longer logs the double-push.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:39:48 +01:00

127 lines
3.8 KiB
TypeScript

/**
* Concurrency regression: drainForMember must return DISJOINT row
* sets when two callers race for the same member's queue.
*
* Before the FOR UPDATE SKIP LOCKED fix, both callers SELECTed the
* same undelivered rows and both sent push notifications; only
* afterwards did they race to UPDATE delivered_at. Receivers saw
* duplicate pushes for the same message id.
*
* After the fix, the atomic UPDATE ... WHERE id IN (SELECT ... FOR
* UPDATE SKIP LOCKED) lets each caller claim non-overlapping rows.
*/
import { afterAll, afterEach, describe, expect, test } from "vitest";
import { drainForMember, queueMessage } from "../src/broker";
import { cleanupAllTestMeshes, setupTestMesh, type TestMesh } from "./helpers";
// File-level teardown: runs cleanupAllTestMeshes() once, after every
// test in this file has finished.
afterAll(async function cleanupAfterSuite(): Promise<void> {
  await cleanupAllTestMeshes();
});
/**
 * Concurrency regression suite for drainForMember.
 *
 * Each test creates its own mesh via setupTestMesh, queues messages from
 * peer A addressed to peer B's pubkey, and then drains peer B's queue —
 * from several callers at once, or repeatedly — to check that every
 * queued message is handed out exactly once and in queue order.
 */
describe("drainForMember — concurrent callers", () => {
  // Mesh owned by the test that is currently running, if any.
  let active: TestMesh | undefined;

  /** Creates the mesh for one test and registers it for afterEach cleanup. */
  async function freshMesh(label: string): Promise<TestMesh> {
    const mesh = await setupTestMesh(label);
    active = mesh;
    return mesh;
  }

  /**
   * Queues `count` messages from peer A to peer B, one at a time, with
   * nonces `n0`, `n1`, … and ciphertexts `msg-0`, `msg-1`, ….
   */
  async function queueForPeerB(mesh: TestMesh, count: number): Promise<void> {
    for (let i = 0; i < count; i++) {
      await queueMessage({
        meshId: mesh.meshId,
        senderMemberId: mesh.peerA.memberId,
        targetSpec: mesh.peerB.pubkey,
        priority: "now",
        nonce: `n${i}`,
        ciphertext: `msg-${i}`,
      });
    }
  }

  /** Starts one drain of peer B's queue with the arguments shared by every test here. */
  function drainPeerB(mesh: TestMesh) {
    return drainForMember(
      mesh.meshId,
      mesh.peerB.memberId,
      mesh.peerB.pubkey,
      "idle",
    );
  }

  afterEach(async () => {
    // Drop the handle before cleaning up, so that a later test whose
    // setup throws cannot trigger a second cleanup of this mesh.
    const mesh = active;
    active = undefined;
    if (mesh) await mesh.cleanup();
  });

  test("two concurrent drains produce disjoint result sets", async () => {
    const mesh = await freshMesh("dup-basic");
    await queueForPeerB(mesh, 10);

    // Both drains are started before either one is awaited.
    const [a, b] = await Promise.all([drainPeerB(mesh), drainPeerB(mesh)]);

    const idsA = new Set(a.map((r) => r.id));
    const idsB = new Set(b.map((r) => r.id));

    // No overlap between the two callers.
    for (const id of idsA) expect(idsB.has(id)).toBe(false);
    // Union covers all 10 ids ...
    expect(idsA.size + idsB.size).toBe(10);
    // ... and no row is repeated inside a single drain's result (the
    // Sets above would silently collapse such a repeat).
    expect(a.length + b.length).toBe(10);
  });

  test("six concurrent drains also partition cleanly", async () => {
    const mesh = await freshMesh("dup-six");
    await queueForPeerB(mesh, 20);

    const drains = await Promise.all(
      Array.from({ length: 6 }, () => drainPeerB(mesh)),
    );

    const allIds: string[] = [];
    for (const rows of drains) for (const r of rows) allIds.push(r.id);

    // 20 rows handed out in total, all with distinct ids.
    expect(allIds.length).toBe(20);
    expect(new Set(allIds).size).toBe(20);
  });

  test("after drain, subsequent drain returns empty", async () => {
    const mesh = await freshMesh("dup-drain-empty");
    await queueForPeerB(mesh, 3);

    const first = await drainPeerB(mesh);
    expect(first).toHaveLength(3);

    const second = await drainPeerB(mesh);
    expect(second).toHaveLength(0);
  });

  test("FIFO ordering preserved within a single drain", async () => {
    const mesh = await freshMesh("dup-fifo");
    await queueForPeerB(mesh, 5);

    const drained = await drainPeerB(mesh);

    // Compare the whole sequence at once: this pins both the order and
    // the count, and a short result fails as an assertion diff instead
    // of a TypeError from indexing past the end.
    expect(drained.map((r) => r.ciphertext)).toEqual(
      Array.from({ length: 5 }, (_, i) => `msg-${i}`),
    );
  });
});