From bf22afb0ed7bdde18ff122bf8a24fd4feab994ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sun, 3 May 2026 20:05:36 +0100 Subject: [PATCH] feat(broker): record daemon idempotency fields on message_queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Additive plumbing for v0.9.0 daemon spec §4.2/§4.4. Adds two nullable columns to mesh.message_queue — client_message_id (caller-supplied) and request_fingerprint (canonical sha256 of the send shape) — and threads them through the broker: - handleSend reads them off the wire envelope when present - queueMessage persists them on the row - drainForMember projects them onto the push so receiving daemons can dedupe their local inbox by client_message_id Columns stay nullable so legacy traffic (launch CLI, dashboard chat) continues to flow uninterrupted. Sprint 7 (broker hardening) will add the partial unique index and the client_message_dedupe atomic-accept table once we're ready to enforce dedupe broker-side. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/broker/src/broker.ts | 16 ++++++++++++++++ apps/broker/src/index.ts | 10 ++++++++++ apps/broker/src/types.ts | 6 ++++++ .../0028_message_queue_idempotency_fields.sql | 18 ++++++++++++++++++ packages/db/src/schema/mesh.ts | 16 ++++++++++++---- 5 files changed, 62 insertions(+), 4 deletions(-) create mode 100644 packages/db/migrations/0028_message_queue_idempotency_fields.sql diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index a29237d..6f84339 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -2261,6 +2261,10 @@ export interface QueueParams { nonce: string; ciphertext: string; expiresAt?: Date; + /** Daemon idempotency id (spec §4.2). Optional; pre-daemon callers omit. */ + clientMessageId?: string; + /** Canonical request fingerprint hex (spec §4.4). Optional; pre-daemon callers omit. */ + requestFingerprint?: string; } /** Insert an E2E envelope into the mesh's message queue. */ @@ -2276,6 +2280,8 @@ export async function queueMessage(params: QueueParams): Promise { nonce: params.nonce, ciphertext: params.ciphertext, expiresAt: params.expiresAt, + clientMessageId: params.clientMessageId ?? null, + requestFingerprint: params.requestFingerprint ?? null, }) .returning({ id: messageQueue.id }); if (!row) throw new Error("failed to queue message"); @@ -2323,6 +2329,9 @@ export async function drainForMember( createdAt: Date; senderMemberId: string; senderPubkey: string; + /** v0.9.0 daemon fields; null for legacy traffic. */ + clientMessageId: string | null; + requestFingerprint: string | null; }> > { const priorities = deliverablePriorities(status); @@ -2384,6 +2393,8 @@ export async function drainForMember( created_at: string | Date; sender_member_id: string; sender_pubkey: string; + client_message_id: string | null; + request_fingerprint: string | null; }>(sql` WITH claimed AS ( UPDATE mesh.message_queue AS mq @@ -2402,6 +2413,7 @@ export async function drainForMember( AND m.id = mq.sender_member_id RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext, mq.created_at, mq.sender_member_id, + mq.client_message_id, mq.request_fingerprint, COALESCE(mq.sender_session_pubkey, m.peer_pubkey) AS sender_pubkey ) SELECT * FROM claimed ORDER BY created_at ASC, id ASC @@ -2415,6 +2427,8 @@ export async function drainForMember( created_at: string | Date; sender_member_id: string; sender_pubkey: string; + client_message_id: string | null; + request_fingerprint: string | null; }>; if (!rows || rows.length === 0) return []; return rows.map((r) => ({ @@ -2426,6 +2440,8 @@ export async function drainForMember( r.created_at instanceof Date ? r.created_at : new Date(r.created_at), senderMemberId: r.sender_member_id, senderPubkey: r.sender_pubkey, + clientMessageId: r.client_message_id ?? null, + requestFingerprint: r.request_fingerprint ?? null, })); } diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 6b3ea7f..75dc963 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -564,6 +564,8 @@ async function maybePushQueuedMessages( nonce: m.nonce, ciphertext: m.ciphertext, createdAt: m.createdAt.toISOString(), + ...(m.clientMessageId ? { client_message_id: m.clientMessageId } : {}), + ...(m.requestFingerprint ? { request_fingerprint: m.requestFingerprint } : {}), }; sendToPeer(presenceId, push); metrics.messagesRoutedTotal.inc({ priority: m.priority }); @@ -1968,6 +1970,12 @@ async function handleSend( } } + // v0.9.0 daemon clients attach a stable idempotency id and the canonical + // request fingerprint per spec §4.2/§4.4. Forward both verbatim; legacy + // callers omit them and the columns are nullable. + const clientMessageId = (msg as { client_message_id?: string }).client_message_id; + const requestFingerprint = (msg as { request_fingerprint?: string }).request_fingerprint; + const messageId = await queueMessage({ meshId: conn.meshId, senderMemberId: conn.memberId, @@ -1976,6 +1984,8 @@ async function handleSend( priority: msg.priority, nonce: msg.nonce, ciphertext: msg.ciphertext, + clientMessageId: clientMessageId && clientMessageId.length > 0 ? clientMessageId : undefined, + requestFingerprint: requestFingerprint && requestFingerprint.length > 0 ? requestFingerprint : undefined, }); // Topic-tagged messages (targetSpec starts with `#`) get diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 95b5710..ca43a64 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -139,6 +139,12 @@ export interface WSPushMessage { nonce: string; ciphertext: string; createdAt: string; + /** v0.9.0 daemon fields. Echoed when the sender's send envelope + * carried them (spec §4.2/§4.4). Receivers use `client_message_id` + * for idempotent inbox dedupe and `request_fingerprint` for + * defense-in-depth verification. Both null on legacy traffic. */ + client_message_id?: string | null; + request_fingerprint?: string | null; /** Optional semantic tag — "reminder" when delivered by the scheduler, * "system" for broker-originated topology events (peer join/leave). */ subtype?: "reminder" | "system"; diff --git a/packages/db/migrations/0028_message_queue_idempotency_fields.sql b/packages/db/migrations/0028_message_queue_idempotency_fields.sql new file mode 100644 index 0000000..332d514 --- /dev/null +++ b/packages/db/migrations/0028_message_queue_idempotency_fields.sql @@ -0,0 +1,18 @@ +-- Daemon idempotency fields on message_queue (v0.9.0 daemon spec §4.2 / §4.4). +-- +-- Adds two nullable columns so the daemon can attach its caller-supplied +-- `client_message_id` and the canonical `request_fingerprint` (sha256 hex +-- of the canonical request shape) to every send. +-- +-- Both columns are nullable for backward compatibility — legacy traffic +-- from `claudemesh launch` and the dashboard chat doesn't carry them yet. +-- Sprint 7 (full broker hardening) will: +-- - add a partial unique index `(mesh_id, client_message_id) WHERE +-- client_message_id IS NOT NULL` once we're ready to enforce dedupe. +-- - introduce the `mesh.client_message_dedupe` table for atomic accept. +-- Until then, recording the values lets the broker echo them back on push +-- so daemon-side inboxes can dedupe correctly even with multiple senders. + +ALTER TABLE "mesh"."message_queue" + ADD COLUMN "client_message_id" text, + ADD COLUMN "request_fingerprint" text; diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 24f85b9..7b3a791 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -359,6 +359,14 @@ export const messageQueue = meshSchema.table("message_queue", { createdAt: timestamp().defaultNow().notNull(), deliveredAt: timestamp(), expiresAt: timestamp(), + // v0.9.0 daemon: caller-supplied idempotency id (spec §4.2). Nullable + // for legacy traffic. Sprint 7+ promotes it to a partial-unique index + // and adds the mesh.client_message_dedupe table for atomic accept. + clientMessageId: text("client_message_id"), + // v0.9.0 daemon: 32-byte sha256 of the canonical request shape (spec + // §4.4), hex-encoded. Nullable for legacy traffic. Brokers that want + // to enforce idempotency on retries will read this column. + requestFingerprint: text("request_fingerprint"), }); /** @@ -1658,10 +1666,10 @@ export type InsertMeshNotification = typeof meshNotification.$inferInsert; * ──────────────────────────────────────────────────────────────────────── */ export const apiKeyCapabilityEnum = meshSchema.enum("api_key_capability", [ - "send", // POST /messages - "read", // GET /messages, /peers, /state + "send", // POST /messages + "read", // GET /messages, /peers, /state "state_write", // POST /state - "admin", // issue/revoke other keys, delete topics, etc. + "admin", // issue/revoke other keys, delete topics, etc. ]); export const meshApiKey = meshSchema.table( @@ -1679,7 +1687,7 @@ export const meshApiKey = meshSchema.table( secretPrefix: text().notNull(), /** Granted capabilities. Empty = no permissions; key is a stub. */ capabilities: jsonb() - .$type>() + .$type<("send" | "read" | "state_write" | "admin")[]>() .notNull() .default([]), /**