diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 165e11b..e7fcc8e 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -444,8 +444,10 @@ export async function listPeersInMesh( cwd: string; connectedAt: Date; /** v2 agentic-comms (M1): connection role. CLI uses this to hide - * control-plane daemons from user-facing lists. */ - role: PresenceRole; + * control-plane daemons from user-facing lists. Wire-level field + * is `peerRole` to avoid collision with 1.31.5's top-level `role` + * lift of profile.role (user-supplied string like "lead"). */ + peerRole: PresenceRole; }> > { const rows = await db @@ -460,7 +462,7 @@ export async function listPeersInMesh( sessionId: presence.sessionId, cwd: presence.cwd, connectedAt: presence.connectedAt, - role: presence.role, + peerRole: presence.role, }) .from(presence) .innerJoin(memberTable, eq(presence.memberId, memberTable.id)) @@ -485,7 +487,7 @@ export async function listPeersInMesh( sessionId: r.sessionId, cwd: r.cwd, connectedAt: r.connectedAt, - role: (r.role ?? "session") as PresenceRole, + peerRole: (r.peerRole ?? "session") as PresenceRole, })); } diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 8876c04..85dc6cb 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -2652,7 +2652,9 @@ function handleConnection(ws: WebSocket): void { // v2 agentic-comms (M1): typed connection role. CLI uses // this to hide control-plane daemons from user-facing // peer lists (filter swap from peerType happens CLI-side). - role: p.role, + // Wire field is `peerRole` to avoid collision with the + // 1.31.5 top-level `role` lift of profile.role. + peerRole: p.peerRole, ...(pc?.hostname ? { hostname: pc.hostname } : {}), ...(pc?.peerType ? { peerType: pc.peerType } : {}), ...(pc?.channel ? 
{ channel: pc.channel } : {}), diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 0d7435e..d80b4cb 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -549,8 +549,11 @@ export interface WSPeersListMessage { cwd?: string; /** v2 agentic-comms (M1): typed connection role. CLI uses this to * filter control-plane daemons out of user-facing peer lists. - * Optional for clients talking to a pre-M1 broker. */ - role?: "control-plane" | "session" | "service"; + * Optional for clients talking to a pre-M1 broker. Wire field is + * `peerRole` to avoid collision with 1.31.5's top-level `role` + * (which is a lift of `profile.role`, the user-supplied string + * like "lead" / "reviewer" / "human"). */ + peerRole?: "control-plane" | "session" | "service"; hostname?: string; peerType?: "ai" | "human" | "connector"; channel?: string; diff --git a/apps/cli/CHANGELOG.md b/apps/cli/CHANGELOG.md index 1e23ff6..9e11091 100644 --- a/apps/cli/CHANGELOG.md +++ b/apps/cli/CHANGELOG.md @@ -1,6 +1,86 @@ # Changelog -## Unreleased — Milestone 1 lifecycle cleanups +## 1.33.0 (2026-05-04) — Milestone 1: lifecycle cleanups + at-least-once with ack + +First milestone of the agentic-comms architecture work +(`.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md`). +Foundational correctness — no new external surface, but the wire +protocol grows two additions: a `peerRole` field on `peer list` +responses (presence classification) and a new client→broker +`client_ack` frame. + +### Lifecycle helper extraction + +`DaemonBrokerClient` and `SessionBrokerClient` now share a single +lifecycle implementation in `apps/cli/src/daemon/ws-lifecycle.ts` +(`connectWsWithBackoff`). Each client supplies `buildHello` / +`isHelloAck` / `onMessage` and keeps its own RPC bookkeeping; the +helper handles connect, hello-ack timeout, close + backoff reconnect. +Composition over inheritance per Codex's review. 
Eliminates the drift +bug class that produced 1.32.0/1.32.1 (lifecycle copies diverging +silently when one side gained a feature). + +### Daemon-WS no longer carries an ephemeral session keypair + +Pre-1.33: every daemon-WS reconnect minted a fresh keypair, sent the +pubkey in the hello, and held the secret in memory for "session" +crypto. Vestigial since 1.30.0 introduced the per-launch +`SessionBrokerClient` that owns the real session pubkey. Daemon-WS +now uses the stable mesh member secret directly for outbound +encryption. Inbound on daemon-WS only attempts member-key decryption — +session decryption is the session-WS's job. + +### `peerRole` wire field + +The broker now emits a `peerRole` field on each `peer list` row — +`'control-plane' | 'session' | 'service'`. `control-plane` rows are +the daemon's own member-keyed presence (infrastructure), `session` +rows are launched Claude Code sessions, `service` rows are reserved +for v2.x service identities (HTTP webhook consumers, voice agents, +etc.). + +The CLI hides `peerRole === 'control-plane'` rows from the human +renderer by default and exposes a `--all` flag for debugging. JSON +output emits `peerRole` on every row. + +**Why `peerRole` and not just `role`:** 1.31.5 already lifted +`profile.role` (user-supplied string like "lead", "reviewer") to +top-level `role`, and the agent-vibes claudemesh skill consumes that +field. The presence classification is a different axis, so it gets +its own field name. `role` keeps its 1.31.5 semantics; `peerRole` is +the new field. + +### `client_ack` and at-least-once delivery + +The broker (M1 broker change) now uses two-phase claim/deliver: +`claimed_at` / `claim_id` / `claim_expires_at` columns track lease +ownership; `delivered_at` is set ONLY when the recipient acks. A 15s +sweeper re-claims rows whose 30s lease expired without ack. 
+ +The CLI side closes the loop: after `handleBrokerPush` lands a +message in `inbox.db` (or dedupes against an existing row), the +recipient daemon emits a `client_ack { type: "client_ack", +clientMessageId, brokerMessageId? }` frame on whichever WS the push +arrived on. Best-effort — if the WS is closed by ack time, the +broker's lease will naturally re-deliver, and the receiver dedupes +on `clientMessageId`. + +Net behavior: at-least-once with idempotent dedupe. Net visible +change: zero, in the steady state. Crash-mid-push test (kill recipient +between broker claim and recipient ack) now redelivers instead of +silently dropping. + +### Files + +- New: `apps/cli/src/daemon/ws-lifecycle.ts` (234 lines). +- Refactored: `apps/cli/src/daemon/broker.ts`, `session-broker.ts`, + `inbound.ts`, `run.ts`, `commands/peers.ts`, `ipc/server.ts`. +- Broker side (separate commit): drain race fix, `presence.role` + column, `client_ack` handler, lease sweeper. +- DB migration `0029_drain_lease_and_presence_role.sql` ships with + the broker change. + + Foundational refactor before the agentic-comms architecture work (`.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md`). Three diff --git a/apps/cli/package.json b/apps/cli/package.json index 387c92f..0380c24 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -1,6 +1,6 @@ { "name": "claudemesh-cli", - "version": "1.32.1", + "version": "1.33.0", "description": "Peer mesh for Claude Code sessions — CLI + MCP server.", "keywords": [ "claude-code", diff --git a/apps/cli/src/commands/peers.ts b/apps/cli/src/commands/peers.ts index e15d741..94cb94a 100644 --- a/apps/cli/src/commands/peers.ts +++ b/apps/cli/src/commands/peers.ts @@ -57,17 +57,20 @@ interface PeerRecord { status?: string; summary?: string; groups: Array<{ name: string; role?: string }>; - /** Broker-emitted classification: 'control-plane' | 'session' | - * 'service'. Source of truth for the --all visibility filter and the - * default-hide rule. 
Older brokers omit this; the CLI fills missing - * values with 'session' so legacy peer rows stay visible. + /** Top-level convenience alias for `profile.role`, lifted by the CLI + * since 1.31.5 so JSON consumers (the agent-vibes claudemesh skill, + * launched-session LLMs) see the user-supplied role string at the + * shape's top level. Same value as `profile.role`. Distinct from + * `peerRole` below — that's the broker's presence-class taxonomy. */ + role?: string; + /** Broker-emitted presence classification: 'control-plane' | 'session' + * | 'service'. Source of truth for the --all visibility filter and + * the default-hide rule. Older brokers omit this; the CLI fills + * missing values with 'session' so legacy peer rows stay visible. * - * Note: this replaces the prior CLI-side lift of `profile.role` to - * the top-level `role` field — `profile.role` is user-supplied - * metadata (e.g. "lead", "reviewer"), distinct from the broker's - * presence-class taxonomy. The user-facing string still lives at - * `profile.role` and is rendered inline as `role:`. */ - role?: PeerRole; + * Renamed from `role` to avoid collision with 1.31.5's profile.role + * lift above. Wire-level field on the broker is also `peerRole`. */ + peerRole?: PeerRole; peerType?: string; channel?: string; model?: string; @@ -160,9 +163,14 @@ async function listPeersForMesh(slug: string): Promise { * surfaced a sender's siblings as separate rows because they're separate * presence rows; the cli just hadn't been making that visible. * - * Also normalizes the broker's `role` classification: missing values - * (older brokers) default to 'session' so legacy peer rows stay + * Also normalizes the broker's `peerRole` classification: missing + * values (older brokers) default to 'session' so legacy peer rows stay * visible under the default `--all=false` filter. 
+ * + * And lifts `profile.role` to a top-level `role` field — the 1.31.5 + * convenience alias for JSON consumers (skill SKILL.md, launched-session + * LLMs, jq pipelines). Same value as profile.role; distinct from + * peerRole (presence taxonomy). */ function annotateSelf( peer: PeerRecord, @@ -179,8 +187,15 @@ function annotateSelf( selfSessionPubkey && peer.pubkey === selfSessionPubkey ); - const role: PeerRole = peer.role ?? "session"; - return { ...peer, role, isSelf, isThisSession }; + const peerRole: PeerRole = peer.peerRole ?? "session"; + const profileRole = peer.profile?.role?.trim() || undefined; + return { + ...peer, + ...(profileRole ? { role: profileRole } : {}), + peerRole, + isSelf, + isThisSession, + }; } export async function runPeers(flags: PeersFlags): Promise { @@ -231,13 +246,13 @@ export async function runPeers(flags: PeersFlags): Promise { // they confused users into thinking the daemon counted as a // separate peer. --all opts back in for debugging. // - // Source of truth: broker-emitted `role` field (added 2026-05-04). - // annotateSelf() already filled in 'session' for older brokers - // that don't emit role yet, so this filter is backwards-compatible - // by construction — legacy rows show up. + // Source of truth: broker-emitted `peerRole` field (added + // 2026-05-04). annotateSelf() filled in 'session' for older + // brokers that don't emit peerRole yet, so this filter is + // backwards-compatible by construction — legacy rows show up. const visible = flags.all ? peers - : peers.filter((p) => p.role !== "control-plane"); + : peers.filter((p) => p.peerRole !== "control-plane"); // Sort: this-session first, then your-other-sessions, then real // peers. Within each group, idle/working ahead of dnd. 
Inside the diff --git a/apps/cli/src/daemon/broker.ts b/apps/cli/src/daemon/broker.ts index 286dcdd..b968741 100644 --- a/apps/cli/src/daemon/broker.ts +++ b/apps/cli/src/daemon/broker.ts @@ -281,6 +281,22 @@ export class DaemonBrokerClient { return !!sock && sock.readyState === sock.OPEN; } + /** v2 agentic-comms (M1): send `client_ack` back to the broker after + * successfully landing an inbound push in inbox.db. Broker uses the + * ack to set `delivered_at` (atomic at-least-once). Best-effort — + * if the WS isn't open, drop the ack; broker's 30s lease will + * re-deliver. */ + sendClientAck(clientMessageId: string, brokerMessageId: string | null): void { + if (!this.isOpen()) return; + try { + this.lifecycle!.send({ + type: "client_ack", + clientMessageId, + ...(brokerMessageId ? { brokerMessageId } : {}), + }); + } catch { /* drop; lease re-delivers */ } + } + /** Send one outbox row. Resolves on broker ack/timeout. */ send(req: BrokerSendArgs): Promise { return new Promise((resolve) => { diff --git a/apps/cli/src/daemon/inbound.ts b/apps/cli/src/daemon/inbound.ts index 86fa54a..6a1a0eb 100644 --- a/apps/cli/src/daemon/inbound.ts +++ b/apps/cli/src/daemon/inbound.ts @@ -18,6 +18,13 @@ export interface InboundContext { /** Daemon's session secret key hex (rotates per connect). When the * sender encrypted to our session pubkey, decrypt with this instead. */ sessionSecretKeyHex?: string; + /** v2 agentic-comms (M1): emit `client_ack` back to the broker after + * the message lands in inbox.db. Broker uses the ack to set + * `delivered_at` (atomic at-least-once). Without it, the broker's + * 30s lease expires and re-delivers — correct but noisy. The WS + * client owns this callback because it's the one that owns the + * socket; inbound.ts just signals "I accepted this id." 
*/ + ackClientMessage?: (clientMessageId: string, brokerMessageId: string | null) => void; log?: (level: "info" | "warn" | "error", msg: string, meta?: Record) => void; } @@ -73,6 +80,12 @@ export async function handleBrokerPush(msg: Record, ctx: Inboun reply_to_id: replyToId, }); + // Whether the row was newly inserted or already existed (dedupe), the + // broker still wants to know we received and processed this message — + // ack regardless. Skipping ack on dedupe would leak: broker would + // re-deliver after lease, and the receiver would re-dedupe forever. + ctx.ackClientMessage?.(clientMessageId, brokerMessageId); + if (!inserted) return; // already had this id; no event ctx.bus.publish("message", { diff --git a/apps/cli/src/daemon/run.ts b/apps/cli/src/daemon/run.ts index 4ed70ad..10e3be0 100644 --- a/apps/cli/src/daemon/run.ts +++ b/apps/cli/src/daemon/run.ts @@ -127,7 +127,7 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise { const meshConfigs = new Map(); for (const mesh of meshes) { meshConfigs.set(mesh.slug, mesh); - const broker = new DaemonBrokerClient(mesh, { + const broker: DaemonBrokerClient = new DaemonBrokerClient(mesh, { displayName: opts.displayName, onStatusChange: (s) => { process.stdout.write(JSON.stringify({ @@ -146,6 +146,9 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise { bus, meshSlug: mesh.slug, recipientSecretKeyHex: mesh.secretKey, + // v2 agentic-comms (M1): client_ack closes the at-least-once + // loop. Broker holds the row claimed (not delivered) until ack. + ackClientMessage: (cmid, bmid) => broker.sendClientAck(cmid, bmid), }); }, }); @@ -187,7 +190,7 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise { // session secret key; member key remains the fallback for legacy // member-targeted traffic that happens to fan out here. 
const sessionSecretKeyHex = info.presence.sessionSecretKey; - const client = new SessionBrokerClient({ + const client: SessionBrokerClient = new SessionBrokerClient({ mesh: meshConfig, sessionPubkey: info.presence.sessionPubkey, sessionSecretKey: info.presence.sessionSecretKey, @@ -204,6 +207,8 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise { meshSlug: meshConfig.slug, recipientSecretKeyHex: meshConfig.secretKey, sessionSecretKeyHex, + // v2 agentic-comms (M1): close the at-least-once loop. + ackClientMessage: (cmid, bmid) => client.sendClientAck(cmid, bmid), }); }, }); diff --git a/apps/cli/src/daemon/session-broker.ts b/apps/cli/src/daemon/session-broker.ts index 217849a..0357d2e 100644 --- a/apps/cli/src/daemon/session-broker.ts +++ b/apps/cli/src/daemon/session-broker.ts @@ -167,6 +167,20 @@ export class SessionBrokerClient { }); } + /** v2 agentic-comms (M1): send `client_ack` back to the broker after + * successfully landing an inbound push in inbox.db. Broker uses the + * ack to set `delivered_at`. Best-effort. */ + sendClientAck(clientMessageId: string, brokerMessageId: string | null): void { + if (this._status !== "open" || !this.lifecycle) return; + try { + this.lifecycle.send({ + type: "client_ack", + clientMessageId, + ...(brokerMessageId ? { brokerMessageId } : {}), + }); + } catch { /* drop; lease re-delivers */ } + } + async close(): Promise { this.closed = true; if (this.lifecycle) {