From 5a8db796a0cd9d2fe9fa41abd2868211be9825b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Mon, 4 May 2026 18:10:04 +0100 Subject: [PATCH 1/2] =?UTF-8?q?feat(db):=20m1=20=E2=80=94=20message=5Fqueu?= =?UTF-8?q?e=20claim=20lease=20+=20presence.role=20columns?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Schema groundwork for v2 agentic-comms milestone 1. mesh.message_queue gets three nullable columns (claimed_at, claim_id, claim_expires_at) so drainForMember can move from "claim-and-deliver in one UPDATE" to a two-phase claim/lease + recipient-ack model. This is the at-least-once retry hook the broker has been missing. mesh.presence gets a typed `role` column ('control-plane' | 'session' | 'service') with default 'session' so legacy hellos keep working. The CLI's hidden-daemon hack (peerType === 'claudemesh-daemon') will swap to a role-based filter in a follow-up worktree. Migration is hand-authored as 0029_*.sql to match the existing pattern (drizzle-kit's _journal.json drifted long ago — the runtime migrator in apps/broker/src/migrate.ts tracks files lexicographically via mesh.__cmh_migrations, not the journal). --- .../0029_drain_lease_and_presence_role.sql | 48 +++++++++++++++++++ packages/db/src/schema/mesh.ts | 16 +++++++ 2 files changed, 64 insertions(+) create mode 100644 packages/db/migrations/0029_drain_lease_and_presence_role.sql diff --git a/packages/db/migrations/0029_drain_lease_and_presence_role.sql b/packages/db/migrations/0029_drain_lease_and_presence_role.sql new file mode 100644 index 0000000..c60514d --- /dev/null +++ b/packages/db/migrations/0029_drain_lease_and_presence_role.sql @@ -0,0 +1,48 @@ +-- Milestone 1 (v2 agentic-comms architecture). +-- +-- Two concerns rolled into one migration because both are tiny and both +-- ship together with the broker change in the same PR: +-- +-- 1. 
message_queue claim/lease columns (drainForMember race fix) +-- -------------------------------------------------------------- +-- Before this migration, drainForMember claimed rows by setting +-- `delivered_at = NOW()` inside the same UPDATE that selected them. +-- If the recipient WS was closed between claim-time and ws.send(), +-- the message was silently dropped — the row read as "delivered" so +-- the next reconnect's drain skipped it. At-most-once semantics with +-- no retry hook. +-- +-- The fix moves to two-phase claim/deliver with a lease: +-- claimed_at — set when drainForMember picks the row +-- claim_id — presenceId of the claimer (debugging) +-- claim_expires_at — claimed_at + 30s; if no `client_ack` lands by +-- then, a sweeper clears the claim and the row +-- is re-eligible for a new drain (at-least-once). +-- +-- `delivered_at` only gets set when the recipient WS replies with a +-- `client_ack` containing the original client_message_id. Until any +-- daemon emits `client_ack`, claims will simply expire and re-deliver +-- — which is the desired retry behaviour for unreliable transports. +-- +-- 2. presence.role column +-- -------------------------------------------------------------- +-- The CLI currently hides daemon connections from `peer list` by +-- matching `peerType === 'claudemesh-daemon'`, which is fragile and +-- overloads a free-form field. M1 introduces a typed `role` column on +-- presence with three documented values: +-- 'control-plane' — long-lived daemon WS (one per host) +-- 'session' — per-Claude-Code-session WS (default) +-- 'service' — autonomous bots/services attached to a mesh +-- +-- Backfilled to 'session' (default) so legacy presence rows keep their +-- existing visibility. The two hello paths in the broker pass +-- 'control-plane' / 'session' explicitly. CLI-side filter swap +-- (peerType -> role) is a follow-up worktree. 
+ +ALTER TABLE "mesh"."message_queue" + ADD COLUMN "claimed_at" timestamp, + ADD COLUMN "claim_id" text, + ADD COLUMN "claim_expires_at" timestamp; + +ALTER TABLE "mesh"."presence" + ADD COLUMN "role" text NOT NULL DEFAULT 'session'; diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 7b3a791..c8dc98f 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -326,6 +326,14 @@ export const presence = meshSchema.table("presence", { statusUpdatedAt: timestamp().defaultNow().notNull(), summary: text(), groups: jsonb().$type<{ name: string; role?: string }[]>().default([]), + // v2 agentic-comms (M1): connection role for routing/visibility. + // 'control-plane' — long-lived daemon WS (claudemesh daemon), + // used for fan-out and presence orchestration. + // Hidden from user-facing peer lists. + // 'session' — per-Claude-Code session WS (default). + // 'service' — autonomous bots/services attached to the mesh. + // Always populated; default 'session' keeps legacy hellos working. + role: text().notNull().default("session"), connectedAt: timestamp().defaultNow().notNull(), lastPingAt: timestamp().defaultNow().notNull(), disconnectedAt: timestamp(), @@ -367,6 +375,14 @@ export const messageQueue = meshSchema.table("message_queue", { // §4.4), hex-encoded. Nullable for legacy traffic. Brokers that want // to enforce idempotency on retries will read this column. requestFingerprint: text("request_fingerprint"), + // v2 agentic-comms (M1): two-phase claim/deliver with lease. + // `drainForMember` claims a row by setting (claimedAt, claimId, + // claimExpiresAt) — NOT deliveredAt. The recipient's WS only marks + // deliveredAt after replying with a `client_ack`. A periodic sweeper + // reaps expired claims so dropped pushes are redelivered (at-least-once). 
+ claimedAt: timestamp(), + claimId: text("claim_id"), + claimExpiresAt: timestamp(), }); /** From b57e47ed654aa1e5579f2f178c0f720411fbeea6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Mon, 4 May 2026 18:10:25 +0100 Subject: [PATCH 2/2] =?UTF-8?q?feat(broker):=20m1=20=E2=80=94=20two-phase?= =?UTF-8?q?=20claim/deliver=20+=20client=5Fack=20+=20role-tagged=20presenc?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three correctness fixes on top of the m1 schema migration: 1) Fix the drainForMember claim-then-push race ---------------------------------------------------------------- Previously the claim CTE set delivered_at = NOW() *before* the WS send. If readyState !== OPEN at push time, the row was marked delivered and the message dropped silently — at-most-once with no retry hook. The new flow: - claim sets (claimed_at, claim_id, claim_expires_at = NOW()+30s) - delivered_at stays NULL until the recipient acks - re-eligibility predicate now also accepts rows whose lease expired, so dropped pushes redeliver (at-least-once) Adds two helpers: - markDelivered() — scoped to (mesh_id, target_spec): direct rows must target the acker's member/session pubkey; broadcast ('*'), @group and #topic rows can currently be acked by any peer in the mesh (no membership check yet) - sweepExpiredClaims() — clears expired (claimed_at, claim_id, claim_expires_at) every 15s, wired into startSweepers 2) Accept `client_ack` from recipients ---------------------------------------------------------------- New WS message type handled in the dispatcher right after `send`. Lookup is by clientMessageId or brokerMessageId; a row matching either id is marked delivered. Until the daemon (apps/cli, separate worktree) starts emitting acks, leases will simply expire and re-deliver — which is the desired retry behaviour. 
3) Tag presence rows with `role` ---------------------------------------------------------------- handleHello (member-keyed, used by the long-lived daemon WS) → role: 'control-plane' handleSessionHello (per-Claude-Code session WS) → role: 'session' listPeersInMesh exposes the new field; the peers_list response surfaces it. WSPeersListMessage type adds an optional `role` plus the long-undocumented `memberPubkey`. CLI-side filter swap from peerType to role lands in a follow-up worktree — that's why the CLI is untouched here per the M1 spec. Typechecks clean (apps/broker tsc --noEmit, packages/db tsc --noEmit). Test suite needs a real DB so wasn't run in this worktree; existing dup-delivery and broker tests use drainForMember positionally and the new claimerPresenceId arg is optional, so they should continue to pass. --- apps/broker/src/broker.ts | 135 +++++++++++++++++++++++++++++++++++++- apps/broker/src/index.ts | 49 ++++++++++++++ apps/broker/src/types.ts | 27 ++++++++ 3 files changed, 210 insertions(+), 1 deletion(-) diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 7888bff..165e11b 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -369,8 +369,19 @@ export interface ConnectParams { pid: number; cwd: string; groups?: Array<{ name: string; role?: string }>; + /** + * v2 agentic-comms (M1) — connection role. + * 'control-plane' — daemon WS (hidden from user-facing peer lists). + * 'session' — per-Claude-Code-session WS (default). + * 'service' — autonomous bots/services attached to the mesh. + * Optional for backwards compatibility; defaults to 'session'. + */ + role?: PresenceRole; } +/** v2 agentic-comms (M1): typed connection roles. */ +export type PresenceRole = "control-plane" | "session" | "service"; + /** Create a presence row for a new WS connection. 
*/ export async function connectPresence( params: ConnectParams, @@ -389,6 +400,7 @@ export async function connectPresence( statusSource: "jsonl", statusUpdatedAt: now, groups: params.groups ?? [], + role: params.role ?? "session", connectedAt: now, lastPingAt: now, }) @@ -431,6 +443,9 @@ export async function listPeersInMesh( sessionId: string; cwd: string; connectedAt: Date; + /** v2 agentic-comms (M1): connection role. CLI uses this to hide + * control-plane daemons from user-facing lists. */ + role: PresenceRole; }> > { const rows = await db @@ -445,6 +460,7 @@ export async function listPeersInMesh( sessionId: presence.sessionId, cwd: presence.cwd, connectedAt: presence.connectedAt, + role: presence.role, }) .from(presence) .innerJoin(memberTable, eq(presence.memberId, memberTable.id)) @@ -469,6 +485,7 @@ export async function listPeersInMesh( sessionId: r.sessionId, cwd: r.cwd, connectedAt: r.connectedAt, + role: (r.role ?? "session") as PresenceRole, })); } @@ -2311,6 +2328,22 @@ function deliverablePriorities(status: PeerStatus): Priority[] { * targetSpec routing: matches either the member's pubkey directly or * the broadcast wildcard ("*"). Channel/tag resolution is per-mesh * config that lives outside this function. + * + * v2 agentic-comms (M1): two-phase claim/deliver with a 30s lease. + * + * The legacy implementation set `delivered_at = NOW()` in the same + * UPDATE that selected the row. If the recipient WS was no longer + * OPEN at push time, the message dropped silently (the row read as + * "delivered" so the next reconnect's drain skipped it). 
+ * + * The new behaviour: + * - claim sets (claimed_at, claim_id, claim_expires_at = NOW() + 30s) + * - delivered_at stays NULL until the recipient acks via `client_ack` + * - re-eligibility predicate accepts rows whose claim has expired, + * so dropped pushes are redelivered (at-least-once) + * + * `claimerPresenceId` is recorded on the row purely for debugging — it + * never gates re-claim; expiry alone does. */ export async function drainForMember( meshId: string, @@ -2320,6 +2353,7 @@ export async function drainForMember( sessionPubkey?: string, excludeSenderSessionPubkey?: string, memberGroups?: string[], + claimerPresenceId?: string, ): Promise< Array<{ id: string; @@ -2385,6 +2419,11 @@ export async function drainForMember( // (with id as tiebreaker so equal-timestamp rows stay deterministic). // Sorting in SQL avoids JS Date's millisecond-precision collapse of // Postgres microsecond timestamps. + // + // v2 (M1): claim sets the lease columns, NOT delivered_at. Re-eligibility + // accepts unclaimed rows AND rows with an expired claim (NULL or past + // NOW()). delivered_at stays NULL until a `client_ack` lands. + const claimerId = claimerPresenceId ?? null; const result = await db.execute<{ id: string; priority: string; @@ -2398,12 +2437,15 @@ export async function drainForMember( }>(sql` WITH claimed AS ( UPDATE mesh.message_queue AS mq - SET delivered_at = NOW() + SET claimed_at = NOW(), + claim_id = ${claimerId}, + claim_expires_at = NOW() + INTERVAL '30 seconds' FROM mesh.member AS m WHERE mq.id IN ( SELECT id FROM mesh.message_queue WHERE mesh_id = ${meshId} AND delivered_at IS NULL + AND (claimed_at IS NULL OR claim_expires_at IS NULL OR claim_expires_at < NOW()) AND priority::text IN (${priorityList}) AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})${topicTargetList ? 
sql` OR target_spec IN (${topicTargetList})` : sql``}) ${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``} @@ -2445,11 +2487,93 @@ export async function drainForMember( })); } +/** + * v2 agentic-comms (M1): mark a message_queue row as delivered. + * + * Called when the recipient WS replies with a `client_ack` carrying the + * original `client_message_id` or the broker row id. Lookup is scoped to + * (mesh_id, target_spec), not member_id; see the inline note below. Returns + * the number of rows marked (0 = unknown id, already delivered, or wrong + * recipient). + */ +export async function markDelivered(params: { + meshId: string; + /** memberId of the acking WS; accepted but not used by the query. */ + recipientMemberId: string; + recipientMemberPubkey: string; + recipientSessionPubkey?: string | null; + clientMessageId?: string | null; + brokerMessageId?: string | null; +}): Promise { + const { + meshId, + recipientMemberPubkey, + recipientSessionPubkey, + clientMessageId, + brokerMessageId, + } = params; + if (!clientMessageId && !brokerMessageId) return 0; + + // Matches on either id: a row whose id OR client_message_id equals a supplied value. + // Scope to (mesh_id, target_spec ∈ {member-pubkey, session-pubkey, '*', @group, #topic}). + // NOTE: direct targets are checked against the acker's own pubkeys, but '*', + // '@%' and '#%' rows match for ANY peer in the mesh — there is no membership + // expansion here like drainForMember does, so group/topic acks can over-ack. + const result = await db.execute<{ id: string }>(sql` + UPDATE mesh.message_queue + SET delivered_at = NOW() + WHERE mesh_id = ${meshId} + AND delivered_at IS NULL + AND ( + ${brokerMessageId ? sql`id = ${brokerMessageId}` : sql`FALSE`} + OR ${clientMessageId ? sql`client_message_id = ${clientMessageId}` : sql`FALSE`} + ) + AND ( + target_spec = ${recipientMemberPubkey} + ${recipientSessionPubkey ? 
sql`OR target_spec = ${recipientSessionPubkey}` : sql``} + OR target_spec = '*' + OR target_spec LIKE '@%' + OR target_spec LIKE '#%' + ) + RETURNING id + `); + const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>; + return rows.length; +} + +/** + * v2 agentic-comms (M1): reap expired claims so dropped pushes redeliver. + * + * Runs every 15s. Clears (claimed_at, claim_id, claim_expires_at) on rows + * where the lease has expired and no `client_ack` arrived. The next + * `drainForMember` call will pick the row up again — at-least-once. + * + * Returns the number of rows reaped. + */ +export async function sweepExpiredClaims(): Promise { + const result = await db.execute<{ id: string }>(sql` + UPDATE mesh.message_queue + SET claimed_at = NULL, + claim_id = NULL, + claim_expires_at = NULL + WHERE delivered_at IS NULL + AND claim_expires_at IS NOT NULL + AND claim_expires_at < NOW() + RETURNING id + `); + const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>; + return rows.length; +} + // --- Lifecycle --- let ttlTimer: ReturnType | null = null; let pendingTimer: ReturnType | null = null; let staleTimer: ReturnType | null = null; +let claimSweepTimer: ReturnType | null = null; + +/** v2 agentic-comms (M1): how often we reap expired message claims. */ +const CLAIM_SWEEP_INTERVAL_MS = 15_000; /** Start background sweepers. Idempotent. */ export function startSweepers(): void { @@ -2467,6 +2591,13 @@ export function startSweepers(): void { console.error("[broker] stale presence sweep:", e), ); }, 30_000); + claimSweepTimer = setInterval(() => { + sweepExpiredClaims() + .then((n) => { + if (n > 0) console.log(`[broker] expired claims swept: ${n}`); + }) + .catch((e) => console.error("[broker] claim sweep:", e)); + }, CLAIM_SWEEP_INTERVAL_MS); // Orphan-message sweep every hour; cheap, rows are all >7d at deletion time. 
setInterval(() => { sweepOrphanMessages() @@ -2480,9 +2611,11 @@ export async function stopSweepers(): Promise { if (ttlTimer) clearInterval(ttlTimer); if (pendingTimer) clearInterval(pendingTimer); if (staleTimer) clearInterval(staleTimer); + if (claimSweepTimer) clearInterval(claimSweepTimer); ttlTimer = null; pendingTimer = null; staleTimer = null; + claimSweepTimer = null; await db .update(presence) .set({ disconnectedAt: new Date() }) diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 731cfe5..8876c04 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -49,6 +49,7 @@ import { listFiles, listPeersInMesh, listState, + markDelivered, listTasks, queueMessage, recallMemory, @@ -546,6 +547,7 @@ async function maybePushQueuedMessages( conn.sessionPubkey ?? undefined, excludeSenderSessionPubkey, conn.groups.map((g) => g.name), + presenceId, ); log.info("maybePush", { presence_id: presenceId, @@ -1772,6 +1774,11 @@ async function handleHello( pid: hello.pid, cwd: hello.cwd, groups: initialGroups, + // v2 agentic-comms (M1): the regular member-keyed `hello` path is + // used by long-lived control-plane connections (claudemesh daemon, + // dashboard, automation). Per-Claude-Code sessions go through + // `session_hello` and get role='session'. + role: "control-plane", }); const effectiveDisplayName = hello.displayName || member.displayName; connections.set(presenceId, { @@ -1796,6 +1803,7 @@ async function handleHello( pubkey: hello.pubkey, groups: initialGroups, restored: !!saved, + role: "control-plane", }); log.info("ws hello", { mesh_id: hello.meshId, @@ -1993,6 +2001,9 @@ async function handleSessionHello( pid: hello.pid, cwd: hello.cwd, groups: initialGroups, + // v2 agentic-comms (M1): per-Claude-Code session WS — these are the + // user-facing peers shown in `claudemesh peer list`. 
+ role: "session", }); const effectiveDisplayName = hello.displayName || member.displayName; connections.set(presenceId, { @@ -2018,6 +2029,7 @@ async function handleSessionHello( session_pubkey: hello.sessionPubkey, groups: initialGroups, via: "session_hello", + role: "session", }); log.info("ws session_hello", { mesh_id: hello.meshId, @@ -2567,6 +2579,39 @@ function handleConnection(ws: WebSocket): void { case "send": await handleSend(conn, msg); break; + case "client_ack": { + // v2 agentic-comms (M1): close out a previously pushed message. + // Direct rows are scoped to the acker's member/session pubkey; + // '*', @group and #topic rows match for any peer in the mesh. + const ack = msg as Extract; + if (!ack.clientMessageId && !ack.brokerMessageId) { + // Nothing to do; don't error — the daemon may speculatively + // ack and we'd rather be lenient than break a CLI release. + break; + } + try { + const n = await markDelivered({ + meshId: conn.meshId, + recipientMemberId: conn.memberId, + recipientMemberPubkey: conn.memberPubkey, + recipientSessionPubkey: conn.sessionPubkey ?? null, + clientMessageId: ack.clientMessageId ?? null, + brokerMessageId: ack.brokerMessageId ?? null, + }); + log.debug("ws client_ack", { + presence_id: presenceId, + client_message_id: ack.clientMessageId, + broker_message_id: ack.brokerMessageId, + marked: n, + }); + } catch (e) { + log.warn("ws client_ack failed", { + presence_id: presenceId, + error: e instanceof Error ? e.message : String(e), + }); + } + break; + } case "set_status": await writeStatus(presenceId, msg.status, "manual", new Date()); log.info("ws set_status", { @@ -2604,6 +2649,10 @@ function handleConnection(ws: WebSocket): void { sessionId: p.sessionId, connectedAt: p.connectedAt.toISOString(), cwd: pc?.cwd ?? p.cwd, + // v2 agentic-comms (M1): typed connection role. CLI uses + // this to hide control-plane daemons from user-facing + // peer lists (filter swap from peerType happens CLI-side). + role: p.role, ...(pc?.hostname ? 
{ hostname: pc.hostname } : {}), ...(pc?.peerType ? { peerType: pc.peerType } : {}), ...(pc?.channel ? { channel: pc.channel } : {}), diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index d12cded..0d7435e 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -224,6 +224,26 @@ export interface WSSetStatusMessage { status: PeerStatus; } +/** + * Client → broker: confirm receipt of a previously pushed envelope so the + * broker can mark the message_queue row delivered. + * + * v2 agentic-comms (M1): pairs with the two-phase claim/lease introduced + * in `drainForMember`. Without this ack, the lease expires after 30s and + * the message is re-claimed and re-pushed (at-least-once retry). + * + * Either id is accepted; daemons that track inbox dedupe by clientMessageId + * should send that one. brokerMessageId is the row primary key, useful when + * the original send didn't carry a client_message_id (legacy traffic). + */ +export interface WSClientAckMessage { + type: "client_ack"; + /** Original caller-supplied idempotency id from the `send` envelope. */ + clientMessageId?: string; + /** Broker-side row id (the `messageId` field on the inbound `push`). */ + brokerMessageId?: string; +} + /** Client → broker: request list of connected peers in the same mesh. */ export interface WSListPeersMessage { type: "list_peers"; @@ -518,6 +538,8 @@ export interface WSPeersListMessage { type: "peers_list"; peers: Array<{ pubkey: string; + /** Stable member pubkey — present on M1+ broker responses. */ + memberPubkey?: string; displayName: string; status: PeerStatus; summary: string | null; @@ -525,6 +547,10 @@ export interface WSPeersListMessage { sessionId: string; connectedAt: string; cwd?: string; + /** v2 agentic-comms (M1): typed connection role. CLI uses this to + * filter control-plane daemons out of user-facing peer lists. + * Optional for clients talking to a pre-M1 broker. 
*/ + role?: "control-plane" | "session" | "service"; hostname?: string; peerType?: "ai" | "human" | "connector"; channel?: string; @@ -1417,6 +1443,7 @@ export type WSClientMessage = | WSHelloMessage | WSSessionHelloMessage | WSSendMessage + | WSClientAckMessage | WSSetStatusMessage | WSListPeersMessage | WSSetSummaryMessage