drainForMember previously ran a SELECT for undelivered rows and THEN an UPDATE of delivered_at. Two concurrent callers (e.g. the WS fan-out on send plus handleHello's own drain for the target) could both SELECT the same row before either UPDATEd it, pushing the same envelope twice.

Now: a single atomic UPDATE ... FROM member ... WHERE id IN (SELECT id ... FOR UPDATE SKIP LOCKED) RETURNING mq.*, m.peer_pubkey AS sender_pubkey. FOR UPDATE SKIP LOCKED is the key primitive — concurrent callers each claim DISJOINT sets, so a message can never be drained twice. The union of all concurrent drains still covers every eligible row.

The drain also re-sorts the RETURNING rows by created_at client-side (Postgres makes no FIFO guarantee on the RETURNING clause's output order) and normalizes created_at to Date, since raw-SQL results can come back as ISO strings.

Regression: tests/dup-delivery.test.ts (4 tests)
- two concurrent drains produce disjoint result sets
- six concurrent drains partition cleanly (20 messages, each drained once)
- subsequent drain after success returns empty
- FIFO ordering preserved within a single drain

48/48 tests pass. Live round-trip no longer logs the double-push.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
127 lines
3.8 KiB
TypeScript
127 lines
3.8 KiB
TypeScript
/**
 * Concurrency regression: drainForMember must return DISJOINT row
 * sets when two callers race for the same member's queue.
 *
 * Before the FOR UPDATE SKIP LOCKED fix, both callers SELECTed the
 * same undelivered rows, both sent push notifications, and only
 * afterwards did they race to UPDATE delivered_at. Receivers saw
 * duplicate pushes for the same message id.
 *
 * After the fix, the atomic UPDATE ... WHERE id IN (SELECT ... FOR
 * UPDATE SKIP LOCKED) lets each caller claim non-overlapping rows.
 */
|
|
|
|
import { afterAll, afterEach, describe, expect, test } from "vitest";
|
|
import { drainForMember, queueMessage } from "../src/broker";
|
|
import { cleanupAllTestMeshes, setupTestMesh, type TestMesh } from "./helpers";
|
|
|
|
// File-level teardown: once every test in this file has finished, ask the
// test helpers to clean up all meshes they created.
afterAll(async function removeAllTestMeshes() {
  await cleanupAllTestMeshes();
});
|
|
|
|
/**
 * Regression suite for concurrent `drainForMember` calls.
 *
 * Every test builds its own mesh, queues messages from peer A to peer B, and
 * then drains peer B's queue one or more times. The assertions pin three
 * properties: concurrent drains never return the same message id twice, the
 * union of all drains covers every queued message, and a single drain returns
 * rows in the order they were queued.
 */
describe("drainForMember — concurrent callers", () => {
  // Mesh owned by the test that is currently running. It is cleared in
  // `afterEach` so that a failing `setupTestMesh` in a later test can never
  // trigger a second cleanup of a mesh that belonged to an earlier test.
  let activeMesh: TestMesh | undefined;

  afterEach(async () => {
    const mesh = activeMesh;
    activeMesh = undefined;
    await mesh?.cleanup();
  });

  /** Creates a mesh for the current test and registers it for cleanup. */
  const createMesh = async (label: string): Promise<TestMesh> => {
    const mesh = await setupTestMesh(label);
    activeMesh = mesh;
    return mesh;
  };

  /**
   * Queues `count` messages from peer A addressed to peer B's pubkey.
   *
   * Message `i` carries nonce `n<i>` and ciphertext `msg-<i>`. The inserts are
   * awaited one at a time on purpose: the FIFO test relies on the messages
   * having been queued in index order.
   */
  const queueForPeerB = async (mesh: TestMesh, count: number): Promise<void> => {
    for (let i = 0; i < count; i++) {
      await queueMessage({
        meshId: mesh.meshId,
        senderMemberId: mesh.peerA.memberId,
        targetSpec: mesh.peerB.pubkey,
        priority: "now",
        nonce: `n${i}`,
        ciphertext: `msg-${i}`,
      });
    }
  };

  /**
   * Starts one drain of peer B's queue and returns the pending promise.
   * NOTE(review): the fourth argument is "idle" in every call; its meaning is
   * defined by the broker and is not visible from this file.
   */
  const drainPeerB = (mesh: TestMesh) =>
    drainForMember(mesh.meshId, mesh.peerB.memberId, mesh.peerB.pubkey, "idle");

  test("two concurrent drains produce disjoint result sets", async () => {
    const mesh = await createMesh("dup-basic");
    // Queue 10 messages for peer-b.
    await queueForPeerB(mesh, 10);

    // Fire two drains in parallel.
    const [a, b] = await Promise.all([drainPeerB(mesh), drainPeerB(mesh)]);
    const idsA = new Set(a.map((row) => row.id));
    const idsB = new Set(b.map((row) => row.id));

    // No overlap between the two callers.
    const overlap = [...idsA].filter((id) => idsB.has(id));
    expect(overlap).toEqual([]);

    // Union covers all 10. The raw row count is checked as well so that a
    // duplicate inside a single drain's result cannot hide behind Set dedup.
    expect(a.length + b.length).toBe(10);
    expect(idsA.size + idsB.size).toBe(10);
  });

  test("six concurrent drains also partition cleanly", async () => {
    const mesh = await createMesh("dup-six");
    await queueForPeerB(mesh, 20);

    const drains = await Promise.all(
      Array.from({ length: 6 }, () => drainPeerB(mesh)),
    );

    const allIds: string[] = [];
    for (const rows of drains) {
      for (const row of rows) allIds.push(row.id);
    }

    // Exactly 20 rows were handed out in total, and all of them are distinct.
    expect(allIds.length).toBe(20);
    expect(new Set(allIds).size).toBe(20);
  });

  test("after drain, subsequent drain returns empty", async () => {
    const mesh = await createMesh("dup-drain-empty");
    await queueForPeerB(mesh, 3);

    const first = await drainPeerB(mesh);
    expect(first).toHaveLength(3);

    // Everything was claimed by the first drain; nothing may be re-delivered.
    const second = await drainPeerB(mesh);
    expect(second).toHaveLength(0);
  });

  test("FIFO ordering preserved within a single drain", async () => {
    const mesh = await createMesh("dup-fifo");
    await queueForPeerB(mesh, 5);

    const drained = await drainPeerB(mesh);

    // Comparing the whole sequence checks both the order and the row count,
    // and yields a readable diff instead of a TypeError on a short result.
    const expected = Array.from({ length: 5 }, (_, i) => `msg-${i}`);
    expect(drained.map((row) => row.ciphertext)).toEqual(expected);
  });
});
|