feat(broker): record daemon idempotency fields on message_queue
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Additive plumbing for v0.9.0 daemon spec §4.2/§4.4. Adds two nullable
columns to mesh.message_queue — client_message_id (caller-supplied) and
request_fingerprint (canonical sha256 of the send shape) — and threads
them through the broker:

  - handleSend reads them off the wire envelope when present
  - queueMessage persists them on the row
  - drainForMember projects them onto the push so receiving daemons
    can dedupe their local inbox by client_message_id

Columns stay nullable so legacy traffic (launch CLI, dashboard chat)
continues to flow uninterrupted. Sprint 7 (broker hardening) will add
the partial unique index and the client_message_dedupe atomic-accept
table once we're ready to enforce dedupe broker-side.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-03 20:05:36 +01:00
parent abaa4bcf87
commit bf22afb0ed
5 changed files with 62 additions and 4 deletions

View File

@@ -2261,6 +2261,10 @@ export interface QueueParams {
nonce: string; nonce: string;
ciphertext: string; ciphertext: string;
expiresAt?: Date; expiresAt?: Date;
/** Daemon idempotency id (spec §4.2). Optional; pre-daemon callers omit. */
clientMessageId?: string;
/** Canonical request fingerprint hex (spec §4.4). Optional; pre-daemon callers omit. */
requestFingerprint?: string;
} }
/** Insert an E2E envelope into the mesh's message queue. */ /** Insert an E2E envelope into the mesh's message queue. */
@@ -2276,6 +2280,8 @@ export async function queueMessage(params: QueueParams): Promise<string> {
nonce: params.nonce, nonce: params.nonce,
ciphertext: params.ciphertext, ciphertext: params.ciphertext,
expiresAt: params.expiresAt, expiresAt: params.expiresAt,
clientMessageId: params.clientMessageId ?? null,
requestFingerprint: params.requestFingerprint ?? null,
}) })
.returning({ id: messageQueue.id }); .returning({ id: messageQueue.id });
if (!row) throw new Error("failed to queue message"); if (!row) throw new Error("failed to queue message");
@@ -2323,6 +2329,9 @@ export async function drainForMember(
createdAt: Date; createdAt: Date;
senderMemberId: string; senderMemberId: string;
senderPubkey: string; senderPubkey: string;
/** v0.9.0 daemon fields; null for legacy traffic. */
clientMessageId: string | null;
requestFingerprint: string | null;
}> }>
> { > {
const priorities = deliverablePriorities(status); const priorities = deliverablePriorities(status);
@@ -2384,6 +2393,8 @@ export async function drainForMember(
created_at: string | Date; created_at: string | Date;
sender_member_id: string; sender_member_id: string;
sender_pubkey: string; sender_pubkey: string;
client_message_id: string | null;
request_fingerprint: string | null;
}>(sql` }>(sql`
WITH claimed AS ( WITH claimed AS (
UPDATE mesh.message_queue AS mq UPDATE mesh.message_queue AS mq
@@ -2402,6 +2413,7 @@ export async function drainForMember(
AND m.id = mq.sender_member_id AND m.id = mq.sender_member_id
RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext, RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext,
mq.created_at, mq.sender_member_id, mq.created_at, mq.sender_member_id,
mq.client_message_id, mq.request_fingerprint,
COALESCE(mq.sender_session_pubkey, m.peer_pubkey) AS sender_pubkey COALESCE(mq.sender_session_pubkey, m.peer_pubkey) AS sender_pubkey
) )
SELECT * FROM claimed ORDER BY created_at ASC, id ASC SELECT * FROM claimed ORDER BY created_at ASC, id ASC
@@ -2415,6 +2427,8 @@ export async function drainForMember(
created_at: string | Date; created_at: string | Date;
sender_member_id: string; sender_member_id: string;
sender_pubkey: string; sender_pubkey: string;
client_message_id: string | null;
request_fingerprint: string | null;
}>; }>;
if (!rows || rows.length === 0) return []; if (!rows || rows.length === 0) return [];
return rows.map((r) => ({ return rows.map((r) => ({
@@ -2426,6 +2440,8 @@ export async function drainForMember(
r.created_at instanceof Date ? r.created_at : new Date(r.created_at), r.created_at instanceof Date ? r.created_at : new Date(r.created_at),
senderMemberId: r.sender_member_id, senderMemberId: r.sender_member_id,
senderPubkey: r.sender_pubkey, senderPubkey: r.sender_pubkey,
clientMessageId: r.client_message_id ?? null,
requestFingerprint: r.request_fingerprint ?? null,
})); }));
} }

View File

@@ -564,6 +564,8 @@ async function maybePushQueuedMessages(
nonce: m.nonce, nonce: m.nonce,
ciphertext: m.ciphertext, ciphertext: m.ciphertext,
createdAt: m.createdAt.toISOString(), createdAt: m.createdAt.toISOString(),
...(m.clientMessageId ? { client_message_id: m.clientMessageId } : {}),
...(m.requestFingerprint ? { request_fingerprint: m.requestFingerprint } : {}),
}; };
sendToPeer(presenceId, push); sendToPeer(presenceId, push);
metrics.messagesRoutedTotal.inc({ priority: m.priority }); metrics.messagesRoutedTotal.inc({ priority: m.priority });
@@ -1968,6 +1970,12 @@ async function handleSend(
} }
} }
// v0.9.0 daemon clients attach a stable idempotency id and the canonical
// request fingerprint per spec §4.2/§4.4. Forward both verbatim; legacy
// callers omit them and the columns are nullable.
const clientMessageId = (msg as { client_message_id?: string }).client_message_id;
const requestFingerprint = (msg as { request_fingerprint?: string }).request_fingerprint;
const messageId = await queueMessage({ const messageId = await queueMessage({
meshId: conn.meshId, meshId: conn.meshId,
senderMemberId: conn.memberId, senderMemberId: conn.memberId,
@@ -1976,6 +1984,8 @@ async function handleSend(
priority: msg.priority, priority: msg.priority,
nonce: msg.nonce, nonce: msg.nonce,
ciphertext: msg.ciphertext, ciphertext: msg.ciphertext,
clientMessageId: clientMessageId && clientMessageId.length > 0 ? clientMessageId : undefined,
requestFingerprint: requestFingerprint && requestFingerprint.length > 0 ? requestFingerprint : undefined,
}); });
// Topic-tagged messages (targetSpec starts with `#<topicId>`) get // Topic-tagged messages (targetSpec starts with `#<topicId>`) get

View File

@@ -139,6 +139,12 @@ export interface WSPushMessage {
nonce: string; nonce: string;
ciphertext: string; ciphertext: string;
createdAt: string; createdAt: string;
/** v0.9.0 daemon fields. Echoed when the sender's send envelope
* carried them (spec §4.2/§4.4). Receivers use `client_message_id`
* for idempotent inbox dedupe and `request_fingerprint` for
* defense-in-depth verification. Both null on legacy traffic. */
client_message_id?: string | null;
request_fingerprint?: string | null;
/** Optional semantic tag — "reminder" when delivered by the scheduler, /** Optional semantic tag — "reminder" when delivered by the scheduler,
* "system" for broker-originated topology events (peer join/leave). */ * "system" for broker-originated topology events (peer join/leave). */
subtype?: "reminder" | "system"; subtype?: "reminder" | "system";

View File

@@ -0,0 +1,18 @@
-- Daemon idempotency fields on message_queue (v0.9.0 daemon spec §4.2 / §4.4).
--
-- Adds two nullable columns so the daemon can attach its caller-supplied
-- `client_message_id` and the canonical `request_fingerprint` (sha256 hex
-- of the canonical request shape) to every send.
--
-- Both columns are nullable for backward compatibility — legacy traffic
-- from `claudemesh launch` and the dashboard chat doesn't carry them yet.
-- Sprint 7 (full broker hardening) will:
-- - add a partial unique index `(mesh_id, client_message_id) WHERE
-- client_message_id IS NOT NULL` once we're ready to enforce dedupe.
-- - introduce the `mesh.client_message_dedupe` table for atomic accept.
-- Until then, recording the values lets the broker echo them back on push
-- so daemon-side inboxes can dedupe correctly even with multiple senders.
ALTER TABLE "mesh"."message_queue"
ADD COLUMN "client_message_id" text,
ADD COLUMN "request_fingerprint" text;

View File

@@ -359,6 +359,14 @@ export const messageQueue = meshSchema.table("message_queue", {
createdAt: timestamp().defaultNow().notNull(), createdAt: timestamp().defaultNow().notNull(),
deliveredAt: timestamp(), deliveredAt: timestamp(),
expiresAt: timestamp(), expiresAt: timestamp(),
// v0.9.0 daemon: caller-supplied idempotency id (spec §4.2). Nullable
// for legacy traffic. Sprint 7+ promotes it to a partial-unique index
// and adds the mesh.client_message_dedupe table for atomic accept.
clientMessageId: text("client_message_id"),
// v0.9.0 daemon: 32-byte sha256 of the canonical request shape (spec
// §4.4), hex-encoded. Nullable for legacy traffic. Brokers that want
// to enforce idempotency on retries will read this column.
requestFingerprint: text("request_fingerprint"),
}); });
/** /**
@@ -1679,7 +1687,7 @@ export const meshApiKey = meshSchema.table(
secretPrefix: text().notNull(), secretPrefix: text().notNull(),
/** Granted capabilities. Empty = no permissions; key is a stub. */ /** Granted capabilities. Empty = no permissions; key is a stub. */
capabilities: jsonb() capabilities: jsonb()
.$type<Array<"send" | "read" | "state_write" | "admin">>() .$type<("send" | "read" | "state_write" | "admin")[]>()
.notNull() .notNull()
.default([]), .default([]),
/** /**