From dab80f475e77e6f8a8130b336447a2b7b617f1bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Mon, 4 May 2026 18:08:32 +0100 Subject: [PATCH] refactor(cli): m1 lifecycle + role-aware peer list MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundational cleanups before agentic-comms architecture work (.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md). All behavior-preserving. 1. Extract `connectWsWithBackoff` into apps/cli/src/daemon/ws-lifecycle.ts. Both DaemonBrokerClient and SessionBrokerClient now share one lifecycle implementation (connect, hello-handshake, ack-timeout, close + backoff reconnect). Each client provides its own buildHello / isHelloAck / onMessage hooks and keeps its own RPC bookkeeping (pendingAcks, peerListResolvers, onPush). Composition over inheritance per Codex's review; no protocol shape changes. 2. Drop daemon-WS ephemeral session pubkey. DaemonBrokerClient no longer mints + sends a per-reconnect ephemeral keypair in its hello. Session-targeted DMs land on SessionBrokerClient since 1.32.1, not the member-keyed daemon-WS, so the field was vestigial. Send-encrypt path now signs DMs with the stable mesh member secret. handleBrokerPush invocations from daemon-WS only pass the member secret — session decryption is the session-WS's job. 3. Role-aware peer list. `peer list` now hides peers whose broker-emitted `role` is `'control-plane'`. `--all` opts back in. JSON output emits `role` at top level. Older brokers that don't emit role yet default to 'session', so legacy peer rows stay visible without the broker-side change shipped first. Replaces the prior `peerType === 'claudemesh-daemon'` channel-name hack. Typecheck + tests + build all green. --- apps/cli/CHANGELOG.md | 30 ++ apps/cli/src/commands/peers.ts | 71 +++-- apps/cli/src/daemon/broker.ts | 415 ++++++++++++-------------- apps/cli/src/daemon/ipc/server.ts | 19 +- apps/cli/src/daemon/run.ts | 7 +- apps/cli/src/daemon/session-broker.ts | 153 ++++------ apps/cli/src/daemon/ws-lifecycle.ts | 234 +++++++++++++++ 7 files changed, 575 insertions(+), 354 deletions(-) create mode 100644 apps/cli/src/daemon/ws-lifecycle.ts diff --git a/apps/cli/CHANGELOG.md b/apps/cli/CHANGELOG.md index 8f2e70b..1e23ff6 100644 --- a/apps/cli/CHANGELOG.md +++ b/apps/cli/CHANGELOG.md @@ -1,5 +1,35 @@ # Changelog +## Unreleased — Milestone 1 lifecycle cleanups + +Foundational refactor before the agentic-comms architecture work +(`.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md`). Three +changes, all behavior-preserving: + +- **`connectWsWithBackoff` helper** (`apps/cli/src/daemon/ws-lifecycle.ts`). + Both `DaemonBrokerClient` and `SessionBrokerClient` now share one + lifecycle implementation — connect, hello-handshake, ack-timeout, + close + backoff reconnect. Each client supplies `buildHello` / + `isHelloAck` / `onMessage` and keeps its own RPC bookkeeping + (pendingAcks, peerListResolvers, onPush, etc). Composition over + inheritance per Codex's review; no protocol shape changes. + +- **Drop daemon-WS ephemeral session pubkey.** `DaemonBrokerClient` no + longer mints + sends a per-reconnect ephemeral keypair in its hello. + Session-targeted DMs land on `SessionBrokerClient` (since 1.32.1), + not the member-keyed daemon-WS, so the field was vestigial. The + daemon's send-encrypt path now signs DMs with the stable mesh member + secret. Inbound on daemon-WS only attempts member-key decryption — + session decryption is the session-WS's job. + +- **Role-aware peer list.** `peer list` now hides peers whose + broker-emitted `role` is `'control-plane'` (the daemon's own + member-keyed presence). `--all` opts back in. JSON output emits + `role` at the top level. Older brokers that don't emit `role` yet + default to `'session'`, so legacy peer rows stay visible without + the broker-side change shipped first. Replaces the prior + `peerType === 'claudemesh-daemon'` channel-name hack. + ## 1.32.1 (2026-05-04) — DMs to session pubkeys actually deliver now Critical fix. Sessions launched via `claudemesh launch` (1.30.0+) hold a diff --git a/apps/cli/src/commands/peers.ts b/apps/cli/src/commands/peers.ts index f1a7428..e15d741 100644 --- a/apps/cli/src/commands/peers.ts +++ b/apps/cli/src/commands/peers.ts @@ -21,14 +21,28 @@ export interface PeersFlags { mesh?: string; /** `true`/`undefined` = full record; comma-separated string = field projection. */ json?: boolean | string; - /** When false (default), hide claudemesh-daemon presence rows from the - * human renderer — they're infrastructure, not interactive peers, and - * confused users into thinking the daemon counted as a "peer". The - * JSON output still includes them so scripts that need a full inventory - * can opt in via --all (or just consume JSON). */ + /** When false (default), hide control-plane presence rows from the + * human renderer — they're infrastructure (daemon-WS member-keyed + * presence), not interactive peers, and confused users into thinking + * the daemon counted as a "peer". The JSON output still includes them + * so scripts that need a full inventory can opt in via --all (or + * just consume JSON). + * + * Source of truth is the broker-side `role` field + * (`'control-plane' | 'session' | 'service'`). Older brokers don't + * emit `role` yet — this code falls back to treating missing role as + * `'session'` so legacy peer rows stay visible. */ all?: boolean; } +/** + * Broker-emitted peer classification, added 2026-05-04. Older brokers + * may omit it — treat missing as 'session' so legacy meshes still + * render their peers (and don't accidentally hide them all). The CLI + * never emits 'control-plane' on its own; that comes from the broker. + */ +export type PeerRole = "control-plane" | "session" | "service"; + interface PeerRecord { pubkey: string; /** Stable member pubkey (independent of session). When sender shares @@ -43,10 +57,17 @@ interface PeerRecord { status?: string; summary?: string; groups: Array<{ name: string; role?: string }>; - /** Top-level convenience alias for `profile.role`. Lifted by the - * CLI so JSON consumers see role at the shape's top level instead - * of nested under profile. Same value either way. */ - role?: string; + /** Broker-emitted classification: 'control-plane' | 'session' | + * 'service'. Source of truth for the --all visibility filter and the + * default-hide rule. Older brokers omit this; the CLI fills missing + * values with 'session' so legacy peer rows stay visible. + * + * Note: this replaces the prior CLI-side lift of `profile.role` to + * the top-level `role` field — `profile.role` is user-supplied + * metadata (e.g. "lead", "reviewer"), distinct from the broker's + * presence-class taxonomy. The user-facing string still lives at + * `profile.role` and is rendered inline as `role:`. */ + role?: PeerRole; peerType?: string; channel?: string; model?: string; @@ -139,12 +160,9 @@ async function listPeersForMesh(slug: string): Promise { * surfaced a sender's siblings as separate rows because they're separate * presence rows; the cli just hadn't been making that visible. * - * Also lifts `profile.role` to a top-level `role` field. The broker has - * always returned role nested under `profile.role`, but downstream JSON - * consumers (LLMs in launched sessions, jq pipelines, dashboards) kept - * missing it because nothing pointed at the nesting. A dedicated - * top-level alias makes the intent unmissable without breaking the - * `profile` object's shape for callers that already drill into it. + * Also normalizes the broker's `role` classification: missing values + * (older brokers) default to 'session' so legacy peer rows stay + * visible under the default `--all=false` filter. */ function annotateSelf( peer: PeerRecord, @@ -161,8 +179,8 @@ function annotateSelf( selfSessionPubkey && peer.pubkey === selfSessionPubkey ); - const role = peer.profile?.role?.trim() || undefined; - return { ...peer, ...(role ? { role } : {}), isSelf, isThisSession }; + const role: PeerRole = peer.role ?? "session"; + return { ...peer, role, isSelf, isThisSession }; } export async function runPeers(flags: PeersFlags): Promise { @@ -208,13 +226,18 @@ export async function runPeers(flags: PeersFlags): Promise { continue; } - // Hide claudemesh-daemon rows by default — they're infrastructure - // (the daemon's own member-keyed presence), not interactive peers, - // and they confused users into thinking the daemon counted as a + // Hide control-plane rows by default — they're infrastructure + // (daemon-WS member-keyed presence), not interactive peers, and + // they confused users into thinking the daemon counted as a // separate peer. --all opts back in for debugging. + // + // Source of truth: broker-emitted `role` field (added 2026-05-04). + // annotateSelf() already filled in 'session' for older brokers + // that don't emit role yet, so this filter is backwards-compatible + // by construction — legacy rows show up. const visible = flags.all ? peers - : peers.filter((p) => p.channel !== "claudemesh-daemon"); + : peers.filter((p) => p.role !== "control-plane"); // Sort: this-session first, then your-other-sessions, then real // peers. Within each group, idle/working ahead of dnd. Inside the @@ -226,9 +249,9 @@ export async function runPeers(flags: PeersFlags): Promise { return score(a) - score(b); }); - const hiddenDaemons = peers.length - visible.length; - const header = hiddenDaemons > 0 - ? `peers on ${slug} (${sorted.length}, ${hiddenDaemons} daemon hidden — use --all)` + const hiddenControlPlane = peers.length - visible.length; + const header = hiddenControlPlane > 0 + ? `peers on ${slug} (${sorted.length}, ${hiddenControlPlane} control-plane hidden — use --all)` : `peers on ${slug} (${sorted.length})`; render.section(header); diff --git a/apps/cli/src/daemon/broker.ts b/apps/cli/src/daemon/broker.ts index c6b883d..286dcdd 100644 --- a/apps/cli/src/daemon/broker.ts +++ b/apps/cli/src/daemon/broker.ts @@ -7,13 +7,19 @@ // - Wire envelope adds `client_message_id` (broker may ignore in legacy // mode; Sprint 7 promotes it to authoritative dedupe). // - Reconnect with exponential backoff, signaled to the drain worker. - -import WebSocket from "ws"; +// +// 2026-05-04: lifecycle (connect / hello-ack / close-reconnect) now +// lives in `ws-lifecycle.ts`. This class supplies the daemon-WS hello +// content and routes incoming RPC replies / pushes; the helper handles +// the rest. The hello no longer carries an ephemeral `sessionPubkey` — +// session-targeted DMs land on the per-session WS (SessionBrokerClient) +// since 1.32.1, so this socket only needs the member identity. import type { JoinedMesh } from "~/services/config/facade.js"; import { signHello } from "~/services/broker/hello-sig.js"; +import { connectWsWithBackoff, type WsLifecycle, type WsStatus } from "./ws-lifecycle.js"; -export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting"; +export type ConnStatus = WsStatus; export interface BrokerSendArgs { /** Target as the broker expects it: peer name | pubkey | @group | * | topic. */ @@ -49,6 +55,8 @@ export interface PeerSummary { hostname?: string; peerType?: string; channel?: string; + /** Broker-side classification, added 2026-05-04. Missing in older brokers. */ + role?: "control-plane" | "session" | "service"; } interface PendingPeerList { @@ -84,9 +92,7 @@ export interface MemoryRow { rememberedAt: string; } -const HELLO_ACK_TIMEOUT_MS = 5_000; -const SEND_ACK_TIMEOUT_MS = 15_000; -const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000]; +const SEND_ACK_TIMEOUT_MS = 15_000; export interface DaemonBrokerOptions { displayName?: string; @@ -96,12 +102,9 @@ export interface DaemonBrokerOptions { } export class DaemonBrokerClient { - private ws: WebSocket | null = null; + private lifecycle: WsLifecycle | null = null; private _status: ConnStatus = "closed"; private closed = false; - private reconnectAttempt = 0; - private reconnectTimer: NodeJS.Timeout | null = null; - private helloTimer: NodeJS.Timeout | null = null; private pendingAcks = new Map(); private peerListResolvers = new Map(); private skillListResolvers = new Map void; timer: NodeJS.Timeout }>(); @@ -110,8 +113,6 @@ export class DaemonBrokerClient { private stateListResolvers = new Map void; timer: NodeJS.Timeout }>(); private memoryStoreResolvers = new Map void; timer: NodeJS.Timeout }>(); private memoryRecallResolvers = new Map void; timer: NodeJS.Timeout }>(); - private sessionPubkey: string | null = null; - private sessionSecretKey: string | null = null; private opens: Array<() => void> = []; private reqCounter = 0; @@ -125,198 +126,166 @@ export class DaemonBrokerClient { (this.opts.log ?? defaultLog)(level, msg, { mesh: this.mesh.slug, ...meta }); }; - private setConnStatus(s: ConnStatus) { - if (this._status === s) return; - this._status = s; - this.opts.onStatusChange?.(s); - } - /** Open the WS, run the hello handshake, resolve once the broker accepts. */ async connect(): Promise { if (this.closed) throw new Error("client_closed"); if (this._status === "connecting" || this._status === "open") return; - this.setConnStatus("connecting"); - const ws = new WebSocket(this.mesh.brokerUrl); - this.ws = ws; - - return new Promise((resolve, reject) => { - ws.on("open", async () => { - try { - if (!this.sessionPubkey) { - const { generateKeypair } = await import("~/services/crypto/facade.js"); - const kp = await generateKeypair(); - this.sessionPubkey = kp.publicKey; - this.sessionSecretKey = kp.secretKey; - } - const { timestamp, signature } = await signHello( - this.mesh.meshId, this.mesh.memberId, this.mesh.pubkey, this.mesh.secretKey, - ); - ws.send(JSON.stringify({ - type: "hello", - meshId: this.mesh.meshId, - memberId: this.mesh.memberId, - pubkey: this.mesh.pubkey, - sessionPubkey: this.sessionPubkey, - displayName: this.opts.displayName, - sessionId: `daemon-${process.pid}`, - pid: process.pid, - cwd: process.cwd(), - hostname: require("node:os").hostname(), - peerType: "ai" as const, - channel: "claudemesh-daemon", - timestamp, - signature, - })); - this.helloTimer = setTimeout(() => { - this.log("warn", "broker_hello_ack_timeout"); - try { ws.close(); } catch { /* ignore */ } - reject(new Error("hello_ack_timeout")); - }, HELLO_ACK_TIMEOUT_MS); - } catch (e) { - reject(e instanceof Error ? e : new Error(String(e))); - } - }); - - ws.on("message", (raw) => { - let msg: Record; - try { msg = JSON.parse(raw.toString()) as Record; } - catch { return; } - - if (msg.type === "hello_ack") { - if (this.helloTimer) clearTimeout(this.helloTimer); - this.helloTimer = null; - this.setConnStatus("open"); - this.reconnectAttempt = 0; - // Flush deferred openers (drain worker, etc.) + this.lifecycle = await connectWsWithBackoff({ + url: this.mesh.brokerUrl, + buildHello: async () => { + const { timestamp, signature } = await signHello( + this.mesh.meshId, this.mesh.memberId, this.mesh.pubkey, this.mesh.secretKey, + ); + return { + type: "hello", + meshId: this.mesh.meshId, + memberId: this.mesh.memberId, + pubkey: this.mesh.pubkey, + // No `sessionPubkey` — daemon-WS is member-keyed only. The + // per-session presence WS (SessionBrokerClient) carries the + // ephemeral session pubkey. Spec §"Layer 1: Identity → Member identity". + displayName: this.opts.displayName, + sessionId: `daemon-${process.pid}`, + pid: process.pid, + cwd: process.cwd(), + hostname: require("node:os").hostname(), + peerType: "ai" as const, + channel: "claudemesh-daemon", + timestamp, + signature, + }; + }, + isHelloAck: (msg) => msg.type === "hello_ack", + onMessage: (msg) => this.handleMessage(msg), + onStatusChange: (s) => { + this._status = s; + this.opts.onStatusChange?.(s); + if (s === "open") { + // Flush deferred openers (drain worker, etc.). const queued = this.opens.slice(); this.opens.length = 0; - for (const fn of queued) { try { fn(); } catch (e) { this.log("warn", "open_handler_failed", { err: String(e) }); } } - resolve(); - return; - } - - if (msg.type === "ack") { - // Broker shape: { type: "ack", id, messageId, queued, error? } - const id = String(msg.id ?? ""); - const ack = this.pendingAcks.get(id); - if (ack) { - this.pendingAcks.delete(id); - clearTimeout(ack.timer); - if (typeof msg.error === "string" && msg.error.length > 0) { - ack.resolve({ ok: false, error: msg.error, permanent: classifyPermanent(msg.error) }); - } else { - ack.resolve({ ok: true, messageId: String(msg.messageId ?? id) }); - } + for (const fn of queued) { + try { fn(); } catch (e) { this.log("warn", "open_handler_failed", { err: String(e) }); } } - return; } - - if (msg.type === "peers_list") { - const reqId = String(msg._reqId ?? ""); - const pending = this.peerListResolvers.get(reqId); - if (pending) { - this.peerListResolvers.delete(reqId); - clearTimeout(pending.timer); - pending.resolve(Array.isArray(msg.peers) ? (msg.peers as PeerSummary[]) : []); - } - return; - } - - if (msg.type === "skill_list") { - const reqId = String(msg._reqId ?? ""); - const pending = this.skillListResolvers.get(reqId); - if (pending) { - this.skillListResolvers.delete(reqId); - clearTimeout(pending.timer); - pending.resolve(Array.isArray(msg.skills) ? (msg.skills as SkillSummary[]) : []); - } - return; - } - - if (msg.type === "skill_data") { - const reqId = String(msg._reqId ?? ""); - const pending = this.skillDataResolvers.get(reqId); - if (pending) { - this.skillDataResolvers.delete(reqId); - clearTimeout(pending.timer); - pending.resolve((msg.skill as SkillFull) ?? null); - } - return; - } - - if (msg.type === "state_value" || msg.type === "state_data") { - const reqId = String(msg._reqId ?? ""); - const pending = this.stateGetResolvers.get(reqId); - if (pending) { - this.stateGetResolvers.delete(reqId); - clearTimeout(pending.timer); - pending.resolve((msg.state ?? msg.row ?? null) as StateRow | null); - } - return; - } - - if (msg.type === "state_list") { - const reqId = String(msg._reqId ?? ""); - const pending = this.stateListResolvers.get(reqId); - if (pending) { - this.stateListResolvers.delete(reqId); - clearTimeout(pending.timer); - pending.resolve(Array.isArray(msg.entries) ? (msg.entries as StateRow[]) : []); - } - return; - } - - if (msg.type === "memory_stored") { - const reqId = String(msg._reqId ?? ""); - const pending = this.memoryStoreResolvers.get(reqId); - if (pending) { - this.memoryStoreResolvers.delete(reqId); - clearTimeout(pending.timer); - pending.resolve(typeof msg.memoryId === "string" ? msg.memoryId : null); - } - return; - } - - if (msg.type === "memory_recall_result") { - const reqId = String(msg._reqId ?? ""); - const pending = this.memoryRecallResolvers.get(reqId); - if (pending) { - this.memoryRecallResolvers.delete(reqId); - clearTimeout(pending.timer); - pending.resolve(Array.isArray(msg.matches) ? (msg.matches as MemoryRow[]) : []); - } - return; - } - - if (msg.type === "push" || msg.type === "inbound") { - this.opts.onPush?.(msg); - return; - } - }); - - ws.on("close", (code, reason) => { - if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } - this.failPendingAcks(`broker_disconnected_${code}`); - if (this.closed) { this.setConnStatus("closed"); return; } - this.setConnStatus("reconnecting"); - const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000; - this.reconnectAttempt++; - this.log("info", "broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") }); - this.reconnectTimer = setTimeout(() => this.connect().catch((err) => this.log("warn", "broker_reconnect_failed", { err: String(err) })), wait); - // First connection failure also rejects the original connect() promise. - if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`)); - }); - - ws.on("error", (err) => this.log("warn", "broker_ws_error", { err: err.message })); + }, + onBeforeReconnect: (code) => this.failPendingAcks(`broker_disconnected_${code}`), + log: (level, msg, meta) => this.log(level, `broker_${msg}`, meta), }); } + private handleMessage(msg: Record): void { + if (msg.type === "ack") { + // Broker shape: { type: "ack", id, messageId, queued, error? } + const id = String(msg.id ?? ""); + const ack = this.pendingAcks.get(id); + if (ack) { + this.pendingAcks.delete(id); + clearTimeout(ack.timer); + if (typeof msg.error === "string" && msg.error.length > 0) { + ack.resolve({ ok: false, error: msg.error, permanent: classifyPermanent(msg.error) }); + } else { + ack.resolve({ ok: true, messageId: String(msg.messageId ?? id) }); + } + } + return; + } + + if (msg.type === "peers_list") { + const reqId = String(msg._reqId ?? ""); + const pending = this.peerListResolvers.get(reqId); + if (pending) { + this.peerListResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve(Array.isArray(msg.peers) ? (msg.peers as PeerSummary[]) : []); + } + return; + } + + if (msg.type === "skill_list") { + const reqId = String(msg._reqId ?? ""); + const pending = this.skillListResolvers.get(reqId); + if (pending) { + this.skillListResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve(Array.isArray(msg.skills) ? (msg.skills as SkillSummary[]) : []); + } + return; + } + + if (msg.type === "skill_data") { + const reqId = String(msg._reqId ?? ""); + const pending = this.skillDataResolvers.get(reqId); + if (pending) { + this.skillDataResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve((msg.skill as SkillFull) ?? null); + } + return; + } + + if (msg.type === "state_value" || msg.type === "state_data") { + const reqId = String(msg._reqId ?? ""); + const pending = this.stateGetResolvers.get(reqId); + if (pending) { + this.stateGetResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve((msg.state ?? msg.row ?? null) as StateRow | null); + } + return; + } + + if (msg.type === "state_list") { + const reqId = String(msg._reqId ?? ""); + const pending = this.stateListResolvers.get(reqId); + if (pending) { + this.stateListResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve(Array.isArray(msg.entries) ? (msg.entries as StateRow[]) : []); + } + return; + } + + if (msg.type === "memory_stored") { + const reqId = String(msg._reqId ?? ""); + const pending = this.memoryStoreResolvers.get(reqId); + if (pending) { + this.memoryStoreResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve(typeof msg.memoryId === "string" ? msg.memoryId : null); + } + return; + } + + if (msg.type === "memory_recall_result") { + const reqId = String(msg._reqId ?? ""); + const pending = this.memoryRecallResolvers.get(reqId); + if (pending) { + this.memoryRecallResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve(Array.isArray(msg.matches) ? (msg.matches as MemoryRow[]) : []); + } + return; + } + + if (msg.type === "push" || msg.type === "inbound") { + this.opts.onPush?.(msg); + return; + } + } + + /** True when underlying socket is OPEN-ready for direct sends. */ + private isOpen(): boolean { + const sock = this.lifecycle?.ws; + return !!sock && sock.readyState === sock.OPEN; + } + /** Send one outbox row. Resolves on broker ack/timeout. */ send(req: BrokerSendArgs): Promise { return new Promise((resolve) => { const dispatch = () => { - if (!this.ws || this.ws.readyState !== this.ws.OPEN) { + if (!this.isOpen()) { resolve({ ok: false, error: "broker_not_open", permanent: false }); return; } @@ -328,7 +297,7 @@ export class DaemonBrokerClient { }, SEND_ACK_TIMEOUT_MS); this.pendingAcks.set(id, { resolve, timer }); try { - this.ws.send(JSON.stringify({ + this.lifecycle!.send({ type: "send", id, // legacy correlation id client_message_id: id, // forward-compat per spec §4.2 @@ -337,7 +306,7 @@ export class DaemonBrokerClient { priority: req.priority, nonce: req.nonce, ciphertext: req.ciphertext, - })); + }); } catch (e) { this.pendingAcks.delete(id); clearTimeout(timer); @@ -352,153 +321,149 @@ export class DaemonBrokerClient { /** Ask the broker for the current peer list. */ async listPeers(timeoutMs = 5_000): Promise { - if (this._status !== "open" || !this.ws) return []; + if (this._status !== "open" || !this.lifecycle) return []; return new Promise((resolve) => { const reqId = `pl-${++this.reqCounter}`; const timer = setTimeout(() => { if (this.peerListResolvers.delete(reqId)) resolve([]); }, timeoutMs); this.peerListResolvers.set(reqId, { resolve, timer }); - try { this.ws!.send(JSON.stringify({ type: "list_peers", _reqId: reqId })); } + try { this.lifecycle!.send({ type: "list_peers", _reqId: reqId }); } catch { this.peerListResolvers.delete(reqId); clearTimeout(timer); resolve([]); } }); } /** List mesh-published skills. Empty array on disconnect / timeout. */ async listSkills(query?: string, timeoutMs = 5_000): Promise { - if (this._status !== "open" || !this.ws) return []; + if (this._status !== "open" || !this.lifecycle) return []; return new Promise((resolve) => { const reqId = `sl-${++this.reqCounter}`; const timer = setTimeout(() => { if (this.skillListResolvers.delete(reqId)) resolve([]); }, timeoutMs); this.skillListResolvers.set(reqId, { resolve, timer }); - try { this.ws!.send(JSON.stringify({ type: "list_skills", query, _reqId: reqId })); } + try { this.lifecycle!.send({ type: "list_skills", query, _reqId: reqId }); } catch { this.skillListResolvers.delete(reqId); clearTimeout(timer); resolve([]); } }); } /** Fetch one skill's full body. Null on not-found / disconnect / timeout. */ async getSkill(name: string, timeoutMs = 5_000): Promise { - if (this._status !== "open" || !this.ws) return null; + if (this._status !== "open" || !this.lifecycle) return null; return new Promise((resolve) => { const reqId = `sg-${++this.reqCounter}`; const timer = setTimeout(() => { if (this.skillDataResolvers.delete(reqId)) resolve(null); }, timeoutMs); this.skillDataResolvers.set(reqId, { resolve, timer }); - try { this.ws!.send(JSON.stringify({ type: "get_skill", name, _reqId: reqId })); } + try { this.lifecycle!.send({ type: "get_skill", name, _reqId: reqId }); } catch { this.skillDataResolvers.delete(reqId); clearTimeout(timer); resolve(null); } }); } /** Read a single shared state row. Null on disconnect / timeout / not-found. */ async getState(key: string, timeoutMs = 5_000): Promise { - if (this._status !== "open" || !this.ws) return null; + if (this._status !== "open" || !this.lifecycle) return null; return new Promise((resolve) => { const reqId = `sg-${++this.reqCounter}`; const timer = setTimeout(() => { if (this.stateGetResolvers.delete(reqId)) resolve(null); }, timeoutMs); this.stateGetResolvers.set(reqId, { resolve, timer }); - try { this.ws!.send(JSON.stringify({ type: "get_state", key, _reqId: reqId })); } + try { this.lifecycle!.send({ type: "get_state", key, _reqId: reqId }); } catch { this.stateGetResolvers.delete(reqId); clearTimeout(timer); resolve(null); } }); } /** List all shared state rows in the mesh. */ async listState(timeoutMs = 5_000): Promise { - if (this._status !== "open" || !this.ws) return []; + if (this._status !== "open" || !this.lifecycle) return []; return new Promise((resolve) => { const reqId = `sl-${++this.reqCounter}`; const timer = setTimeout(() => { if (this.stateListResolvers.delete(reqId)) resolve([]); }, timeoutMs); this.stateListResolvers.set(reqId, { resolve, timer }); - try { this.ws!.send(JSON.stringify({ type: "list_state", _reqId: reqId })); } + try { this.lifecycle!.send({ type: "list_state", _reqId: reqId }); } catch { this.stateListResolvers.delete(reqId); clearTimeout(timer); resolve([]); } }); } /** Set a shared state value. Fire-and-forget. */ setState(key: string, value: unknown): void { - if (this._status !== "open" || !this.ws) return; - try { this.ws.send(JSON.stringify({ type: "set_state", key, value })); } + if (this._status !== "open" || !this.lifecycle) return; + try { this.lifecycle.send({ type: "set_state", key, value }); } catch { /* ignore */ } } /** Store a memory in the mesh. Returns the assigned id, or null on timeout. */ async remember(content: string, tags?: string[], timeoutMs = 5_000): Promise { - if (this._status !== "open" || !this.ws) return null; + if (this._status !== "open" || !this.lifecycle) return null; return new Promise((resolve) => { const reqId = `mr-${++this.reqCounter}`; const timer = setTimeout(() => { if (this.memoryStoreResolvers.delete(reqId)) resolve(null); }, timeoutMs); this.memoryStoreResolvers.set(reqId, { resolve, timer }); - try { this.ws!.send(JSON.stringify({ type: "remember", content, tags, _reqId: reqId })); } + try { this.lifecycle!.send({ type: "remember", content, tags, _reqId: reqId }); } catch { this.memoryStoreResolvers.delete(reqId); clearTimeout(timer); resolve(null); } }); } /** Search memories by relevance. */ async recall(query: string, timeoutMs = 5_000): Promise { - if (this._status !== "open" || !this.ws) return []; + if (this._status !== "open" || !this.lifecycle) return []; return new Promise((resolve) => { const reqId = `mc-${++this.reqCounter}`; const timer = setTimeout(() => { if (this.memoryRecallResolvers.delete(reqId)) resolve([]); }, timeoutMs); this.memoryRecallResolvers.set(reqId, { resolve, timer }); - try { this.ws!.send(JSON.stringify({ type: "recall", query, _reqId: reqId })); } + try { this.lifecycle!.send({ type: "recall", query, _reqId: reqId }); } catch { this.memoryRecallResolvers.delete(reqId); clearTimeout(timer); resolve([]); } }); } /** Forget a memory by id. Fire-and-forget. */ forget(memoryId: string): void { - if (this._status !== "open" || !this.ws) return; - try { this.ws.send(JSON.stringify({ type: "forget", memoryId })); } + if (this._status !== "open" || !this.lifecycle) return; + try { this.lifecycle.send({ type: "forget", memoryId }); } catch { /* ignore */ } } /** Set the daemon's profile (avatar/title/bio/capabilities). Fire-and-forget. */ setProfile(profile: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }): void { - if (this._status !== "open" || !this.ws) return; - try { this.ws.send(JSON.stringify({ type: "set_profile", ...profile })); } + if (this._status !== "open" || !this.lifecycle) return; + try { this.lifecycle.send({ type: "set_profile", ...profile }); } catch { /* ignore */ } } setSummary(summary: string): void { - if (this._status !== "open" || !this.ws) return; - try { this.ws.send(JSON.stringify({ type: "set_summary", summary })); } + if (this._status !== "open" || !this.lifecycle) return; + try { this.lifecycle.send({ type: "set_summary", summary }); } catch { /* ignore */ } } setStatus(status: "idle" | "working" | "dnd"): void { - if (this._status !== "open" || !this.ws) return; - try { this.ws.send(JSON.stringify({ type: "set_status", status })); } + if (this._status !== "open" || !this.lifecycle) return; + try { this.lifecycle.send({ type: "set_status", status }); } catch { /* ignore */ } } setVisible(visible: boolean): void { - if (this._status !== "open" || !this.ws) return; - try { this.ws.send(JSON.stringify({ type: "set_visible", visible })); } + if (this._status !== "open" || !this.lifecycle) return; + try { this.lifecycle.send({ type: "set_visible", visible }); } catch { /* ignore */ } } async close(): Promise { this.closed = true; - if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } - if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } this.failPendingAcks("daemon_shutdown"); - try { this.ws?.close(); } catch { /* ignore */ } - this.setConnStatus("closed"); - } - - getSessionKeys(): { sessionPubkey: string; sessionSecretKey: string } | null { - if (!this.sessionPubkey || !this.sessionSecretKey) return null; - return { sessionPubkey: this.sessionPubkey, sessionSecretKey: this.sessionSecretKey }; + if (this.lifecycle) { + try { await this.lifecycle.close(); } catch { /* ignore */ } + this.lifecycle = null; + } + this._status = "closed"; } private failPendingAcks(reason: string) { diff --git a/apps/cli/src/daemon/ipc/server.ts b/apps/cli/src/daemon/ipc/server.ts index 9165947..e0fb50b 100644 --- a/apps/cli/src/daemon/ipc/server.ts +++ b/apps/cli/src/daemon/ipc/server.ts @@ -870,11 +870,14 @@ async function resolveAndEncrypt( return { target_spec: to, ciphertext, nonce, mesh: meshSlug ?? "" }; } - // 64-char hex pubkey → DM directly. + // 64-char hex pubkey → DM directly. Encrypt with the daemon's member + // secret: recipient decrypts using THEIR session pubkey's matching + // secret on their session-WS, so the sender side just needs any + // private key whose public counterpart is known to the recipient as + // "the sender". Member key is the stable choice and is what the + // recipient already trusts via mesh membership. if (/^[0-9a-f]{64}$/i.test(to)) { - const sessionKeys = broker.getSessionKeys(); - const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey; - const env = await encryptDirect(req.message, to, senderSecret); + const env = await encryptDirect(req.message, to, meshSecretKey); return { target_spec: to, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" }; } @@ -890,9 +893,7 @@ async function resolveAndEncrypt( if (matches.length === 0) throw new Error(`no peer matching prefix "${to}"`); if (matches.length > 1) throw new Error(`prefix "${to}" is ambiguous (${matches.length} matches)`); const recipient = matches[0]!.pubkey; - const sessionKeys = broker.getSessionKeys(); - const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey; - const env = await encryptDirect(req.message, recipient, senderSecret); + const env = await encryptDirect(req.message, recipient, meshSecretKey); return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" }; } @@ -900,9 +901,7 @@ async function resolveAndEncrypt( const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase()); if (!match) throw new Error(`peer "${to}" not found`); const recipient = match.pubkey; - const sessionKeys = broker.getSessionKeys(); - const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey; - const env = await encryptDirect(req.message, recipient, senderSecret); + const env = await encryptDirect(req.message, recipient, meshSecretKey); return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" }; } diff --git a/apps/cli/src/daemon/run.ts b/apps/cli/src/daemon/run.ts index 150bab8..4ed70ad 100644 --- a/apps/cli/src/daemon/run.ts +++ b/apps/cli/src/daemon/run.ts @@ -136,13 +136,16 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise { bus.publish("broker_status", { mesh: mesh.slug, status: s }); }, onPush: (m) => { - const sessionKeys = broker.getSessionKeys(); + // Daemon-WS is member-keyed, not session-keyed. Session-targeted + // DMs land on the per-session WS (SessionBrokerClient) since + // 1.32.1 and decrypt with the session secret there. Anything that + // arrives here can only be member-keyed (broadcasts, member DMs, + // system events) — pass member secret only. void handleBrokerPush(m, { db: inboxDb, bus, meshSlug: mesh.slug, recipientSecretKeyHex: mesh.secretKey, - sessionSecretKeyHex: sessionKeys?.sessionSecretKey, }); }, }); diff --git a/apps/cli/src/daemon/session-broker.ts b/apps/cli/src/daemon/session-broker.ts index 0cac77c..217849a 100644 --- a/apps/cli/src/daemon/session-broker.ts +++ b/apps/cli/src/daemon/session-broker.ts @@ -26,15 +26,19 @@ * expected to be deployed first. * * Spec: .artifacts/specs/2026-05-04-per-session-presence.md. + * + * 2026-05-04: lifecycle (connect / hello-ack / close-reconnect) lives + * in `ws-lifecycle.ts`. This class supplies session_hello content and + * routes the inbound onPush; the helper handles the rest. */ import { hostname as osHostname } from "node:os"; -import WebSocket from "ws"; import type { JoinedMesh } from "~/services/config/facade.js"; import { signSessionHello } from "~/services/broker/session-hello-sig.js"; +import { connectWsWithBackoff, type WsLifecycle, type WsStatus } from "./ws-lifecycle.js"; -export type SessionBrokerStatus = "connecting" | "open" | "closed" | "reconnecting"; +export type SessionBrokerStatus = WsStatus; export interface ParentAttestation { sessionPubkey: string; @@ -75,16 +79,13 @@ export interface SessionBrokerOptions { log?: (level: "info" | "warn" | "error", msg: string, meta?: Record) => void; } -const HELLO_ACK_TIMEOUT_MS = 5_000; -const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000]; - export class SessionBrokerClient { - private ws: WebSocket | null = null; + private lifecycle: WsLifecycle | null = null; private _status: SessionBrokerStatus = "closed"; private closed = false; - private reconnectAttempt = 0; - private reconnectTimer: NodeJS.Timeout | null = null; - private helloTimer: NodeJS.Timeout | null = null; + /** Set when the broker rejects session_hello with `unknown_message_type` — + * older brokers without the 1.30.0 surface. We stop retrying. */ + private brokerUnsupported = false; constructor(private opts: SessionBrokerOptions) {} @@ -100,80 +101,52 @@ export class SessionBrokerClient { }); }; - private setStatus(s: SessionBrokerStatus) { - if (this._status === s) return; - this._status = s; - this.opts.onStatusChange?.(s); - } - /** Open the WS, run session_hello, resolve once the broker accepts. */ async connect(): Promise { if (this.closed) throw new Error("client_closed"); if (this._status === "connecting" || this._status === "open") return; - this.setStatus("connecting"); - - const ws = new WebSocket(this.opts.mesh.brokerUrl); - this.ws = ws; - - return new Promise((resolve, reject) => { - ws.on("open", async () => { - try { - const { timestamp, signature } = await signSessionHello({ - meshId: this.opts.mesh.meshId, - parentMemberPubkey: this.opts.mesh.pubkey, - sessionPubkey: this.opts.sessionPubkey, - sessionSecretKey: this.opts.sessionSecretKey, - }); - ws.send(JSON.stringify({ - type: "session_hello", - meshId: this.opts.mesh.meshId, - parentMemberId: this.opts.mesh.memberId, - parentMemberPubkey: this.opts.mesh.pubkey, - sessionPubkey: this.opts.sessionPubkey, - parentAttestation: this.opts.parentAttestation, - displayName: this.opts.displayName, - sessionId: this.opts.sessionId, - pid: this.opts.pid, - cwd: this.opts.cwd ?? process.cwd(), - hostname: osHostname(), - peerType: "ai" as const, - channel: "claudemesh-session", - ...(this.opts.groups && this.opts.groups.length > 0 ? { groups: this.opts.groups } : {}), - ...(this.opts.role ? { role: this.opts.role } : {}), - timestamp, - signature, - })); - this.helloTimer = setTimeout(() => { - this.log("warn", "session_hello_ack_timeout"); - try { ws.close(); } catch { /* ignore */ } - reject(new Error("session_hello_ack_timeout")); - }, HELLO_ACK_TIMEOUT_MS); - } catch (e) { - reject(e instanceof Error ? e : new Error(String(e))); - } - }); - - ws.on("message", (raw) => { - let msg: Record; - try { msg = JSON.parse(raw.toString()) as Record; } - catch { return; } - - if (msg.type === "hello_ack") { - if (this.helloTimer) clearTimeout(this.helloTimer); - this.helloTimer = null; - this.setStatus("open"); - this.reconnectAttempt = 0; - resolve(); - return; - } + this.lifecycle = await connectWsWithBackoff({ + url: this.opts.mesh.brokerUrl, + buildHello: async () => { + const { timestamp, signature } = await signSessionHello({ + meshId: this.opts.mesh.meshId, + parentMemberPubkey: this.opts.mesh.pubkey, + sessionPubkey: this.opts.sessionPubkey, + sessionSecretKey: this.opts.sessionSecretKey, + }); + return { + type: "session_hello", + meshId: this.opts.mesh.meshId, + parentMemberId: this.opts.mesh.memberId, + parentMemberPubkey: this.opts.mesh.pubkey, + sessionPubkey: this.opts.sessionPubkey, + parentAttestation: this.opts.parentAttestation, + displayName: this.opts.displayName, + sessionId: this.opts.sessionId, + pid: this.opts.pid, + cwd: this.opts.cwd ?? process.cwd(), + hostname: osHostname(), + peerType: "ai" as const, + channel: "claudemesh-session", + ...(this.opts.groups && this.opts.groups.length > 0 ? { groups: this.opts.groups } : {}), + ...(this.opts.role ? { role: this.opts.role } : {}), + timestamp, + signature, + }; + }, + isHelloAck: (msg) => msg.type === "hello_ack", + onMessage: (msg) => { if (msg.type === "error") { // Older brokers respond with `unknown_message_type` to session_hello; // surface that so the daemon can decide to skip per-session presence - // rather than churn through reconnects. + // rather than churn through reconnects. Setting `closed` halts the + // helper's reconnect loop on the next close. this.log("warn", "broker_error", { code: msg.code, message: msg.message }); if (msg.code === "unknown_message_type") { + this.brokerUnsupported = true; this.closed = true; + void this.lifecycle?.close(); } return; } @@ -185,33 +158,27 @@ export class SessionBrokerClient { this.opts.onPush?.(msg); return; } - }); - - ws.on("close", (code, reason) => { - if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } - if (this.closed) { this.setStatus("closed"); return; } - this.setStatus("reconnecting"); - const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000; - this.reconnectAttempt++; - this.log("info", "session_broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") }); - this.reconnectTimer = setTimeout( - () => this.connect().catch((err) => this.log("warn", "session_broker_reconnect_failed", { err: String(err) })), - wait, - ); - if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`)); - }); - - ws.on("error", (err) => this.log("warn", "session_broker_ws_error", { err: err.message })); + }, + onStatusChange: (s) => { + this._status = s; + this.opts.onStatusChange?.(s); + }, + log: (level, msg, meta) => this.log(level, `session_broker_${msg}`, meta), }); } async close(): Promise { this.closed = true; - if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } - if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } - try { this.ws?.close(); } catch { /* ignore */ } - this.setStatus("closed"); + if (this.lifecycle) { + try { await this.lifecycle.close(); } catch { /* ignore */ } + this.lifecycle = null; + } + this._status = "closed"; } + + /** True when the broker rejected our session_hello as unknown — caller + * may want to skip per-session presence entirely on this mesh. */ + get isBrokerUnsupported(): boolean { return this.brokerUnsupported; } } function defaultLog(level: "info" | "warn" | "error", msg: string, meta?: Record) { diff --git a/apps/cli/src/daemon/ws-lifecycle.ts b/apps/cli/src/daemon/ws-lifecycle.ts new file mode 100644 index 0000000..7a96e40 --- /dev/null +++ b/apps/cli/src/daemon/ws-lifecycle.ts @@ -0,0 +1,234 @@ +/** + * Shared WS lifecycle helper for the daemon's two broker clients. + * + * Both `DaemonBrokerClient` (member-keyed, one per joined mesh) and + * `SessionBrokerClient` (session-keyed, one per launched session) used + * to inline the same connect/hello/ack-timeout/close-reconnect logic. + * They drifted apart subtly — different ack-timeout names, different + * reconnect log messages, slightly different status flips — and that's + * how 1.32.x bugs shipped (push handler attached to the wrong client, + * etc). + * + * This helper owns ONLY the lifecycle: + * - new WebSocket(url), wire up open/message/close/error + * - on open → call buildHello() and send the result + * - start an ack-timeout timer; if it fires before the hello ack + * arrives, close the socket and reject the connect promise + * - on message, gate on isHelloAck(); when true, flip status to + * "open", clear the ack timer, resolve. All other messages are + * forwarded to onMessage() + * - on close, schedule a backoff reconnect (unless explicitly closed) + * + * Each client keeps its own concerns: DaemonBrokerClient still owns + * pendingAcks / peerListResolvers / etc; SessionBrokerClient still owns + * its onPush callback. The helper just hands them an open WS and a + * stable status field, and reconnects under their feet on disconnect. + * + * Composition over inheritance — callers receive a `WsLifecycle` handle + * with `send` / `close` / `status`, NOT a subclass. + */ + +import WebSocket from "ws"; + +export type WsStatus = "connecting" | "open" | "closed" | "reconnecting"; + +export type WsLogLevel = "info" | "warn" | "error"; +export type WsLog = (level: WsLogLevel, msg: string, meta?: Record) => void; + +export interface WsLifecycleOptions { + /** Broker URL (e.g. wss://ic.claudemesh.com/ws). */ + url: string; + /** + * Build the hello frame to send right after the WS opens. Async because + * signing the hello may need libsodium initialization. Whatever this + * returns is JSON.stringified and sent verbatim — the helper does NOT + * inspect or modify it. + */ + buildHello: () => Promise; + /** + * Returns true iff `msg` is the hello ack the helper should treat as + * "broker accepted us; flip status to open". Both daemon-WS and + * session-WS use `{ type: "hello_ack" }` today, but keeping this a + * predicate lets either client narrow further (e.g. on a `code` field) + * without leaking client-specific shape into the helper. + */ + isHelloAck: (msg: Record) => boolean; + /** + * Called for every parsed message that is NOT the hello ack. The + * helper does NOT decide which messages are pushes vs RPCs vs errors; + * that's the caller's concern. + */ + onMessage: (msg: Record) => void; + onStatusChange?: (s: WsStatus) => void; + /** + * How long to wait for the broker's hello ack before giving up and + * forcing a close. Defaults 5s — same as both pre-refactor clients. + */ + helloAckTimeoutMs?: number; + /** + * Reconnect backoff schedule. Defaults [1s, 2s, 4s, 8s, 16s, 30s] — + * matches both pre-refactor clients exactly. + */ + backoffCapsMs?: readonly number[]; + log?: WsLog; + /** + * Hook for the close path BEFORE the helper schedules a reconnect. + * Used by DaemonBrokerClient to fail its in-flight pendingAcks map + * with a "broker_disconnected_" reason. The helper passes the + * raw close code so the caller can shape its rejection text. + * + * Returns nothing — close handling continues regardless. + */ + onBeforeReconnect?: (code: number, reason: string) => void; +} + +export interface WsLifecycle { + /** Current connection status. Updated synchronously before onStatusChange fires. */ + readonly status: WsStatus; + /** Underlying socket. Exposed for callers that need OPEN-state checks + * before sending (mirrors the pre-refactor `this.ws.readyState` checks). */ + readonly ws: WebSocket | null; + /** Send a JSON payload over the open WS. Throws if not open — callers + * that need queue-while-disconnected semantics should layer that + * themselves (DaemonBrokerClient does, via its `opens` deferred-fn array). */ + send(payload: unknown): void; + /** Close the WS and stop reconnecting. Idempotent. */ + close(): Promise; +} + +const DEFAULT_HELLO_ACK_TIMEOUT_MS = 5_000; +const DEFAULT_BACKOFF_CAPS_MS: readonly number[] = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000]; + +const defaultLog: WsLog = (level, msg, meta) => { + const line = JSON.stringify({ level, msg, ...meta, ts: new Date().toISOString() }); + if (level === "info") process.stdout.write(line + "\n"); + else process.stderr.write(line + "\n"); +}; + +/** + * Connect a WebSocket with hello-handshake, ack-timeout, and reconnect + * with exponential backoff. Resolves once the broker accepts the hello; + * rejects if the first connect closes before the ack lands. + * + * Subsequent automatic reconnects are silent — they fire on the close + * handler's backoff timer and surface only via onStatusChange (and any + * caller-installed log). + */ +export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise { + const helloAckTimeoutMs = opts.helloAckTimeoutMs ?? DEFAULT_HELLO_ACK_TIMEOUT_MS; + const backoffCapsMs = opts.backoffCapsMs ?? DEFAULT_BACKOFF_CAPS_MS; + const log: WsLog = opts.log ?? defaultLog; + + let ws: WebSocket | null = null; + let status: WsStatus = "closed"; + let closed = false; + let reconnectAttempt = 0; + let reconnectTimer: NodeJS.Timeout | null = null; + let helloTimer: NodeJS.Timeout | null = null; + + const setStatus = (s: WsStatus) => { + if (status === s) return; + status = s; + opts.onStatusChange?.(s); + }; + + /** + * Open one WS attempt. Returns a promise that resolves on hello ack + * or rejects if the socket closes before we get one. Used by both the + * initial connect and the close-handler backoff timer (which awaits + * but ignores the rejection — by then the close handler has already + * scheduled its own reconnect). + */ + const openOnce = (): Promise => { + if (closed) return Promise.reject(new Error("client_closed")); + setStatus("connecting"); + + const sock = new WebSocket(opts.url); + ws = sock; + + return new Promise((resolve, reject) => { + sock.on("open", () => { + // Build and send the hello inside a microtask so any sync + // throws from buildHello() reject this connect attempt cleanly. + (async () => { + try { + const hello = await opts.buildHello(); + sock.send(JSON.stringify(hello)); + helloTimer = setTimeout(() => { + log("warn", "hello_ack_timeout", { url: opts.url }); + try { sock.close(); } catch { /* ignore */ } + reject(new Error("hello_ack_timeout")); + }, helloAckTimeoutMs); + } catch (e) { + reject(e instanceof Error ? e : new Error(String(e))); + } + })(); + }); + + sock.on("message", (raw) => { + let msg: Record; + try { msg = JSON.parse(raw.toString()) as Record; } + catch { return; } + + if (opts.isHelloAck(msg)) { + if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; } + setStatus("open"); + reconnectAttempt = 0; + resolve(); + // Don't forward hello_ack to onMessage — both pre-refactor + // clients consumed it inline and never delegated. + return; + } + + opts.onMessage(msg); + }); + + sock.on("close", (code, reason) => { + if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; } + const reasonStr = reason.toString("utf8"); + opts.onBeforeReconnect?.(code, reasonStr); + + if (closed) { + setStatus("closed"); + return; + } + setStatus("reconnecting"); + const wait = backoffCapsMs[Math.min(reconnectAttempt, backoffCapsMs.length - 1)] ?? 30_000; + reconnectAttempt++; + log("info", "ws_reconnect_scheduled", { url: opts.url, wait_ms: wait, code, reason: reasonStr }); + reconnectTimer = setTimeout( + () => openOnce().catch((err) => log("warn", "ws_reconnect_failed", { url: opts.url, err: String(err) })), + wait, + ); + // First attempt failure (still in connecting) also rejects the + // initial connect promise so callers can surface it. + if (status === "connecting" || status === "reconnecting") { + reject(new Error(`closed_before_hello_${code}`)); + } + }); + + sock.on("error", (err) => log("warn", "ws_error", { url: opts.url, err: err.message })); + }); + }; + + return openOnce().then(() => { + const handle: WsLifecycle = { + get status() { return status; }, + get ws() { return ws; }, + send(payload: unknown) { + if (!ws || ws.readyState !== ws.OPEN) { + throw new Error("ws_not_open"); + } + ws.send(JSON.stringify(payload)); + }, + async close() { + closed = true; + if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; } + if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; } + try { ws?.close(); } catch { /* ignore */ } + setStatus("closed"); + }, + }; + return handle; + }); +}