refactor(cli): m1 lifecycle + role-aware peer list

Foundational cleanups before agentic-comms architecture work
(.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md).
All behavior-preserving.

1. Extract `connectWsWithBackoff` into apps/cli/src/daemon/ws-lifecycle.ts.
   Both DaemonBrokerClient and SessionBrokerClient now share one
   lifecycle implementation (connect, hello-handshake, ack-timeout,
   close + backoff reconnect). Each client provides its own buildHello
   / isHelloAck / onMessage hooks and keeps its own RPC bookkeeping
   (pendingAcks, peerListResolvers, onPush). Composition over
   inheritance per Codex's review; no protocol shape changes.

2. Drop daemon-WS ephemeral session pubkey. DaemonBrokerClient no
   longer mints + sends a per-reconnect ephemeral keypair in its
   hello. Session-targeted DMs land on SessionBrokerClient since
   1.32.1, not the member-keyed daemon-WS, so the field was
   vestigial. Send-encrypt path now signs DMs with the stable mesh
   member secret. handleBrokerPush invocations from daemon-WS only
   pass the member secret — session decryption is the session-WS's
   job.

3. Role-aware peer list. `peer list` now hides peers whose
   broker-emitted `role` is `'control-plane'`. `--all` opts back in.
   JSON output emits `role` at top level. Older brokers that don't
   emit role yet default to 'session', so legacy peer rows stay
   visible without the broker-side change shipped first. Replaces
   the prior `peerType === 'claudemesh-daemon'` channel-name hack.

Typecheck + tests + build all green.
This commit is contained in:
Alejandro Gutiérrez
2026-05-04 18:08:32 +01:00
parent a25102a79f
commit dab80f475e
7 changed files with 575 additions and 354 deletions

View File

@@ -1,5 +1,35 @@
# Changelog # Changelog
## Unreleased — Milestone 1 lifecycle cleanups
Foundational refactor before the agentic-comms architecture work
(`.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md`). Three
changes, all behavior-preserving:
- **`connectWsWithBackoff` helper** (`apps/cli/src/daemon/ws-lifecycle.ts`).
Both `DaemonBrokerClient` and `SessionBrokerClient` now share one
lifecycle implementation — connect, hello-handshake, ack-timeout,
close + backoff reconnect. Each client supplies `buildHello` /
`isHelloAck` / `onMessage` and keeps its own RPC bookkeeping
(pendingAcks, peerListResolvers, onPush, etc). Composition over
inheritance per Codex's review; no protocol shape changes.
- **Drop daemon-WS ephemeral session pubkey.** `DaemonBrokerClient` no
longer mints + sends a per-reconnect ephemeral keypair in its hello.
Session-targeted DMs land on `SessionBrokerClient` (since 1.32.1),
not the member-keyed daemon-WS, so the field was vestigial. The
daemon's send-encrypt path now signs DMs with the stable mesh member
secret. Inbound on daemon-WS only attempts member-key decryption —
session decryption is the session-WS's job.
- **Role-aware peer list.** `peer list` now hides peers whose
broker-emitted `role` is `'control-plane'` (the daemon's own
member-keyed presence). `--all` opts back in. JSON output emits
`role` at the top level. Older brokers that don't emit `role` yet
default to `'session'`, so legacy peer rows stay visible without
the broker-side change shipped first. Replaces the prior
`peerType === 'claudemesh-daemon'` channel-name hack.
## 1.32.1 (2026-05-04) — DMs to session pubkeys actually deliver now ## 1.32.1 (2026-05-04) — DMs to session pubkeys actually deliver now
Critical fix. Sessions launched via `claudemesh launch` (1.30.0+) hold a Critical fix. Sessions launched via `claudemesh launch` (1.30.0+) hold a

View File

@@ -21,14 +21,28 @@ export interface PeersFlags {
mesh?: string; mesh?: string;
/** `true`/`undefined` = full record; comma-separated string = field projection. */ /** `true`/`undefined` = full record; comma-separated string = field projection. */
json?: boolean | string; json?: boolean | string;
/** When false (default), hide claudemesh-daemon presence rows from the /** When false (default), hide control-plane presence rows from the
* human renderer — they're infrastructure, not interactive peers, and * human renderer — they're infrastructure (daemon-WS member-keyed
* confused users into thinking the daemon counted as a "peer". The * presence), not interactive peers, and confused users into thinking
* JSON output still includes them so scripts that need a full inventory * the daemon counted as a "peer". The JSON output still includes them
* can opt in via --all (or just consume JSON). */ * so scripts that need a full inventory can opt in via --all (or
* just consume JSON).
*
* Source of truth is the broker-side `role` field
* (`'control-plane' | 'session' | 'service'`). Older brokers don't
* emit `role` yet — this code falls back to treating missing role as
* `'session'` so legacy peer rows stay visible. */
all?: boolean; all?: boolean;
} }
/**
* Broker-emitted peer classification, added 2026-05-04. Older brokers
* may omit it — treat missing as 'session' so legacy meshes still
* render their peers (and don't accidentally hide them all). The CLI
* never emits 'control-plane' on its own; that comes from the broker.
*/
export type PeerRole = "control-plane" | "session" | "service";
interface PeerRecord { interface PeerRecord {
pubkey: string; pubkey: string;
/** Stable member pubkey (independent of session). When sender shares /** Stable member pubkey (independent of session). When sender shares
@@ -43,10 +57,17 @@ interface PeerRecord {
status?: string; status?: string;
summary?: string; summary?: string;
groups: Array<{ name: string; role?: string }>; groups: Array<{ name: string; role?: string }>;
/** Top-level convenience alias for `profile.role`. Lifted by the /** Broker-emitted classification: 'control-plane' | 'session' |
* CLI so JSON consumers see role at the shape's top level instead * 'service'. Source of truth for the --all visibility filter and the
* of nested under profile. Same value either way. */ * default-hide rule. Older brokers omit this; the CLI fills missing
role?: string; * values with 'session' so legacy peer rows stay visible.
*
* Note: this replaces the prior CLI-side lift of `profile.role` to
* the top-level `role` field — `profile.role` is user-supplied
* metadata (e.g. "lead", "reviewer"), distinct from the broker's
* presence-class taxonomy. The user-facing string still lives at
* `profile.role` and is rendered inline as `role:<value>`. */
role?: PeerRole;
peerType?: string; peerType?: string;
channel?: string; channel?: string;
model?: string; model?: string;
@@ -139,12 +160,9 @@ async function listPeersForMesh(slug: string): Promise<PeerRecord[]> {
* surfaced a sender's siblings as separate rows because they're separate * surfaced a sender's siblings as separate rows because they're separate
* presence rows; the cli just hadn't been making that visible. * presence rows; the cli just hadn't been making that visible.
* *
* Also lifts `profile.role` to a top-level `role` field. The broker has * Also normalizes the broker's `role` classification: missing values
* always returned role nested under `profile.role`, but downstream JSON * (older brokers) default to 'session' so legacy peer rows stay
* consumers (LLMs in launched sessions, jq pipelines, dashboards) kept * visible under the default `--all=false` filter.
* missing it because nothing pointed at the nesting. A dedicated
* top-level alias makes the intent unmissable without breaking the
* `profile` object's shape for callers that already drill into it.
*/ */
function annotateSelf( function annotateSelf(
peer: PeerRecord, peer: PeerRecord,
@@ -161,8 +179,8 @@ function annotateSelf(
selfSessionPubkey && selfSessionPubkey &&
peer.pubkey === selfSessionPubkey peer.pubkey === selfSessionPubkey
); );
const role = peer.profile?.role?.trim() || undefined; const role: PeerRole = peer.role ?? "session";
return { ...peer, ...(role ? { role } : {}), isSelf, isThisSession }; return { ...peer, role, isSelf, isThisSession };
} }
export async function runPeers(flags: PeersFlags): Promise<void> { export async function runPeers(flags: PeersFlags): Promise<void> {
@@ -208,13 +226,18 @@ export async function runPeers(flags: PeersFlags): Promise<void> {
continue; continue;
} }
// Hide claudemesh-daemon rows by default — they're infrastructure // Hide control-plane rows by default — they're infrastructure
// (the daemon's own member-keyed presence), not interactive peers, // (daemon-WS member-keyed presence), not interactive peers, and
// and they confused users into thinking the daemon counted as a // they confused users into thinking the daemon counted as a
// separate peer. --all opts back in for debugging. // separate peer. --all opts back in for debugging.
//
// Source of truth: broker-emitted `role` field (added 2026-05-04).
// annotateSelf() already filled in 'session' for older brokers
// that don't emit role yet, so this filter is backwards-compatible
// by construction — legacy rows show up.
const visible = flags.all const visible = flags.all
? peers ? peers
: peers.filter((p) => p.channel !== "claudemesh-daemon"); : peers.filter((p) => p.role !== "control-plane");
// Sort: this-session first, then your-other-sessions, then real // Sort: this-session first, then your-other-sessions, then real
// peers. Within each group, idle/working ahead of dnd. Inside the // peers. Within each group, idle/working ahead of dnd. Inside the
@@ -226,9 +249,9 @@ export async function runPeers(flags: PeersFlags): Promise<void> {
return score(a) - score(b); return score(a) - score(b);
}); });
const hiddenDaemons = peers.length - visible.length; const hiddenControlPlane = peers.length - visible.length;
const header = hiddenDaemons > 0 const header = hiddenControlPlane > 0
? `peers on ${slug} (${sorted.length}, ${hiddenDaemons} daemon hidden — use --all)` ? `peers on ${slug} (${sorted.length}, ${hiddenControlPlane} control-plane hidden — use --all)`
: `peers on ${slug} (${sorted.length})`; : `peers on ${slug} (${sorted.length})`;
render.section(header); render.section(header);

View File

@@ -7,13 +7,19 @@
// - Wire envelope adds `client_message_id` (broker may ignore in legacy // - Wire envelope adds `client_message_id` (broker may ignore in legacy
// mode; Sprint 7 promotes it to authoritative dedupe). // mode; Sprint 7 promotes it to authoritative dedupe).
// - Reconnect with exponential backoff, signaled to the drain worker. // - Reconnect with exponential backoff, signaled to the drain worker.
//
import WebSocket from "ws"; // 2026-05-04: lifecycle (connect / hello-ack / close-reconnect) now
// lives in `ws-lifecycle.ts`. This class supplies the daemon-WS hello
// content and routes incoming RPC replies / pushes; the helper handles
// the rest. The hello no longer carries an ephemeral `sessionPubkey` —
// session-targeted DMs land on the per-session WS (SessionBrokerClient)
// since 1.32.1, so this socket only needs the member identity.
import type { JoinedMesh } from "~/services/config/facade.js"; import type { JoinedMesh } from "~/services/config/facade.js";
import { signHello } from "~/services/broker/hello-sig.js"; import { signHello } from "~/services/broker/hello-sig.js";
import { connectWsWithBackoff, type WsLifecycle, type WsStatus } from "./ws-lifecycle.js";
export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting"; export type ConnStatus = WsStatus;
export interface BrokerSendArgs { export interface BrokerSendArgs {
/** Target as the broker expects it: peer name | pubkey | @group | * | topic. */ /** Target as the broker expects it: peer name | pubkey | @group | * | topic. */
@@ -49,6 +55,8 @@ export interface PeerSummary {
hostname?: string; hostname?: string;
peerType?: string; peerType?: string;
channel?: string; channel?: string;
/** Broker-side classification, added 2026-05-04. Missing in older brokers. */
role?: "control-plane" | "session" | "service";
} }
interface PendingPeerList { interface PendingPeerList {
@@ -84,9 +92,7 @@ export interface MemoryRow {
rememberedAt: string; rememberedAt: string;
} }
const HELLO_ACK_TIMEOUT_MS = 5_000;
const SEND_ACK_TIMEOUT_MS = 15_000; const SEND_ACK_TIMEOUT_MS = 15_000;
const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
export interface DaemonBrokerOptions { export interface DaemonBrokerOptions {
displayName?: string; displayName?: string;
@@ -96,12 +102,9 @@ export interface DaemonBrokerOptions {
} }
export class DaemonBrokerClient { export class DaemonBrokerClient {
private ws: WebSocket | null = null; private lifecycle: WsLifecycle | null = null;
private _status: ConnStatus = "closed"; private _status: ConnStatus = "closed";
private closed = false; private closed = false;
private reconnectAttempt = 0;
private reconnectTimer: NodeJS.Timeout | null = null;
private helloTimer: NodeJS.Timeout | null = null;
private pendingAcks = new Map<string, PendingAck>(); private pendingAcks = new Map<string, PendingAck>();
private peerListResolvers = new Map<string, PendingPeerList>(); private peerListResolvers = new Map<string, PendingPeerList>();
private skillListResolvers = new Map<string, { resolve: (rows: SkillSummary[]) => void; timer: NodeJS.Timeout }>(); private skillListResolvers = new Map<string, { resolve: (rows: SkillSummary[]) => void; timer: NodeJS.Timeout }>();
@@ -110,8 +113,6 @@ export class DaemonBrokerClient {
private stateListResolvers = new Map<string, { resolve: (rows: StateRow[]) => void; timer: NodeJS.Timeout }>(); private stateListResolvers = new Map<string, { resolve: (rows: StateRow[]) => void; timer: NodeJS.Timeout }>();
private memoryStoreResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>(); private memoryStoreResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private memoryRecallResolvers = new Map<string, { resolve: (rows: MemoryRow[]) => void; timer: NodeJS.Timeout }>(); private memoryRecallResolvers = new Map<string, { resolve: (rows: MemoryRow[]) => void; timer: NodeJS.Timeout }>();
private sessionPubkey: string | null = null;
private sessionSecretKey: string | null = null;
private opens: Array<() => void> = []; private opens: Array<() => void> = [];
private reqCounter = 0; private reqCounter = 0;
@@ -125,39 +126,25 @@ export class DaemonBrokerClient {
(this.opts.log ?? defaultLog)(level, msg, { mesh: this.mesh.slug, ...meta }); (this.opts.log ?? defaultLog)(level, msg, { mesh: this.mesh.slug, ...meta });
}; };
private setConnStatus(s: ConnStatus) {
if (this._status === s) return;
this._status = s;
this.opts.onStatusChange?.(s);
}
/** Open the WS, run the hello handshake, resolve once the broker accepts. */ /** Open the WS, run the hello handshake, resolve once the broker accepts. */
async connect(): Promise<void> { async connect(): Promise<void> {
if (this.closed) throw new Error("client_closed"); if (this.closed) throw new Error("client_closed");
if (this._status === "connecting" || this._status === "open") return; if (this._status === "connecting" || this._status === "open") return;
this.setConnStatus("connecting");
const ws = new WebSocket(this.mesh.brokerUrl); this.lifecycle = await connectWsWithBackoff({
this.ws = ws; url: this.mesh.brokerUrl,
buildHello: async () => {
return new Promise<void>((resolve, reject) => {
ws.on("open", async () => {
try {
if (!this.sessionPubkey) {
const { generateKeypair } = await import("~/services/crypto/facade.js");
const kp = await generateKeypair();
this.sessionPubkey = kp.publicKey;
this.sessionSecretKey = kp.secretKey;
}
const { timestamp, signature } = await signHello( const { timestamp, signature } = await signHello(
this.mesh.meshId, this.mesh.memberId, this.mesh.pubkey, this.mesh.secretKey, this.mesh.meshId, this.mesh.memberId, this.mesh.pubkey, this.mesh.secretKey,
); );
ws.send(JSON.stringify({ return {
type: "hello", type: "hello",
meshId: this.mesh.meshId, meshId: this.mesh.meshId,
memberId: this.mesh.memberId, memberId: this.mesh.memberId,
pubkey: this.mesh.pubkey, pubkey: this.mesh.pubkey,
sessionPubkey: this.sessionPubkey, // No `sessionPubkey` — daemon-WS is member-keyed only. The
// per-session presence WS (SessionBrokerClient) carries the
// ephemeral session pubkey. Spec §"Layer 1: Identity → Member identity".
displayName: this.opts.displayName, displayName: this.opts.displayName,
sessionId: `daemon-${process.pid}`, sessionId: `daemon-${process.pid}`,
pid: process.pid, pid: process.pid,
@@ -167,35 +154,28 @@ export class DaemonBrokerClient {
channel: "claudemesh-daemon", channel: "claudemesh-daemon",
timestamp, timestamp,
signature, signature,
})); };
this.helloTimer = setTimeout(() => { },
this.log("warn", "broker_hello_ack_timeout"); isHelloAck: (msg) => msg.type === "hello_ack",
try { ws.close(); } catch { /* ignore */ } onMessage: (msg) => this.handleMessage(msg),
reject(new Error("hello_ack_timeout")); onStatusChange: (s) => {
}, HELLO_ACK_TIMEOUT_MS); this._status = s;
} catch (e) { this.opts.onStatusChange?.(s);
reject(e instanceof Error ? e : new Error(String(e))); if (s === "open") {
} // Flush deferred openers (drain worker, etc.).
});
ws.on("message", (raw) => {
let msg: Record<string, unknown>;
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
catch { return; }
if (msg.type === "hello_ack") {
if (this.helloTimer) clearTimeout(this.helloTimer);
this.helloTimer = null;
this.setConnStatus("open");
this.reconnectAttempt = 0;
// Flush deferred openers (drain worker, etc.)
const queued = this.opens.slice(); const queued = this.opens.slice();
this.opens.length = 0; this.opens.length = 0;
for (const fn of queued) { try { fn(); } catch (e) { this.log("warn", "open_handler_failed", { err: String(e) }); } } for (const fn of queued) {
resolve(); try { fn(); } catch (e) { this.log("warn", "open_handler_failed", { err: String(e) }); }
return; }
}
},
onBeforeReconnect: (code) => this.failPendingAcks(`broker_disconnected_${code}`),
log: (level, msg, meta) => this.log(level, `broker_${msg}`, meta),
});
} }
private handleMessage(msg: Record<string, unknown>): void {
if (msg.type === "ack") { if (msg.type === "ack") {
// Broker shape: { type: "ack", id, messageId, queued, error? } // Broker shape: { type: "ack", id, messageId, queued, error? }
const id = String(msg.id ?? ""); const id = String(msg.id ?? "");
@@ -293,30 +273,19 @@ export class DaemonBrokerClient {
this.opts.onPush?.(msg); this.opts.onPush?.(msg);
return; return;
} }
}); }
ws.on("close", (code, reason) => { /** True when underlying socket is OPEN-ready for direct sends. */
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } private isOpen(): boolean {
this.failPendingAcks(`broker_disconnected_${code}`); const sock = this.lifecycle?.ws;
if (this.closed) { this.setConnStatus("closed"); return; } return !!sock && sock.readyState === sock.OPEN;
this.setConnStatus("reconnecting");
const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000;
this.reconnectAttempt++;
this.log("info", "broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") });
this.reconnectTimer = setTimeout(() => this.connect().catch((err) => this.log("warn", "broker_reconnect_failed", { err: String(err) })), wait);
// First connection failure also rejects the original connect() promise.
if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`));
});
ws.on("error", (err) => this.log("warn", "broker_ws_error", { err: err.message }));
});
} }
/** Send one outbox row. Resolves on broker ack/timeout. */ /** Send one outbox row. Resolves on broker ack/timeout. */
send(req: BrokerSendArgs): Promise<BrokerSendResult> { send(req: BrokerSendArgs): Promise<BrokerSendResult> {
return new Promise<BrokerSendResult>((resolve) => { return new Promise<BrokerSendResult>((resolve) => {
const dispatch = () => { const dispatch = () => {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) { if (!this.isOpen()) {
resolve({ ok: false, error: "broker_not_open", permanent: false }); resolve({ ok: false, error: "broker_not_open", permanent: false });
return; return;
} }
@@ -328,7 +297,7 @@ export class DaemonBrokerClient {
}, SEND_ACK_TIMEOUT_MS); }, SEND_ACK_TIMEOUT_MS);
this.pendingAcks.set(id, { resolve, timer }); this.pendingAcks.set(id, { resolve, timer });
try { try {
this.ws.send(JSON.stringify({ this.lifecycle!.send({
type: "send", type: "send",
id, // legacy correlation id id, // legacy correlation id
client_message_id: id, // forward-compat per spec §4.2 client_message_id: id, // forward-compat per spec §4.2
@@ -337,7 +306,7 @@ export class DaemonBrokerClient {
priority: req.priority, priority: req.priority,
nonce: req.nonce, nonce: req.nonce,
ciphertext: req.ciphertext, ciphertext: req.ciphertext,
})); });
} catch (e) { } catch (e) {
this.pendingAcks.delete(id); this.pendingAcks.delete(id);
clearTimeout(timer); clearTimeout(timer);
@@ -352,153 +321,149 @@ export class DaemonBrokerClient {
/** Ask the broker for the current peer list. */ /** Ask the broker for the current peer list. */
async listPeers(timeoutMs = 5_000): Promise<PeerSummary[]> { async listPeers(timeoutMs = 5_000): Promise<PeerSummary[]> {
if (this._status !== "open" || !this.ws) return []; if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<PeerSummary[]>((resolve) => { return new Promise<PeerSummary[]>((resolve) => {
const reqId = `pl-${++this.reqCounter}`; const reqId = `pl-${++this.reqCounter}`;
const timer = setTimeout(() => { const timer = setTimeout(() => {
if (this.peerListResolvers.delete(reqId)) resolve([]); if (this.peerListResolvers.delete(reqId)) resolve([]);
}, timeoutMs); }, timeoutMs);
this.peerListResolvers.set(reqId, { resolve, timer }); this.peerListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_peers", _reqId: reqId })); } try { this.lifecycle!.send({ type: "list_peers", _reqId: reqId }); }
catch { this.peerListResolvers.delete(reqId); clearTimeout(timer); resolve([]); } catch { this.peerListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
}); });
} }
/** List mesh-published skills. Empty array on disconnect / timeout. */ /** List mesh-published skills. Empty array on disconnect / timeout. */
async listSkills(query?: string, timeoutMs = 5_000): Promise<SkillSummary[]> { async listSkills(query?: string, timeoutMs = 5_000): Promise<SkillSummary[]> {
if (this._status !== "open" || !this.ws) return []; if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<SkillSummary[]>((resolve) => { return new Promise<SkillSummary[]>((resolve) => {
const reqId = `sl-${++this.reqCounter}`; const reqId = `sl-${++this.reqCounter}`;
const timer = setTimeout(() => { const timer = setTimeout(() => {
if (this.skillListResolvers.delete(reqId)) resolve([]); if (this.skillListResolvers.delete(reqId)) resolve([]);
}, timeoutMs); }, timeoutMs);
this.skillListResolvers.set(reqId, { resolve, timer }); this.skillListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_skills", query, _reqId: reqId })); } try { this.lifecycle!.send({ type: "list_skills", query, _reqId: reqId }); }
catch { this.skillListResolvers.delete(reqId); clearTimeout(timer); resolve([]); } catch { this.skillListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
}); });
} }
/** Fetch one skill's full body. Null on not-found / disconnect / timeout. */ /** Fetch one skill's full body. Null on not-found / disconnect / timeout. */
async getSkill(name: string, timeoutMs = 5_000): Promise<SkillFull | null> { async getSkill(name: string, timeoutMs = 5_000): Promise<SkillFull | null> {
if (this._status !== "open" || !this.ws) return null; if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<SkillFull | null>((resolve) => { return new Promise<SkillFull | null>((resolve) => {
const reqId = `sg-${++this.reqCounter}`; const reqId = `sg-${++this.reqCounter}`;
const timer = setTimeout(() => { const timer = setTimeout(() => {
if (this.skillDataResolvers.delete(reqId)) resolve(null); if (this.skillDataResolvers.delete(reqId)) resolve(null);
}, timeoutMs); }, timeoutMs);
this.skillDataResolvers.set(reqId, { resolve, timer }); this.skillDataResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "get_skill", name, _reqId: reqId })); } try { this.lifecycle!.send({ type: "get_skill", name, _reqId: reqId }); }
catch { this.skillDataResolvers.delete(reqId); clearTimeout(timer); resolve(null); } catch { this.skillDataResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
}); });
} }
/** Read a single shared state row. Null on disconnect / timeout / not-found. */ /** Read a single shared state row. Null on disconnect / timeout / not-found. */
async getState(key: string, timeoutMs = 5_000): Promise<StateRow | null> { async getState(key: string, timeoutMs = 5_000): Promise<StateRow | null> {
if (this._status !== "open" || !this.ws) return null; if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<StateRow | null>((resolve) => { return new Promise<StateRow | null>((resolve) => {
const reqId = `sg-${++this.reqCounter}`; const reqId = `sg-${++this.reqCounter}`;
const timer = setTimeout(() => { const timer = setTimeout(() => {
if (this.stateGetResolvers.delete(reqId)) resolve(null); if (this.stateGetResolvers.delete(reqId)) resolve(null);
}, timeoutMs); }, timeoutMs);
this.stateGetResolvers.set(reqId, { resolve, timer }); this.stateGetResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "get_state", key, _reqId: reqId })); } try { this.lifecycle!.send({ type: "get_state", key, _reqId: reqId }); }
catch { this.stateGetResolvers.delete(reqId); clearTimeout(timer); resolve(null); } catch { this.stateGetResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
}); });
} }
/** List all shared state rows in the mesh. */ /** List all shared state rows in the mesh. */
async listState(timeoutMs = 5_000): Promise<StateRow[]> { async listState(timeoutMs = 5_000): Promise<StateRow[]> {
if (this._status !== "open" || !this.ws) return []; if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<StateRow[]>((resolve) => { return new Promise<StateRow[]>((resolve) => {
const reqId = `sl-${++this.reqCounter}`; const reqId = `sl-${++this.reqCounter}`;
const timer = setTimeout(() => { const timer = setTimeout(() => {
if (this.stateListResolvers.delete(reqId)) resolve([]); if (this.stateListResolvers.delete(reqId)) resolve([]);
}, timeoutMs); }, timeoutMs);
this.stateListResolvers.set(reqId, { resolve, timer }); this.stateListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_state", _reqId: reqId })); } try { this.lifecycle!.send({ type: "list_state", _reqId: reqId }); }
catch { this.stateListResolvers.delete(reqId); clearTimeout(timer); resolve([]); } catch { this.stateListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
}); });
} }
/** Set a shared state value. Fire-and-forget. */ /** Set a shared state value. Fire-and-forget. */
setState(key: string, value: unknown): void { setState(key: string, value: unknown): void {
if (this._status !== "open" || !this.ws) return; if (this._status !== "open" || !this.lifecycle) return;
try { this.ws.send(JSON.stringify({ type: "set_state", key, value })); } try { this.lifecycle.send({ type: "set_state", key, value }); }
catch { /* ignore */ } catch { /* ignore */ }
} }
/** Store a memory in the mesh. Returns the assigned id, or null on timeout. */ /** Store a memory in the mesh. Returns the assigned id, or null on timeout. */
async remember(content: string, tags?: string[], timeoutMs = 5_000): Promise<string | null> { async remember(content: string, tags?: string[], timeoutMs = 5_000): Promise<string | null> {
if (this._status !== "open" || !this.ws) return null; if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<string | null>((resolve) => { return new Promise<string | null>((resolve) => {
const reqId = `mr-${++this.reqCounter}`; const reqId = `mr-${++this.reqCounter}`;
const timer = setTimeout(() => { const timer = setTimeout(() => {
if (this.memoryStoreResolvers.delete(reqId)) resolve(null); if (this.memoryStoreResolvers.delete(reqId)) resolve(null);
}, timeoutMs); }, timeoutMs);
this.memoryStoreResolvers.set(reqId, { resolve, timer }); this.memoryStoreResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "remember", content, tags, _reqId: reqId })); } try { this.lifecycle!.send({ type: "remember", content, tags, _reqId: reqId }); }
catch { this.memoryStoreResolvers.delete(reqId); clearTimeout(timer); resolve(null); } catch { this.memoryStoreResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
}); });
} }
/** Search memories by relevance. */ /** Search memories by relevance. */
async recall(query: string, timeoutMs = 5_000): Promise<MemoryRow[]> { async recall(query: string, timeoutMs = 5_000): Promise<MemoryRow[]> {
if (this._status !== "open" || !this.ws) return []; if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<MemoryRow[]>((resolve) => { return new Promise<MemoryRow[]>((resolve) => {
const reqId = `mc-${++this.reqCounter}`; const reqId = `mc-${++this.reqCounter}`;
const timer = setTimeout(() => { const timer = setTimeout(() => {
if (this.memoryRecallResolvers.delete(reqId)) resolve([]); if (this.memoryRecallResolvers.delete(reqId)) resolve([]);
}, timeoutMs); }, timeoutMs);
this.memoryRecallResolvers.set(reqId, { resolve, timer }); this.memoryRecallResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "recall", query, _reqId: reqId })); } try { this.lifecycle!.send({ type: "recall", query, _reqId: reqId }); }
catch { this.memoryRecallResolvers.delete(reqId); clearTimeout(timer); resolve([]); } catch { this.memoryRecallResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
}); });
} }
/** Forget a memory by id. Fire-and-forget. */ /** Forget a memory by id. Fire-and-forget. */
forget(memoryId: string): void { forget(memoryId: string): void {
if (this._status !== "open" || !this.ws) return; if (this._status !== "open" || !this.lifecycle) return;
try { this.ws.send(JSON.stringify({ type: "forget", memoryId })); } try { this.lifecycle.send({ type: "forget", memoryId }); }
catch { /* ignore */ } catch { /* ignore */ }
} }
/** Set the daemon's profile (avatar/title/bio/capabilities). Fire-and-forget. */ /** Set the daemon's profile (avatar/title/bio/capabilities). Fire-and-forget. */
setProfile(profile: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }): void { setProfile(profile: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }): void {
if (this._status !== "open" || !this.ws) return; if (this._status !== "open" || !this.lifecycle) return;
try { this.ws.send(JSON.stringify({ type: "set_profile", ...profile })); } try { this.lifecycle.send({ type: "set_profile", ...profile }); }
catch { /* ignore */ } catch { /* ignore */ }
} }
setSummary(summary: string): void { setSummary(summary: string): void {
if (this._status !== "open" || !this.ws) return; if (this._status !== "open" || !this.lifecycle) return;
try { this.ws.send(JSON.stringify({ type: "set_summary", summary })); } try { this.lifecycle.send({ type: "set_summary", summary }); }
catch { /* ignore */ } catch { /* ignore */ }
} }
setStatus(status: "idle" | "working" | "dnd"): void { setStatus(status: "idle" | "working" | "dnd"): void {
if (this._status !== "open" || !this.ws) return; if (this._status !== "open" || !this.lifecycle) return;
try { this.ws.send(JSON.stringify({ type: "set_status", status })); } try { this.lifecycle.send({ type: "set_status", status }); }
catch { /* ignore */ } catch { /* ignore */ }
} }
setVisible(visible: boolean): void { setVisible(visible: boolean): void {
if (this._status !== "open" || !this.ws) return; if (this._status !== "open" || !this.lifecycle) return;
try { this.ws.send(JSON.stringify({ type: "set_visible", visible })); } try { this.lifecycle.send({ type: "set_visible", visible }); }
catch { /* ignore */ } catch { /* ignore */ }
} }
async close(): Promise<void> { async close(): Promise<void> {
this.closed = true; this.closed = true;
if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; }
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
this.failPendingAcks("daemon_shutdown"); this.failPendingAcks("daemon_shutdown");
try { this.ws?.close(); } catch { /* ignore */ } if (this.lifecycle) {
this.setConnStatus("closed"); try { await this.lifecycle.close(); } catch { /* ignore */ }
this.lifecycle = null;
} }
this._status = "closed";
getSessionKeys(): { sessionPubkey: string; sessionSecretKey: string } | null {
if (!this.sessionPubkey || !this.sessionSecretKey) return null;
return { sessionPubkey: this.sessionPubkey, sessionSecretKey: this.sessionSecretKey };
} }
private failPendingAcks(reason: string) { private failPendingAcks(reason: string) {

View File

@@ -870,11 +870,14 @@ async function resolveAndEncrypt(
return { target_spec: to, ciphertext, nonce, mesh: meshSlug ?? "" }; return { target_spec: to, ciphertext, nonce, mesh: meshSlug ?? "" };
} }
// 64-char hex pubkey → DM directly. // 64-char hex pubkey → DM directly. Encrypt with the daemon's member
// secret: recipient decrypts using THEIR session pubkey's matching
// secret on their session-WS, so the sender side just needs any
// private key whose public counterpart is known to the recipient as
// "the sender". Member key is the stable choice and is what the
// recipient already trusts via mesh membership.
if (/^[0-9a-f]{64}$/i.test(to)) { if (/^[0-9a-f]{64}$/i.test(to)) {
const sessionKeys = broker.getSessionKeys(); const env = await encryptDirect(req.message, to, meshSecretKey);
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, to, senderSecret);
return { target_spec: to, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" }; return { target_spec: to, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
} }
@@ -890,9 +893,7 @@ async function resolveAndEncrypt(
if (matches.length === 0) throw new Error(`no peer matching prefix "${to}"`); if (matches.length === 0) throw new Error(`no peer matching prefix "${to}"`);
if (matches.length > 1) throw new Error(`prefix "${to}" is ambiguous (${matches.length} matches)`); if (matches.length > 1) throw new Error(`prefix "${to}" is ambiguous (${matches.length} matches)`);
const recipient = matches[0]!.pubkey; const recipient = matches[0]!.pubkey;
const sessionKeys = broker.getSessionKeys(); const env = await encryptDirect(req.message, recipient, meshSecretKey);
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, recipient, senderSecret);
return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" }; return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
} }
@@ -900,9 +901,7 @@ async function resolveAndEncrypt(
const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase()); const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase());
if (!match) throw new Error(`peer "${to}" not found`); if (!match) throw new Error(`peer "${to}" not found`);
const recipient = match.pubkey; const recipient = match.pubkey;
const sessionKeys = broker.getSessionKeys(); const env = await encryptDirect(req.message, recipient, meshSecretKey);
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, recipient, senderSecret);
return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" }; return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
} }

View File

@@ -136,13 +136,16 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
bus.publish("broker_status", { mesh: mesh.slug, status: s }); bus.publish("broker_status", { mesh: mesh.slug, status: s });
}, },
onPush: (m) => { onPush: (m) => {
const sessionKeys = broker.getSessionKeys(); // Daemon-WS is member-keyed, not session-keyed. Session-targeted
// DMs land on the per-session WS (SessionBrokerClient) since
// 1.32.1 and decrypt with the session secret there. Anything that
// arrives here can only be member-keyed (broadcasts, member DMs,
// system events) — pass member secret only.
void handleBrokerPush(m, { void handleBrokerPush(m, {
db: inboxDb, db: inboxDb,
bus, bus,
meshSlug: mesh.slug, meshSlug: mesh.slug,
recipientSecretKeyHex: mesh.secretKey, recipientSecretKeyHex: mesh.secretKey,
sessionSecretKeyHex: sessionKeys?.sessionSecretKey,
}); });
}, },
}); });

View File

@@ -26,15 +26,19 @@
* expected to be deployed first. * expected to be deployed first.
* *
* Spec: .artifacts/specs/2026-05-04-per-session-presence.md. * Spec: .artifacts/specs/2026-05-04-per-session-presence.md.
*
* 2026-05-04: lifecycle (connect / hello-ack / close-reconnect) lives
* in `ws-lifecycle.ts`. This class supplies session_hello content and
* routes the inbound onPush; the helper handles the rest.
*/ */
import { hostname as osHostname } from "node:os"; import { hostname as osHostname } from "node:os";
import WebSocket from "ws";
import type { JoinedMesh } from "~/services/config/facade.js"; import type { JoinedMesh } from "~/services/config/facade.js";
import { signSessionHello } from "~/services/broker/session-hello-sig.js"; import { signSessionHello } from "~/services/broker/session-hello-sig.js";
import { connectWsWithBackoff, type WsLifecycle, type WsStatus } from "./ws-lifecycle.js";
export type SessionBrokerStatus = "connecting" | "open" | "closed" | "reconnecting"; export type SessionBrokerStatus = WsStatus;
export interface ParentAttestation { export interface ParentAttestation {
sessionPubkey: string; sessionPubkey: string;
@@ -75,16 +79,13 @@ export interface SessionBrokerOptions {
log?: (level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) => void; log?: (level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) => void;
} }
const HELLO_ACK_TIMEOUT_MS = 5_000;
const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
export class SessionBrokerClient { export class SessionBrokerClient {
private ws: WebSocket | null = null; private lifecycle: WsLifecycle | null = null;
private _status: SessionBrokerStatus = "closed"; private _status: SessionBrokerStatus = "closed";
private closed = false; private closed = false;
private reconnectAttempt = 0; /** Set when the broker rejects session_hello with `unknown_message_type` —
private reconnectTimer: NodeJS.Timeout | null = null; * older brokers without the 1.30.0 surface. We stop retrying. */
private helloTimer: NodeJS.Timeout | null = null; private brokerUnsupported = false;
constructor(private opts: SessionBrokerOptions) {} constructor(private opts: SessionBrokerOptions) {}
@@ -100,31 +101,21 @@ export class SessionBrokerClient {
}); });
}; };
private setStatus(s: SessionBrokerStatus) {
if (this._status === s) return;
this._status = s;
this.opts.onStatusChange?.(s);
}
/** Open the WS, run session_hello, resolve once the broker accepts. */ /** Open the WS, run session_hello, resolve once the broker accepts. */
async connect(): Promise<void> { async connect(): Promise<void> {
if (this.closed) throw new Error("client_closed"); if (this.closed) throw new Error("client_closed");
if (this._status === "connecting" || this._status === "open") return; if (this._status === "connecting" || this._status === "open") return;
this.setStatus("connecting");
const ws = new WebSocket(this.opts.mesh.brokerUrl); this.lifecycle = await connectWsWithBackoff({
this.ws = ws; url: this.opts.mesh.brokerUrl,
buildHello: async () => {
return new Promise<void>((resolve, reject) => {
ws.on("open", async () => {
try {
const { timestamp, signature } = await signSessionHello({ const { timestamp, signature } = await signSessionHello({
meshId: this.opts.mesh.meshId, meshId: this.opts.mesh.meshId,
parentMemberPubkey: this.opts.mesh.pubkey, parentMemberPubkey: this.opts.mesh.pubkey,
sessionPubkey: this.opts.sessionPubkey, sessionPubkey: this.opts.sessionPubkey,
sessionSecretKey: this.opts.sessionSecretKey, sessionSecretKey: this.opts.sessionSecretKey,
}); });
ws.send(JSON.stringify({ return {
type: "session_hello", type: "session_hello",
meshId: this.opts.mesh.meshId, meshId: this.opts.mesh.meshId,
parentMemberId: this.opts.mesh.memberId, parentMemberId: this.opts.mesh.memberId,
@@ -142,38 +133,20 @@ export class SessionBrokerClient {
...(this.opts.role ? { role: this.opts.role } : {}), ...(this.opts.role ? { role: this.opts.role } : {}),
timestamp, timestamp,
signature, signature,
})); };
this.helloTimer = setTimeout(() => { },
this.log("warn", "session_hello_ack_timeout"); isHelloAck: (msg) => msg.type === "hello_ack",
try { ws.close(); } catch { /* ignore */ } onMessage: (msg) => {
reject(new Error("session_hello_ack_timeout"));
}, HELLO_ACK_TIMEOUT_MS);
} catch (e) {
reject(e instanceof Error ? e : new Error(String(e)));
}
});
ws.on("message", (raw) => {
let msg: Record<string, unknown>;
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
catch { return; }
if (msg.type === "hello_ack") {
if (this.helloTimer) clearTimeout(this.helloTimer);
this.helloTimer = null;
this.setStatus("open");
this.reconnectAttempt = 0;
resolve();
return;
}
if (msg.type === "error") { if (msg.type === "error") {
// Older brokers respond with `unknown_message_type` to session_hello; // Older brokers respond with `unknown_message_type` to session_hello;
// surface that so the daemon can decide to skip per-session presence // surface that so the daemon can decide to skip per-session presence
// rather than churn through reconnects. // rather than churn through reconnects. Setting `closed` halts the
// helper's reconnect loop on the next close.
this.log("warn", "broker_error", { code: msg.code, message: msg.message }); this.log("warn", "broker_error", { code: msg.code, message: msg.message });
if (msg.code === "unknown_message_type") { if (msg.code === "unknown_message_type") {
this.brokerUnsupported = true;
this.closed = true; this.closed = true;
void this.lifecycle?.close();
} }
return; return;
} }
@@ -185,33 +158,27 @@ export class SessionBrokerClient {
this.opts.onPush?.(msg); this.opts.onPush?.(msg);
return; return;
} }
}); },
onStatusChange: (s) => {
ws.on("close", (code, reason) => { this._status = s;
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } this.opts.onStatusChange?.(s);
if (this.closed) { this.setStatus("closed"); return; } },
this.setStatus("reconnecting"); log: (level, msg, meta) => this.log(level, `session_broker_${msg}`, meta),
const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000;
this.reconnectAttempt++;
this.log("info", "session_broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") });
this.reconnectTimer = setTimeout(
() => this.connect().catch((err) => this.log("warn", "session_broker_reconnect_failed", { err: String(err) })),
wait,
);
if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`));
});
ws.on("error", (err) => this.log("warn", "session_broker_ws_error", { err: err.message }));
}); });
} }
async close(): Promise<void> { async close(): Promise<void> {
this.closed = true; this.closed = true;
if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } if (this.lifecycle) {
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } try { await this.lifecycle.close(); } catch { /* ignore */ }
try { this.ws?.close(); } catch { /* ignore */ } this.lifecycle = null;
this.setStatus("closed");
} }
this._status = "closed";
}
/** True when the broker rejected our session_hello as unknown — caller
* may want to skip per-session presence entirely on this mesh. */
get isBrokerUnsupported(): boolean { return this.brokerUnsupported; }
} }
function defaultLog(level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) { function defaultLog(level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) {

View File

@@ -0,0 +1,234 @@
/**
* Shared WS lifecycle helper for the daemon's two broker clients.
*
* Both `DaemonBrokerClient` (member-keyed, one per joined mesh) and
* `SessionBrokerClient` (session-keyed, one per launched session) used
* to inline the same connect/hello/ack-timeout/close-reconnect logic.
* They drifted apart subtly — different ack-timeout names, different
* reconnect log messages, slightly different status flips — and that's
* how 1.32.x bugs shipped (push handler attached to the wrong client,
* etc).
*
* This helper owns ONLY the lifecycle:
* - new WebSocket(url), wire up open/message/close/error
* - on open → call buildHello() and send the result
* - start an ack-timeout timer; if it fires before the hello ack
* arrives, close the socket and reject the connect promise
* - on message, gate on isHelloAck(); when true, flip status to
* "open", clear the ack timer, resolve. All other messages are
* forwarded to onMessage()
* - on close, schedule a backoff reconnect (unless explicitly closed)
*
* Each client keeps its own concerns: DaemonBrokerClient still owns
* pendingAcks / peerListResolvers / etc; SessionBrokerClient still owns
* its onPush callback. The helper just hands them an open WS and a
* stable status field, and reconnects under their feet on disconnect.
*
* Composition over inheritance — callers receive a `WsLifecycle` handle
* with `send` / `close` / `status`, NOT a subclass.
*/
import WebSocket from "ws";
export type WsStatus = "connecting" | "open" | "closed" | "reconnecting";
export type WsLogLevel = "info" | "warn" | "error";
export type WsLog = (level: WsLogLevel, msg: string, meta?: Record<string, unknown>) => void;
export interface WsLifecycleOptions {
/** Broker URL (e.g. wss://ic.claudemesh.com/ws). */
url: string;
/**
* Build the hello frame to send right after the WS opens. Async because
* signing the hello may need libsodium initialization. Whatever this
* returns is JSON.stringified and sent verbatim — the helper does NOT
* inspect or modify it.
*/
buildHello: () => Promise<unknown>;
/**
* Returns true iff `msg` is the hello ack the helper should treat as
* "broker accepted us; flip status to open". Both daemon-WS and
* session-WS use `{ type: "hello_ack" }` today, but keeping this a
* predicate lets either client narrow further (e.g. on a `code` field)
* without leaking client-specific shape into the helper.
*/
isHelloAck: (msg: Record<string, unknown>) => boolean;
/**
* Called for every parsed message that is NOT the hello ack. The
* helper does NOT decide which messages are pushes vs RPCs vs errors;
* that's the caller's concern.
*/
onMessage: (msg: Record<string, unknown>) => void;
onStatusChange?: (s: WsStatus) => void;
/**
* How long to wait for the broker's hello ack before giving up and
* forcing a close. Defaults 5s — same as both pre-refactor clients.
*/
helloAckTimeoutMs?: number;
/**
* Reconnect backoff schedule. Defaults [1s, 2s, 4s, 8s, 16s, 30s] —
* matches both pre-refactor clients exactly.
*/
backoffCapsMs?: readonly number[];
log?: WsLog;
/**
* Hook for the close path BEFORE the helper schedules a reconnect.
* Used by DaemonBrokerClient to fail its in-flight pendingAcks map
* with a "broker_disconnected_<code>" reason. The helper passes the
* raw close code so the caller can shape its rejection text.
*
* Returns nothing — close handling continues regardless.
*/
onBeforeReconnect?: (code: number, reason: string) => void;
}
export interface WsLifecycle {
/** Current connection status. Updated synchronously before onStatusChange fires. */
readonly status: WsStatus;
/** Underlying socket. Exposed for callers that need OPEN-state checks
* before sending (mirrors the pre-refactor `this.ws.readyState` checks). */
readonly ws: WebSocket | null;
/** Send a JSON payload over the open WS. Throws if not open — callers
* that need queue-while-disconnected semantics should layer that
* themselves (DaemonBrokerClient does, via its `opens` deferred-fn array). */
send(payload: unknown): void;
/** Close the WS and stop reconnecting. Idempotent. */
close(): Promise<void>;
}
const DEFAULT_HELLO_ACK_TIMEOUT_MS = 5_000;
const DEFAULT_BACKOFF_CAPS_MS: readonly number[] = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
const defaultLog: WsLog = (level, msg, meta) => {
const line = JSON.stringify({ level, msg, ...meta, ts: new Date().toISOString() });
if (level === "info") process.stdout.write(line + "\n");
else process.stderr.write(line + "\n");
};
/**
* Connect a WebSocket with hello-handshake, ack-timeout, and reconnect
* with exponential backoff. Resolves once the broker accepts the hello;
* rejects if the first connect closes before the ack lands.
*
* Subsequent automatic reconnects are silent — they fire on the close
* handler's backoff timer and surface only via onStatusChange (and any
* caller-installed log).
*/
export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecycle> {
const helloAckTimeoutMs = opts.helloAckTimeoutMs ?? DEFAULT_HELLO_ACK_TIMEOUT_MS;
const backoffCapsMs = opts.backoffCapsMs ?? DEFAULT_BACKOFF_CAPS_MS;
const log: WsLog = opts.log ?? defaultLog;
let ws: WebSocket | null = null;
let status: WsStatus = "closed";
let closed = false;
let reconnectAttempt = 0;
let reconnectTimer: NodeJS.Timeout | null = null;
let helloTimer: NodeJS.Timeout | null = null;
const setStatus = (s: WsStatus) => {
if (status === s) return;
status = s;
opts.onStatusChange?.(s);
};
/**
* Open one WS attempt. Returns a promise that resolves on hello ack
* or rejects if the socket closes before we get one. Used by both the
* initial connect and the close-handler backoff timer (which awaits
* but ignores the rejection — by then the close handler has already
* scheduled its own reconnect).
*/
const openOnce = (): Promise<void> => {
if (closed) return Promise.reject(new Error("client_closed"));
setStatus("connecting");
const sock = new WebSocket(opts.url);
ws = sock;
return new Promise<void>((resolve, reject) => {
sock.on("open", () => {
// Build and send the hello inside a microtask so any sync
// throws from buildHello() reject this connect attempt cleanly.
(async () => {
try {
const hello = await opts.buildHello();
sock.send(JSON.stringify(hello));
helloTimer = setTimeout(() => {
log("warn", "hello_ack_timeout", { url: opts.url });
try { sock.close(); } catch { /* ignore */ }
reject(new Error("hello_ack_timeout"));
}, helloAckTimeoutMs);
} catch (e) {
reject(e instanceof Error ? e : new Error(String(e)));
}
})();
});
sock.on("message", (raw) => {
let msg: Record<string, unknown>;
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
catch { return; }
if (opts.isHelloAck(msg)) {
if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
setStatus("open");
reconnectAttempt = 0;
resolve();
// Don't forward hello_ack to onMessage — both pre-refactor
// clients consumed it inline and never delegated.
return;
}
opts.onMessage(msg);
});
sock.on("close", (code, reason) => {
if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
const reasonStr = reason.toString("utf8");
opts.onBeforeReconnect?.(code, reasonStr);
if (closed) {
setStatus("closed");
return;
}
setStatus("reconnecting");
const wait = backoffCapsMs[Math.min(reconnectAttempt, backoffCapsMs.length - 1)] ?? 30_000;
reconnectAttempt++;
log("info", "ws_reconnect_scheduled", { url: opts.url, wait_ms: wait, code, reason: reasonStr });
reconnectTimer = setTimeout(
() => openOnce().catch((err) => log("warn", "ws_reconnect_failed", { url: opts.url, err: String(err) })),
wait,
);
// First attempt failure (still in connecting) also rejects the
// initial connect promise so callers can surface it.
if (status === "connecting" || status === "reconnecting") {
reject(new Error(`closed_before_hello_${code}`));
}
});
sock.on("error", (err) => log("warn", "ws_error", { url: opts.url, err: err.message }));
});
};
return openOnce().then(() => {
const handle: WsLifecycle = {
get status() { return status; },
get ws() { return ws; },
send(payload: unknown) {
if (!ws || ws.readyState !== ws.OPEN) {
throw new Error("ws_not_open");
}
ws.send(JSON.stringify(payload));
},
async close() {
closed = true;
if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
try { ws?.close(); } catch { /* ignore */ }
setStatus("closed");
},
};
return handle;
});
}