diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 7888bff..165e11b 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -369,8 +369,19 @@ export interface ConnectParams { pid: number; cwd: string; groups?: Array<{ name: string; role?: string }>; + /** + * v2 agentic-comms (M1) — connection role. + * 'control-plane' — daemon WS (hidden from user-facing peer lists). + * 'session' — per-Claude-Code-session WS (default). + * 'service' — autonomous bots/services attached to the mesh. + * Optional for backwards compatibility; defaults to 'session'. + */ + role?: PresenceRole; } +/** v2 agentic-comms (M1): typed connection roles. */ +export type PresenceRole = "control-plane" | "session" | "service"; + /** Create a presence row for a new WS connection. */ export async function connectPresence( params: ConnectParams, @@ -389,6 +400,7 @@ export async function connectPresence( statusSource: "jsonl", statusUpdatedAt: now, groups: params.groups ?? [], + role: params.role ?? "session", connectedAt: now, lastPingAt: now, }) @@ -431,6 +443,9 @@ export async function listPeersInMesh( sessionId: string; cwd: string; connectedAt: Date; + /** v2 agentic-comms (M1): connection role. CLI uses this to hide + * control-plane daemons from user-facing lists. */ + role: PresenceRole; }> > { const rows = await db @@ -445,6 +460,7 @@ export async function listPeersInMesh( sessionId: presence.sessionId, cwd: presence.cwd, connectedAt: presence.connectedAt, + role: presence.role, }) .from(presence) .innerJoin(memberTable, eq(presence.memberId, memberTable.id)) @@ -469,6 +485,7 @@ export async function listPeersInMesh( sessionId: r.sessionId, cwd: r.cwd, connectedAt: r.connectedAt, + role: (r.role ?? "session") as PresenceRole, })); } @@ -2311,6 +2328,22 @@ function deliverablePriorities(status: PeerStatus): Priority[] { * targetSpec routing: matches either the member's pubkey directly or * the broadcast wildcard ("*"). Channel/tag resolution is per-mesh * config that lives outside this function. + * + * v2 agentic-comms (M1): two-phase claim/deliver with a 30s lease. + * + * The legacy implementation set `delivered_at = NOW()` in the same + * UPDATE that selected the row. If the recipient WS was no longer + * OPEN at push time, the message dropped silently (the row read as + * "delivered" so the next reconnect's drain skipped it). + * + * The new behaviour: + * - claim sets (claimed_at, claim_id, claim_expires_at = NOW() + 30s) + * - delivered_at stays NULL until the recipient acks via `client_ack` + * - re-eligibility predicate accepts rows whose claim has expired, + * so dropped pushes are redelivered (at-least-once) + * + * `claimerPresenceId` is recorded on the row purely for debugging — it + * never gates re-claim; expiry alone does. */ export async function drainForMember( meshId: string, @@ -2320,6 +2353,7 @@ export async function drainForMember( sessionPubkey?: string, excludeSenderSessionPubkey?: string, memberGroups?: string[], + claimerPresenceId?: string, ): Promise< Array<{ id: string; @@ -2385,6 +2419,11 @@ export async function drainForMember( // (with id as tiebreaker so equal-timestamp rows stay deterministic). // Sorting in SQL avoids JS Date's millisecond-precision collapse of // Postgres microsecond timestamps. + // + // v2 (M1): claim sets the lease columns, NOT delivered_at. Re-eligibility + // accepts unclaimed rows AND rows with an expired claim (NULL or past + // NOW()). delivered_at stays NULL until a `client_ack` lands. + const claimerId = claimerPresenceId ?? null; const result = await db.execute<{ id: string; priority: string; @@ -2398,12 +2437,15 @@ export async function drainForMember( }>(sql` WITH claimed AS ( UPDATE mesh.message_queue AS mq - SET delivered_at = NOW() + SET claimed_at = NOW(), + claim_id = ${claimerId}, + claim_expires_at = NOW() + INTERVAL '30 seconds' FROM mesh.member AS m WHERE mq.id IN ( SELECT id FROM mesh.message_queue WHERE mesh_id = ${meshId} AND delivered_at IS NULL + AND (claimed_at IS NULL OR claim_expires_at IS NULL OR claim_expires_at < NOW()) AND priority::text IN (${priorityList}) AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})${topicTargetList ? sql` OR target_spec IN (${topicTargetList})` : sql``}) ${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``} @@ -2445,11 +2487,93 @@ export async function drainForMember( })); } +/** + * v2 agentic-comms (M1): mark a message_queue row as delivered. + * + * Called when the recipient WS replies with a `client_ack` carrying the + * original `client_message_id`. Lookup is scoped to (mesh_id, member_id) + * so a malicious peer can't ack messages addressed to others. Returns + * the number of rows marked (0 = unknown id, already delivered, or wrong + * recipient). + */ +export async function markDelivered(params: { + meshId: string; + /** memberId of the WS that's claiming to have received this message. */ + recipientMemberId: string; + recipientMemberPubkey: string; + recipientSessionPubkey?: string | null; + clientMessageId?: string | null; + brokerMessageId?: string | null; +}): Promise { + const { + meshId, + recipientMemberPubkey, + recipientSessionPubkey, + clientMessageId, + brokerMessageId, + } = params; + if (!clientMessageId && !brokerMessageId) return 0; + + // Prefer broker id when available; falls back to clientMessageId. + // Scope to (mesh_id, target_spec ∈ {member-pubkey, session-pubkey, '*', @group, #topic}). + // For minimal blast radius we only allow direct/broadcast acks here — + // group/topic acks would need the same membership expansion drainForMember + // does and we'd rather under-ack than over-ack (re-claim is cheap). + const result = await db.execute<{ id: string }>(sql` + UPDATE mesh.message_queue + SET delivered_at = NOW() + WHERE mesh_id = ${meshId} + AND delivered_at IS NULL + AND ( + ${brokerMessageId ? sql`id = ${brokerMessageId}` : sql`FALSE`} + OR ${clientMessageId ? sql`client_message_id = ${clientMessageId}` : sql`FALSE`} + ) + AND ( + target_spec = ${recipientMemberPubkey} + ${recipientSessionPubkey ? sql`OR target_spec = ${recipientSessionPubkey}` : sql``} + OR target_spec = '*' + OR target_spec LIKE '@%' + OR target_spec LIKE '#%' + ) + RETURNING id + `); + const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>; + return rows.length; +} + +/** + * v2 agentic-comms (M1): reap expired claims so dropped pushes redeliver. + * + * Runs every 15s. Clears (claimed_at, claim_id, claim_expires_at) on rows + * where the lease has expired and no `client_ack` arrived. The next + * `drainForMember` call will pick the row up again — at-least-once. + * + * Returns the number of rows reaped. + */ +export async function sweepExpiredClaims(): Promise { + const result = await db.execute<{ id: string }>(sql` + UPDATE mesh.message_queue + SET claimed_at = NULL, + claim_id = NULL, + claim_expires_at = NULL + WHERE delivered_at IS NULL + AND claim_expires_at IS NOT NULL + AND claim_expires_at < NOW() + RETURNING id + `); + const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{ id: string }>; + return rows.length; +} + // --- Lifecycle --- let ttlTimer: ReturnType | null = null; let pendingTimer: ReturnType | null = null; let staleTimer: ReturnType | null = null; +let claimSweepTimer: ReturnType | null = null; + +/** v2 agentic-comms (M1): how often we reap expired message claims. */ +const CLAIM_SWEEP_INTERVAL_MS = 15_000; /** Start background sweepers. Idempotent. */ export function startSweepers(): void { @@ -2467,6 +2591,13 @@ export function startSweepers(): void { console.error("[broker] stale presence sweep:", e), ); }, 30_000); + claimSweepTimer = setInterval(() => { + sweepExpiredClaims() + .then((n) => { + if (n > 0) console.log(`[broker] expired claims swept: ${n}`); + }) + .catch((e) => console.error("[broker] claim sweep:", e)); + }, CLAIM_SWEEP_INTERVAL_MS); // Orphan-message sweep every hour; cheap, rows are all >7d at deletion time. setInterval(() => { sweepOrphanMessages() @@ -2480,9 +2611,11 @@ export async function stopSweepers(): Promise { if (ttlTimer) clearInterval(ttlTimer); if (pendingTimer) clearInterval(pendingTimer); if (staleTimer) clearInterval(staleTimer); + if (claimSweepTimer) clearInterval(claimSweepTimer); ttlTimer = null; pendingTimer = null; staleTimer = null; + claimSweepTimer = null; await db .update(presence) .set({ disconnectedAt: new Date() }) diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 731cfe5..8876c04 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -49,6 +49,7 @@ import { listFiles, listPeersInMesh, listState, + markDelivered, listTasks, queueMessage, recallMemory, @@ -546,6 +547,7 @@ async function maybePushQueuedMessages( conn.sessionPubkey ?? undefined, excludeSenderSessionPubkey, conn.groups.map((g) => g.name), + presenceId, ); log.info("maybePush", { presence_id: presenceId, @@ -1772,6 +1774,11 @@ async function handleHello( pid: hello.pid, cwd: hello.cwd, groups: initialGroups, + // v2 agentic-comms (M1): the regular member-keyed `hello` path is + // used by long-lived control-plane connections (claudemesh daemon, + // dashboard, automation). Per-Claude-Code sessions go through + // `session_hello` and get role='session'. + role: "control-plane", }); const effectiveDisplayName = hello.displayName || member.displayName; connections.set(presenceId, { @@ -1796,6 +1803,7 @@ async function handleHello( pubkey: hello.pubkey, groups: initialGroups, restored: !!saved, + role: "control-plane", }); log.info("ws hello", { mesh_id: hello.meshId, @@ -1993,6 +2001,9 @@ async function handleSessionHello( pid: hello.pid, cwd: hello.cwd, groups: initialGroups, + // v2 agentic-comms (M1): per-Claude-Code session WS — these are the + // user-facing peers shown in `claudemesh peer list`. + role: "session", }); const effectiveDisplayName = hello.displayName || member.displayName; connections.set(presenceId, { @@ -2018,6 +2029,7 @@ async function handleSessionHello( session_pubkey: hello.sessionPubkey, groups: initialGroups, via: "session_hello", + role: "session", }); log.info("ws session_hello", { mesh_id: hello.meshId, @@ -2567,6 +2579,39 @@ function handleConnection(ws: WebSocket): void { case "send": await handleSend(conn, msg); break; + case "client_ack": { + // v2 agentic-comms (M1): close out a previously pushed message. + // Lookup is scoped to (mesh_id, recipient pubkey) so a peer can + // only ack messages addressed to itself. + const ack = msg as Extract; + if (!ack.clientMessageId && !ack.brokerMessageId) { + // Nothing to do; don't error — the daemon may speculatively + // ack and we'd rather be lenient than break a CLI release. + break; + } + try { + const n = await markDelivered({ + meshId: conn.meshId, + recipientMemberId: conn.memberId, + recipientMemberPubkey: conn.memberPubkey, + recipientSessionPubkey: conn.sessionPubkey ?? null, + clientMessageId: ack.clientMessageId ?? null, + brokerMessageId: ack.brokerMessageId ?? null, + }); + log.debug("ws client_ack", { + presence_id: presenceId, + client_message_id: ack.clientMessageId, + broker_message_id: ack.brokerMessageId, + marked: n, + }); + } catch (e) { + log.warn("ws client_ack failed", { + presence_id: presenceId, + error: e instanceof Error ? e.message : String(e), + }); + } + break; + } case "set_status": await writeStatus(presenceId, msg.status, "manual", new Date()); log.info("ws set_status", { @@ -2604,6 +2649,10 @@ function handleConnection(ws: WebSocket): void { sessionId: p.sessionId, connectedAt: p.connectedAt.toISOString(), cwd: pc?.cwd ?? p.cwd, + // v2 agentic-comms (M1): typed connection role. CLI uses + // this to hide control-plane daemons from user-facing + // peer lists (filter swap from peerType happens CLI-side). + role: p.role, ...(pc?.hostname ? { hostname: pc.hostname } : {}), ...(pc?.peerType ? { peerType: pc.peerType } : {}), ...(pc?.channel ? { channel: pc.channel } : {}), diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index d12cded..0d7435e 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -224,6 +224,26 @@ export interface WSSetStatusMessage { status: PeerStatus; } +/** + * Client → broker: confirm receipt of a previously pushed envelope so the + * broker can mark the message_queue row delivered. + * + * v2 agentic-comms (M1): pairs with the two-phase claim/lease introduced + * in `drainForMember`. Without this ack, the lease expires after 30s and + * the message is re-claimed and re-pushed (at-least-once retry). + * + * Either id is accepted; daemons that track inbox dedupe by clientMessageId + * should send that one. brokerMessageId is the row primary key, useful when + * the original send didn't carry a client_message_id (legacy traffic). + */ +export interface WSClientAckMessage { + type: "client_ack"; + /** Original caller-supplied idempotency id from the `send` envelope. */ + clientMessageId?: string; + /** Broker-side row id (the `messageId` field on the inbound `push`). */ + brokerMessageId?: string; +} + /** Client → broker: request list of connected peers in the same mesh. */ export interface WSListPeersMessage { type: "list_peers"; @@ -518,6 +538,8 @@ export interface WSPeersListMessage { type: "peers_list"; peers: Array<{ pubkey: string; + /** Stable member pubkey — present on M1+ broker responses. */ + memberPubkey?: string; displayName: string; status: PeerStatus; summary: string | null; @@ -525,6 +547,10 @@ export interface WSPeersListMessage { sessionId: string; connectedAt: string; cwd?: string; + /** v2 agentic-comms (M1): typed connection role. CLI uses this to + * filter control-plane daemons out of user-facing peer lists. + * Optional for clients talking to a pre-M1 broker. */ + role?: "control-plane" | "session" | "service"; hostname?: string; peerType?: "ai" | "human" | "connector"; channel?: string; @@ -1417,6 +1443,7 @@ export type WSClientMessage = | WSHelloMessage | WSSessionHelloMessage | WSSendMessage + | WSClientAckMessage | WSSetStatusMessage | WSListPeersMessage | WSSetSummaryMessage