Merge m1-cli-lifecycle-and-role-peer-list into main

Milestone 1 CLI side:
- New apps/cli/src/daemon/ws-lifecycle.ts: connectWsWithBackoff helper
- DaemonBrokerClient + SessionBrokerClient refactored to use the helper
- DaemonBrokerClient: stray sessionPubkey + getSessionKeys() removed
- daemon-WS onPush no longer carries session secret (member-only decrypt)
- IPC send paths now sign with mesh member secret
- peers.ts: hides peers with role==='control-plane' by default; --all
  shows them; JSON output exposes role field

NOTE: a follow-up commit on main renames the wire-level field 'role'
to 'peerRole' to avoid collision with 1.31.5's profile.role lift.
This commit is contained in:
Alejandro Gutiérrez
2026-05-04 18:11:47 +01:00
7 changed files with 575 additions and 354 deletions

View File

@@ -1,5 +1,35 @@
# Changelog
## Unreleased — Milestone 1 lifecycle cleanups
Foundational refactor before the agentic-comms architecture work
(`.artifacts/specs/2026-05-04-agentic-comms-architecture-v2.md`). Three
changes, all behavior-preserving:
- **`connectWsWithBackoff` helper** (`apps/cli/src/daemon/ws-lifecycle.ts`).
Both `DaemonBrokerClient` and `SessionBrokerClient` now share one
lifecycle implementation — connect, hello-handshake, ack-timeout,
close + backoff reconnect. Each client supplies `buildHello` /
`isHelloAck` / `onMessage` and keeps its own RPC bookkeeping
(pendingAcks, peerListResolvers, onPush, etc). Composition over
inheritance per Codex's review; no protocol shape changes.
- **Drop daemon-WS ephemeral session pubkey.** `DaemonBrokerClient` no
longer mints + sends a per-reconnect ephemeral keypair in its hello.
Session-targeted DMs land on `SessionBrokerClient` (since 1.32.1),
not the member-keyed daemon-WS, so the field was vestigial. The
daemon's send-encrypt path now signs DMs with the stable mesh member
secret. Inbound on daemon-WS only attempts member-key decryption —
session decryption is the session-WS's job.
- **Role-aware peer list.** `peer list` now hides peers whose
broker-emitted `role` is `'control-plane'` (the daemon's own
member-keyed presence). `--all` opts back in. JSON output emits
`role` at the top level. When an older broker doesn't emit `role`, the
CLI treats the missing value as `'session'`, so legacy peer rows stay
visible even before the broker-side change ships. Replaces the prior
`channel === 'claudemesh-daemon'` channel-name hack.
## 1.32.1 (2026-05-04) — DMs to session pubkeys actually deliver now
Critical fix. Sessions launched via `claudemesh launch` (1.30.0+) hold a

View File

@@ -21,14 +21,28 @@ export interface PeersFlags {
mesh?: string;
/** `true`/`undefined` = full record; comma-separated string = field projection. */
json?: boolean | string;
/** When false (default), hide claudemesh-daemon presence rows from the
* human renderer — they're infrastructure, not interactive peers, and
* confused users into thinking the daemon counted as a "peer". The
* JSON output still includes them so scripts that need a full inventory
* can opt in via --all (or just consume JSON). */
/** When false (default), hide control-plane presence rows from the
* human renderer — they're infrastructure (daemon-WS member-keyed
* presence), not interactive peers, and confused users into thinking
* the daemon counted as a "peer". The JSON output still includes them
* so scripts that need a full inventory can opt in via --all (or
* just consume JSON).
*
* Source of truth is the broker-side `role` field
* (`'control-plane' | 'session' | 'service'`). Older brokers don't
* emit `role` yet — this code falls back to treating missing role as
* `'session'` so legacy peer rows stay visible. */
all?: boolean;
}
/**
* Broker-emitted peer classification, added 2026-05-04. Older brokers
* may omit it — treat missing as 'session' so legacy meshes still
* render their peers (and don't accidentally hide them all). The CLI
* never emits 'control-plane' on its own; that comes from the broker.
*/
export type PeerRole = "control-plane" | "session" | "service";
interface PeerRecord {
pubkey: string;
/** Stable member pubkey (independent of session). When sender shares
@@ -43,10 +57,17 @@ interface PeerRecord {
status?: string;
summary?: string;
groups: Array<{ name: string; role?: string }>;
/** Top-level convenience alias for `profile.role`. Lifted by the
* CLI so JSON consumers see role at the shape's top level instead
* of nested under profile. Same value either way. */
role?: string;
/** Broker-emitted classification: 'control-plane' | 'session' |
* 'service'. Source of truth for the --all visibility filter and the
* default-hide rule. Older brokers omit this; the CLI fills missing
* values with 'session' so legacy peer rows stay visible.
*
* Note: this replaces the prior CLI-side lift of `profile.role` to
* the top-level `role` field — `profile.role` is user-supplied
* metadata (e.g. "lead", "reviewer"), distinct from the broker's
* presence-class taxonomy. The user-facing string still lives at
* `profile.role` and is rendered inline as `role:<value>`. */
role?: PeerRole;
peerType?: string;
channel?: string;
model?: string;
@@ -139,12 +160,9 @@ async function listPeersForMesh(slug: string): Promise<PeerRecord[]> {
* surfaced a sender's siblings as separate rows because they're separate
* presence rows; the cli just hadn't been making that visible.
*
* Also lifts `profile.role` to a top-level `role` field. The broker has
* always returned role nested under `profile.role`, but downstream JSON
* consumers (LLMs in launched sessions, jq pipelines, dashboards) kept
* missing it because nothing pointed at the nesting. A dedicated
* top-level alias makes the intent unmissable without breaking the
* `profile` object's shape for callers that already drill into it.
* Also normalizes the broker's `role` classification: missing values
* (older brokers) default to 'session' so legacy peer rows stay
* visible under the default `--all=false` filter.
*/
function annotateSelf(
peer: PeerRecord,
@@ -161,8 +179,8 @@ function annotateSelf(
selfSessionPubkey &&
peer.pubkey === selfSessionPubkey
);
const role = peer.profile?.role?.trim() || undefined;
return { ...peer, ...(role ? { role } : {}), isSelf, isThisSession };
const role: PeerRole = peer.role ?? "session";
return { ...peer, role, isSelf, isThisSession };
}
export async function runPeers(flags: PeersFlags): Promise<void> {
@@ -208,13 +226,18 @@ export async function runPeers(flags: PeersFlags): Promise<void> {
continue;
}
// Hide claudemesh-daemon rows by default — they're infrastructure
// (the daemon's own member-keyed presence), not interactive peers,
// and they confused users into thinking the daemon counted as a
// Hide control-plane rows by default — they're infrastructure
// (daemon-WS member-keyed presence), not interactive peers, and
// they confused users into thinking the daemon counted as a
// separate peer. --all opts back in for debugging.
//
// Source of truth: broker-emitted `role` field (added 2026-05-04).
// annotateSelf() already filled in 'session' for older brokers
// that don't emit role yet, so this filter is backwards-compatible
// by construction — legacy rows show up.
const visible = flags.all
? peers
: peers.filter((p) => p.channel !== "claudemesh-daemon");
: peers.filter((p) => p.role !== "control-plane");
// Sort: this-session first, then your-other-sessions, then real
// peers. Within each group, idle/working ahead of dnd. Inside the
@@ -226,9 +249,9 @@ export async function runPeers(flags: PeersFlags): Promise<void> {
return score(a) - score(b);
});
const hiddenDaemons = peers.length - visible.length;
const header = hiddenDaemons > 0
? `peers on ${slug} (${sorted.length}, ${hiddenDaemons} daemon hidden — use --all)`
const hiddenControlPlane = peers.length - visible.length;
const header = hiddenControlPlane > 0
? `peers on ${slug} (${sorted.length}, ${hiddenControlPlane} control-plane hidden — use --all)`
: `peers on ${slug} (${sorted.length})`;
render.section(header);

View File

@@ -7,13 +7,19 @@
// - Wire envelope adds `client_message_id` (broker may ignore in legacy
// mode; Sprint 7 promotes it to authoritative dedupe).
// - Reconnect with exponential backoff, signaled to the drain worker.
import WebSocket from "ws";
//
// 2026-05-04: lifecycle (connect / hello-ack / close-reconnect) now
// lives in `ws-lifecycle.ts`. This class supplies the daemon-WS hello
// content and routes incoming RPC replies / pushes; the helper handles
// the rest. The hello no longer carries an ephemeral `sessionPubkey` —
// session-targeted DMs land on the per-session WS (SessionBrokerClient)
// since 1.32.1, so this socket only needs the member identity.
import type { JoinedMesh } from "~/services/config/facade.js";
import { signHello } from "~/services/broker/hello-sig.js";
import { connectWsWithBackoff, type WsLifecycle, type WsStatus } from "./ws-lifecycle.js";
export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting";
export type ConnStatus = WsStatus;
export interface BrokerSendArgs {
/** Target as the broker expects it: peer name | pubkey | @group | * | topic. */
@@ -49,6 +55,8 @@ export interface PeerSummary {
hostname?: string;
peerType?: string;
channel?: string;
/** Broker-side classification, added 2026-05-04. Missing in older brokers. */
role?: "control-plane" | "session" | "service";
}
interface PendingPeerList {
@@ -84,9 +92,7 @@ export interface MemoryRow {
rememberedAt: string;
}
const HELLO_ACK_TIMEOUT_MS = 5_000;
const SEND_ACK_TIMEOUT_MS = 15_000;
const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
const SEND_ACK_TIMEOUT_MS = 15_000;
export interface DaemonBrokerOptions {
displayName?: string;
@@ -96,12 +102,9 @@ export interface DaemonBrokerOptions {
}
export class DaemonBrokerClient {
private ws: WebSocket | null = null;
private lifecycle: WsLifecycle | null = null;
private _status: ConnStatus = "closed";
private closed = false;
private reconnectAttempt = 0;
private reconnectTimer: NodeJS.Timeout | null = null;
private helloTimer: NodeJS.Timeout | null = null;
private pendingAcks = new Map<string, PendingAck>();
private peerListResolvers = new Map<string, PendingPeerList>();
private skillListResolvers = new Map<string, { resolve: (rows: SkillSummary[]) => void; timer: NodeJS.Timeout }>();
@@ -110,8 +113,6 @@ export class DaemonBrokerClient {
private stateListResolvers = new Map<string, { resolve: (rows: StateRow[]) => void; timer: NodeJS.Timeout }>();
private memoryStoreResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private memoryRecallResolvers = new Map<string, { resolve: (rows: MemoryRow[]) => void; timer: NodeJS.Timeout }>();
private sessionPubkey: string | null = null;
private sessionSecretKey: string | null = null;
private opens: Array<() => void> = [];
private reqCounter = 0;
@@ -125,198 +126,166 @@ export class DaemonBrokerClient {
(this.opts.log ?? defaultLog)(level, msg, { mesh: this.mesh.slug, ...meta });
};
private setConnStatus(s: ConnStatus) {
if (this._status === s) return;
this._status = s;
this.opts.onStatusChange?.(s);
}
/** Open the WS, run the hello handshake, resolve once the broker accepts. */
async connect(): Promise<void> {
if (this.closed) throw new Error("client_closed");
if (this._status === "connecting" || this._status === "open") return;
this.setConnStatus("connecting");
const ws = new WebSocket(this.mesh.brokerUrl);
this.ws = ws;
return new Promise<void>((resolve, reject) => {
ws.on("open", async () => {
try {
if (!this.sessionPubkey) {
const { generateKeypair } = await import("~/services/crypto/facade.js");
const kp = await generateKeypair();
this.sessionPubkey = kp.publicKey;
this.sessionSecretKey = kp.secretKey;
}
const { timestamp, signature } = await signHello(
this.mesh.meshId, this.mesh.memberId, this.mesh.pubkey, this.mesh.secretKey,
);
ws.send(JSON.stringify({
type: "hello",
meshId: this.mesh.meshId,
memberId: this.mesh.memberId,
pubkey: this.mesh.pubkey,
sessionPubkey: this.sessionPubkey,
displayName: this.opts.displayName,
sessionId: `daemon-${process.pid}`,
pid: process.pid,
cwd: process.cwd(),
hostname: require("node:os").hostname(),
peerType: "ai" as const,
channel: "claudemesh-daemon",
timestamp,
signature,
}));
this.helloTimer = setTimeout(() => {
this.log("warn", "broker_hello_ack_timeout");
try { ws.close(); } catch { /* ignore */ }
reject(new Error("hello_ack_timeout"));
}, HELLO_ACK_TIMEOUT_MS);
} catch (e) {
reject(e instanceof Error ? e : new Error(String(e)));
}
});
ws.on("message", (raw) => {
let msg: Record<string, unknown>;
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
catch { return; }
if (msg.type === "hello_ack") {
if (this.helloTimer) clearTimeout(this.helloTimer);
this.helloTimer = null;
this.setConnStatus("open");
this.reconnectAttempt = 0;
// Flush deferred openers (drain worker, etc.)
this.lifecycle = await connectWsWithBackoff({
url: this.mesh.brokerUrl,
buildHello: async () => {
const { timestamp, signature } = await signHello(
this.mesh.meshId, this.mesh.memberId, this.mesh.pubkey, this.mesh.secretKey,
);
return {
type: "hello",
meshId: this.mesh.meshId,
memberId: this.mesh.memberId,
pubkey: this.mesh.pubkey,
// No `sessionPubkey` — daemon-WS is member-keyed only. The
// per-session presence WS (SessionBrokerClient) carries the
// ephemeral session pubkey. Spec §"Layer 1: Identity → Member identity".
displayName: this.opts.displayName,
sessionId: `daemon-${process.pid}`,
pid: process.pid,
cwd: process.cwd(),
hostname: require("node:os").hostname(),
peerType: "ai" as const,
channel: "claudemesh-daemon",
timestamp,
signature,
};
},
isHelloAck: (msg) => msg.type === "hello_ack",
onMessage: (msg) => this.handleMessage(msg),
onStatusChange: (s) => {
this._status = s;
this.opts.onStatusChange?.(s);
if (s === "open") {
// Flush deferred openers (drain worker, etc.).
const queued = this.opens.slice();
this.opens.length = 0;
for (const fn of queued) { try { fn(); } catch (e) { this.log("warn", "open_handler_failed", { err: String(e) }); } }
resolve();
return;
}
if (msg.type === "ack") {
// Broker shape: { type: "ack", id, messageId, queued, error? }
const id = String(msg.id ?? "");
const ack = this.pendingAcks.get(id);
if (ack) {
this.pendingAcks.delete(id);
clearTimeout(ack.timer);
if (typeof msg.error === "string" && msg.error.length > 0) {
ack.resolve({ ok: false, error: msg.error, permanent: classifyPermanent(msg.error) });
} else {
ack.resolve({ ok: true, messageId: String(msg.messageId ?? id) });
}
for (const fn of queued) {
try { fn(); } catch (e) { this.log("warn", "open_handler_failed", { err: String(e) }); }
}
return;
}
if (msg.type === "peers_list") {
const reqId = String(msg._reqId ?? "");
const pending = this.peerListResolvers.get(reqId);
if (pending) {
this.peerListResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(Array.isArray(msg.peers) ? (msg.peers as PeerSummary[]) : []);
}
return;
}
if (msg.type === "skill_list") {
const reqId = String(msg._reqId ?? "");
const pending = this.skillListResolvers.get(reqId);
if (pending) {
this.skillListResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(Array.isArray(msg.skills) ? (msg.skills as SkillSummary[]) : []);
}
return;
}
if (msg.type === "skill_data") {
const reqId = String(msg._reqId ?? "");
const pending = this.skillDataResolvers.get(reqId);
if (pending) {
this.skillDataResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve((msg.skill as SkillFull) ?? null);
}
return;
}
if (msg.type === "state_value" || msg.type === "state_data") {
const reqId = String(msg._reqId ?? "");
const pending = this.stateGetResolvers.get(reqId);
if (pending) {
this.stateGetResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve((msg.state ?? msg.row ?? null) as StateRow | null);
}
return;
}
if (msg.type === "state_list") {
const reqId = String(msg._reqId ?? "");
const pending = this.stateListResolvers.get(reqId);
if (pending) {
this.stateListResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(Array.isArray(msg.entries) ? (msg.entries as StateRow[]) : []);
}
return;
}
if (msg.type === "memory_stored") {
const reqId = String(msg._reqId ?? "");
const pending = this.memoryStoreResolvers.get(reqId);
if (pending) {
this.memoryStoreResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(typeof msg.memoryId === "string" ? msg.memoryId : null);
}
return;
}
if (msg.type === "memory_recall_result") {
const reqId = String(msg._reqId ?? "");
const pending = this.memoryRecallResolvers.get(reqId);
if (pending) {
this.memoryRecallResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(Array.isArray(msg.matches) ? (msg.matches as MemoryRow[]) : []);
}
return;
}
if (msg.type === "push" || msg.type === "inbound") {
this.opts.onPush?.(msg);
return;
}
});
ws.on("close", (code, reason) => {
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
this.failPendingAcks(`broker_disconnected_${code}`);
if (this.closed) { this.setConnStatus("closed"); return; }
this.setConnStatus("reconnecting");
const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000;
this.reconnectAttempt++;
this.log("info", "broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") });
this.reconnectTimer = setTimeout(() => this.connect().catch((err) => this.log("warn", "broker_reconnect_failed", { err: String(err) })), wait);
// First connection failure also rejects the original connect() promise.
if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`));
});
ws.on("error", (err) => this.log("warn", "broker_ws_error", { err: err.message }));
},
onBeforeReconnect: (code) => this.failPendingAcks(`broker_disconnected_${code}`),
log: (level, msg, meta) => this.log(level, `broker_${msg}`, meta),
});
}
private handleMessage(msg: Record<string, unknown>): void {
if (msg.type === "ack") {
// Broker shape: { type: "ack", id, messageId, queued, error? }
const id = String(msg.id ?? "");
const ack = this.pendingAcks.get(id);
if (ack) {
this.pendingAcks.delete(id);
clearTimeout(ack.timer);
if (typeof msg.error === "string" && msg.error.length > 0) {
ack.resolve({ ok: false, error: msg.error, permanent: classifyPermanent(msg.error) });
} else {
ack.resolve({ ok: true, messageId: String(msg.messageId ?? id) });
}
}
return;
}
if (msg.type === "peers_list") {
const reqId = String(msg._reqId ?? "");
const pending = this.peerListResolvers.get(reqId);
if (pending) {
this.peerListResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(Array.isArray(msg.peers) ? (msg.peers as PeerSummary[]) : []);
}
return;
}
if (msg.type === "skill_list") {
const reqId = String(msg._reqId ?? "");
const pending = this.skillListResolvers.get(reqId);
if (pending) {
this.skillListResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(Array.isArray(msg.skills) ? (msg.skills as SkillSummary[]) : []);
}
return;
}
if (msg.type === "skill_data") {
const reqId = String(msg._reqId ?? "");
const pending = this.skillDataResolvers.get(reqId);
if (pending) {
this.skillDataResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve((msg.skill as SkillFull) ?? null);
}
return;
}
if (msg.type === "state_value" || msg.type === "state_data") {
const reqId = String(msg._reqId ?? "");
const pending = this.stateGetResolvers.get(reqId);
if (pending) {
this.stateGetResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve((msg.state ?? msg.row ?? null) as StateRow | null);
}
return;
}
if (msg.type === "state_list") {
const reqId = String(msg._reqId ?? "");
const pending = this.stateListResolvers.get(reqId);
if (pending) {
this.stateListResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(Array.isArray(msg.entries) ? (msg.entries as StateRow[]) : []);
}
return;
}
if (msg.type === "memory_stored") {
const reqId = String(msg._reqId ?? "");
const pending = this.memoryStoreResolvers.get(reqId);
if (pending) {
this.memoryStoreResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(typeof msg.memoryId === "string" ? msg.memoryId : null);
}
return;
}
if (msg.type === "memory_recall_result") {
const reqId = String(msg._reqId ?? "");
const pending = this.memoryRecallResolvers.get(reqId);
if (pending) {
this.memoryRecallResolvers.delete(reqId);
clearTimeout(pending.timer);
pending.resolve(Array.isArray(msg.matches) ? (msg.matches as MemoryRow[]) : []);
}
return;
}
if (msg.type === "push" || msg.type === "inbound") {
this.opts.onPush?.(msg);
return;
}
}
/** True when underlying socket is OPEN-ready for direct sends. */
private isOpen(): boolean {
const sock = this.lifecycle?.ws;
return !!sock && sock.readyState === sock.OPEN;
}
/** Send one outbox row. Resolves on broker ack/timeout. */
send(req: BrokerSendArgs): Promise<BrokerSendResult> {
return new Promise<BrokerSendResult>((resolve) => {
const dispatch = () => {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) {
if (!this.isOpen()) {
resolve({ ok: false, error: "broker_not_open", permanent: false });
return;
}
@@ -328,7 +297,7 @@ export class DaemonBrokerClient {
}, SEND_ACK_TIMEOUT_MS);
this.pendingAcks.set(id, { resolve, timer });
try {
this.ws.send(JSON.stringify({
this.lifecycle!.send({
type: "send",
id, // legacy correlation id
client_message_id: id, // forward-compat per spec §4.2
@@ -337,7 +306,7 @@ export class DaemonBrokerClient {
priority: req.priority,
nonce: req.nonce,
ciphertext: req.ciphertext,
}));
});
} catch (e) {
this.pendingAcks.delete(id);
clearTimeout(timer);
@@ -352,153 +321,149 @@ export class DaemonBrokerClient {
/** Ask the broker for the current peer list. */
async listPeers(timeoutMs = 5_000): Promise<PeerSummary[]> {
if (this._status !== "open" || !this.ws) return [];
if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<PeerSummary[]>((resolve) => {
const reqId = `pl-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.peerListResolvers.delete(reqId)) resolve([]);
}, timeoutMs);
this.peerListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_peers", _reqId: reqId })); }
try { this.lifecycle!.send({ type: "list_peers", _reqId: reqId }); }
catch { this.peerListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
});
}
/** List mesh-published skills. Empty array on disconnect / timeout. */
async listSkills(query?: string, timeoutMs = 5_000): Promise<SkillSummary[]> {
if (this._status !== "open" || !this.ws) return [];
if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<SkillSummary[]>((resolve) => {
const reqId = `sl-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.skillListResolvers.delete(reqId)) resolve([]);
}, timeoutMs);
this.skillListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_skills", query, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "list_skills", query, _reqId: reqId }); }
catch { this.skillListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
});
}
/** Fetch one skill's full body. Null on not-found / disconnect / timeout. */
async getSkill(name: string, timeoutMs = 5_000): Promise<SkillFull | null> {
if (this._status !== "open" || !this.ws) return null;
if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<SkillFull | null>((resolve) => {
const reqId = `sg-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.skillDataResolvers.delete(reqId)) resolve(null);
}, timeoutMs);
this.skillDataResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "get_skill", name, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "get_skill", name, _reqId: reqId }); }
catch { this.skillDataResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
});
}
/** Read a single shared state row. Null on disconnect / timeout / not-found. */
async getState(key: string, timeoutMs = 5_000): Promise<StateRow | null> {
if (this._status !== "open" || !this.ws) return null;
if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<StateRow | null>((resolve) => {
const reqId = `sg-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.stateGetResolvers.delete(reqId)) resolve(null);
}, timeoutMs);
this.stateGetResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "get_state", key, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "get_state", key, _reqId: reqId }); }
catch { this.stateGetResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
});
}
/** List all shared state rows in the mesh. */
async listState(timeoutMs = 5_000): Promise<StateRow[]> {
if (this._status !== "open" || !this.ws) return [];
if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<StateRow[]>((resolve) => {
const reqId = `sl-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.stateListResolvers.delete(reqId)) resolve([]);
}, timeoutMs);
this.stateListResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "list_state", _reqId: reqId })); }
try { this.lifecycle!.send({ type: "list_state", _reqId: reqId }); }
catch { this.stateListResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
});
}
/** Set a shared state value. Fire-and-forget. */
setState(key: string, value: unknown): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_state", key, value })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_state", key, value }); }
catch { /* ignore */ }
}
/** Store a memory in the mesh. Returns the assigned id, or null on timeout. */
async remember(content: string, tags?: string[], timeoutMs = 5_000): Promise<string | null> {
if (this._status !== "open" || !this.ws) return null;
if (this._status !== "open" || !this.lifecycle) return null;
return new Promise<string | null>((resolve) => {
const reqId = `mr-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.memoryStoreResolvers.delete(reqId)) resolve(null);
}, timeoutMs);
this.memoryStoreResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "remember", content, tags, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "remember", content, tags, _reqId: reqId }); }
catch { this.memoryStoreResolvers.delete(reqId); clearTimeout(timer); resolve(null); }
});
}
/** Search memories by relevance. */
async recall(query: string, timeoutMs = 5_000): Promise<MemoryRow[]> {
if (this._status !== "open" || !this.ws) return [];
if (this._status !== "open" || !this.lifecycle) return [];
return new Promise<MemoryRow[]>((resolve) => {
const reqId = `mc-${++this.reqCounter}`;
const timer = setTimeout(() => {
if (this.memoryRecallResolvers.delete(reqId)) resolve([]);
}, timeoutMs);
this.memoryRecallResolvers.set(reqId, { resolve, timer });
try { this.ws!.send(JSON.stringify({ type: "recall", query, _reqId: reqId })); }
try { this.lifecycle!.send({ type: "recall", query, _reqId: reqId }); }
catch { this.memoryRecallResolvers.delete(reqId); clearTimeout(timer); resolve([]); }
});
}
/** Forget a memory by id. Fire-and-forget. */
forget(memoryId: string): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "forget", memoryId })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "forget", memoryId }); }
catch { /* ignore */ }
}
/** Set the daemon's profile (avatar/title/bio/capabilities). Fire-and-forget. */
setProfile(profile: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_profile", ...profile })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_profile", ...profile }); }
catch { /* ignore */ }
}
setSummary(summary: string): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_summary", summary })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_summary", summary }); }
catch { /* ignore */ }
}
setStatus(status: "idle" | "working" | "dnd"): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_status", status })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_status", status }); }
catch { /* ignore */ }
}
setVisible(visible: boolean): void {
if (this._status !== "open" || !this.ws) return;
try { this.ws.send(JSON.stringify({ type: "set_visible", visible })); }
if (this._status !== "open" || !this.lifecycle) return;
try { this.lifecycle.send({ type: "set_visible", visible }); }
catch { /* ignore */ }
}
async close(): Promise<void> {
this.closed = true;
if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; }
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
this.failPendingAcks("daemon_shutdown");
try { this.ws?.close(); } catch { /* ignore */ }
this.setConnStatus("closed");
}
getSessionKeys(): { sessionPubkey: string; sessionSecretKey: string } | null {
if (!this.sessionPubkey || !this.sessionSecretKey) return null;
return { sessionPubkey: this.sessionPubkey, sessionSecretKey: this.sessionSecretKey };
if (this.lifecycle) {
try { await this.lifecycle.close(); } catch { /* ignore */ }
this.lifecycle = null;
}
this._status = "closed";
}
private failPendingAcks(reason: string) {

View File

@@ -870,11 +870,14 @@ async function resolveAndEncrypt(
return { target_spec: to, ciphertext, nonce, mesh: meshSlug ?? "" };
}
// 64-char hex pubkey → DM directly.
// 64-char hex pubkey → DM directly. Encrypt with the daemon's member
// secret: recipient decrypts using THEIR session pubkey's matching
// secret on their session-WS, so the sender side just needs any
// private key whose public counterpart is known to the recipient as
// "the sender". Member key is the stable choice and is what the
// recipient already trusts via mesh membership.
if (/^[0-9a-f]{64}$/i.test(to)) {
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, to, senderSecret);
const env = await encryptDirect(req.message, to, meshSecretKey);
return { target_spec: to, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}
@@ -890,9 +893,7 @@ async function resolveAndEncrypt(
if (matches.length === 0) throw new Error(`no peer matching prefix "${to}"`);
if (matches.length > 1) throw new Error(`prefix "${to}" is ambiguous (${matches.length} matches)`);
const recipient = matches[0]!.pubkey;
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, recipient, senderSecret);
const env = await encryptDirect(req.message, recipient, meshSecretKey);
return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}
@@ -900,9 +901,7 @@ async function resolveAndEncrypt(
const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase());
if (!match) throw new Error(`peer "${to}" not found`);
const recipient = match.pubkey;
const sessionKeys = broker.getSessionKeys();
const senderSecret = sessionKeys?.sessionSecretKey ?? meshSecretKey;
const env = await encryptDirect(req.message, recipient, senderSecret);
const env = await encryptDirect(req.message, recipient, meshSecretKey);
return { target_spec: recipient, ciphertext: env.ciphertext, nonce: env.nonce, mesh: meshSlug ?? "" };
}

View File

@@ -136,13 +136,16 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
bus.publish("broker_status", { mesh: mesh.slug, status: s });
},
onPush: (m) => {
const sessionKeys = broker.getSessionKeys();
// Daemon-WS is member-keyed, not session-keyed. Session-targeted
// DMs land on the per-session WS (SessionBrokerClient) since
// 1.32.1 and decrypt with the session secret there. Anything that
// arrives here can only be member-keyed (broadcasts, member DMs,
// system events) — pass member secret only.
void handleBrokerPush(m, {
db: inboxDb,
bus,
meshSlug: mesh.slug,
recipientSecretKeyHex: mesh.secretKey,
sessionSecretKeyHex: sessionKeys?.sessionSecretKey,
});
},
});

View File

@@ -26,15 +26,19 @@
* expected to be deployed first.
*
* Spec: .artifacts/specs/2026-05-04-per-session-presence.md.
*
* 2026-05-04: lifecycle (connect / hello-ack / close-reconnect) lives
* in `ws-lifecycle.ts`. This class supplies session_hello content and
* routes the inbound onPush; the helper handles the rest.
*/
import { hostname as osHostname } from "node:os";
import WebSocket from "ws";
import type { JoinedMesh } from "~/services/config/facade.js";
import { signSessionHello } from "~/services/broker/session-hello-sig.js";
import { connectWsWithBackoff, type WsLifecycle, type WsStatus } from "./ws-lifecycle.js";
export type SessionBrokerStatus = "connecting" | "open" | "closed" | "reconnecting";
export type SessionBrokerStatus = WsStatus;
export interface ParentAttestation {
sessionPubkey: string;
@@ -75,16 +79,13 @@ export interface SessionBrokerOptions {
log?: (level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) => void;
}
const HELLO_ACK_TIMEOUT_MS = 5_000;
const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
export class SessionBrokerClient {
private ws: WebSocket | null = null;
private lifecycle: WsLifecycle | null = null;
private _status: SessionBrokerStatus = "closed";
private closed = false;
private reconnectAttempt = 0;
private reconnectTimer: NodeJS.Timeout | null = null;
private helloTimer: NodeJS.Timeout | null = null;
/** Set when the broker rejects session_hello with `unknown_message_type` —
* older brokers without the 1.30.0 surface. We stop retrying. */
private brokerUnsupported = false;
constructor(private opts: SessionBrokerOptions) {}
@@ -100,80 +101,52 @@ export class SessionBrokerClient {
});
};
private setStatus(s: SessionBrokerStatus) {
if (this._status === s) return;
this._status = s;
this.opts.onStatusChange?.(s);
}
/** Open the WS, run session_hello, resolve once the broker accepts. */
async connect(): Promise<void> {
if (this.closed) throw new Error("client_closed");
if (this._status === "connecting" || this._status === "open") return;
this.setStatus("connecting");
const ws = new WebSocket(this.opts.mesh.brokerUrl);
this.ws = ws;
return new Promise<void>((resolve, reject) => {
ws.on("open", async () => {
try {
const { timestamp, signature } = await signSessionHello({
meshId: this.opts.mesh.meshId,
parentMemberPubkey: this.opts.mesh.pubkey,
sessionPubkey: this.opts.sessionPubkey,
sessionSecretKey: this.opts.sessionSecretKey,
});
ws.send(JSON.stringify({
type: "session_hello",
meshId: this.opts.mesh.meshId,
parentMemberId: this.opts.mesh.memberId,
parentMemberPubkey: this.opts.mesh.pubkey,
sessionPubkey: this.opts.sessionPubkey,
parentAttestation: this.opts.parentAttestation,
displayName: this.opts.displayName,
sessionId: this.opts.sessionId,
pid: this.opts.pid,
cwd: this.opts.cwd ?? process.cwd(),
hostname: osHostname(),
peerType: "ai" as const,
channel: "claudemesh-session",
...(this.opts.groups && this.opts.groups.length > 0 ? { groups: this.opts.groups } : {}),
...(this.opts.role ? { role: this.opts.role } : {}),
timestamp,
signature,
}));
this.helloTimer = setTimeout(() => {
this.log("warn", "session_hello_ack_timeout");
try { ws.close(); } catch { /* ignore */ }
reject(new Error("session_hello_ack_timeout"));
}, HELLO_ACK_TIMEOUT_MS);
} catch (e) {
reject(e instanceof Error ? e : new Error(String(e)));
}
});
ws.on("message", (raw) => {
let msg: Record<string, unknown>;
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
catch { return; }
if (msg.type === "hello_ack") {
if (this.helloTimer) clearTimeout(this.helloTimer);
this.helloTimer = null;
this.setStatus("open");
this.reconnectAttempt = 0;
resolve();
return;
}
this.lifecycle = await connectWsWithBackoff({
url: this.opts.mesh.brokerUrl,
buildHello: async () => {
const { timestamp, signature } = await signSessionHello({
meshId: this.opts.mesh.meshId,
parentMemberPubkey: this.opts.mesh.pubkey,
sessionPubkey: this.opts.sessionPubkey,
sessionSecretKey: this.opts.sessionSecretKey,
});
return {
type: "session_hello",
meshId: this.opts.mesh.meshId,
parentMemberId: this.opts.mesh.memberId,
parentMemberPubkey: this.opts.mesh.pubkey,
sessionPubkey: this.opts.sessionPubkey,
parentAttestation: this.opts.parentAttestation,
displayName: this.opts.displayName,
sessionId: this.opts.sessionId,
pid: this.opts.pid,
cwd: this.opts.cwd ?? process.cwd(),
hostname: osHostname(),
peerType: "ai" as const,
channel: "claudemesh-session",
...(this.opts.groups && this.opts.groups.length > 0 ? { groups: this.opts.groups } : {}),
...(this.opts.role ? { role: this.opts.role } : {}),
timestamp,
signature,
};
},
isHelloAck: (msg) => msg.type === "hello_ack",
onMessage: (msg) => {
if (msg.type === "error") {
// Older brokers respond with `unknown_message_type` to session_hello;
// surface that so the daemon can decide to skip per-session presence
// rather than churn through reconnects.
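// rather than churn through reconnects. The helper tracks its own
// `closed` flag, so it is the `lifecycle.close()` call below (not
// `this.closed`) that halts its reconnect loop.
// NOTE(review): `this.lifecycle` is only assigned after the first
// hello ack, so an error arriving before that cannot stop the helper.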
this.log("warn", "broker_error", { code: msg.code, message: msg.message });
if (msg.code === "unknown_message_type") {
this.brokerUnsupported = true;
this.closed = true;
void this.lifecycle?.close();
}
return;
}
@@ -185,33 +158,27 @@ export class SessionBrokerClient {
this.opts.onPush?.(msg);
return;
}
});
ws.on("close", (code, reason) => {
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
if (this.closed) { this.setStatus("closed"); return; }
this.setStatus("reconnecting");
const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000;
this.reconnectAttempt++;
this.log("info", "session_broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") });
this.reconnectTimer = setTimeout(
() => this.connect().catch((err) => this.log("warn", "session_broker_reconnect_failed", { err: String(err) })),
wait,
);
if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`));
});
ws.on("error", (err) => this.log("warn", "session_broker_ws_error", { err: err.message }));
},
onStatusChange: (s) => {
this._status = s;
this.opts.onStatusChange?.(s);
},
log: (level, msg, meta) => this.log(level, `session_broker_${msg}`, meta),
});
}
async close(): Promise<void> {
this.closed = true;
if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; }
if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; }
try { this.ws?.close(); } catch { /* ignore */ }
this.setStatus("closed");
if (this.lifecycle) {
try { await this.lifecycle.close(); } catch { /* ignore */ }
this.lifecycle = null;
}
this._status = "closed";
}
/** True when the broker rejected our session_hello as unknown — caller
* may want to skip per-session presence entirely on this mesh. */
get isBrokerUnsupported(): boolean { return this.brokerUnsupported; }
}
function defaultLog(level: "info" | "warn" | "error", msg: string, meta?: Record<string, unknown>) {

View File

@@ -0,0 +1,234 @@
/**
* Shared WS lifecycle helper for the daemon's two broker clients.
*
* Both `DaemonBrokerClient` (member-keyed, one per joined mesh) and
* `SessionBrokerClient` (session-keyed, one per launched session) used
* to inline the same connect/hello/ack-timeout/close-reconnect logic.
* They drifted apart subtly — different ack-timeout names, different
* reconnect log messages, slightly different status flips — and that's
* how 1.32.x bugs shipped (push handler attached to the wrong client,
* etc).
*
* This helper owns ONLY the lifecycle:
* - new WebSocket(url), wire up open/message/close/error
* - on open → call buildHello() and send the result
* - start an ack-timeout timer; if it fires before the hello ack
* arrives, close the socket and reject the connect promise
* - on message, gate on isHelloAck(); when true, flip status to
* "open", clear the ack timer, resolve. All other messages are
* forwarded to onMessage()
* - on close, schedule a backoff reconnect (unless explicitly closed)
*
* Each client keeps its own concerns: DaemonBrokerClient still owns
* pendingAcks / peerListResolvers / etc; SessionBrokerClient still owns
* its onPush callback. The helper just hands them an open WS and a
* stable status field, and reconnects under their feet on disconnect.
*
* Composition over inheritance — callers receive a `WsLifecycle` handle
* with `send` / `close` / `status`, NOT a subclass.
*/
import WebSocket from "ws";
/** Connection states the lifecycle helper reports via `status` and `onStatusChange`. */
export type WsStatus = "connecting" | "open" | "closed" | "reconnecting";
/** Severity levels accepted by a `WsLog` sink. */
export type WsLogLevel = "info" | "warn" | "error";
/** Logger signature: severity, message text, and optional structured metadata. */
export type WsLog = (level: WsLogLevel, msg: string, meta?: Record<string, unknown>) => void;
/**
 * Configuration for `connectWsWithBackoff`. The caller supplies the
 * handshake content and message routing; the helper owns the socket,
 * the ack timeout, and the reconnect schedule.
 */
export interface WsLifecycleOptions {
/** Broker URL (e.g. wss://ic.claudemesh.com/ws). */
url: string;
/**
 * Builds the hello frame sent right after the WS opens; called once per
 * connection attempt, including automatic reconnects. Async because
 * building it may involve asynchronous signing. Whatever this returns
 * is JSON.stringified and sent verbatim — the helper does NOT inspect
 * or modify it.
 */
buildHello: () => Promise<unknown>;
/**
 * Returns true iff `msg` is the hello ack the helper should treat as
 * "broker accepted us; flip status to open". Kept as a predicate so a
 * client can narrow further (e.g. on a `code` field) without leaking
 * client-specific shape into the helper. Messages matching this
 * predicate are consumed by the helper and never reach `onMessage`.
 */
isHelloAck: (msg: Record<string, unknown>) => boolean;
/**
 * Called for every parsed message that is NOT the hello ack, including
 * messages that arrive before the ack. The helper does NOT decide which
 * messages are pushes vs RPCs vs errors; that's the caller's concern.
 * Frames that fail JSON parsing are dropped without calling this.
 */
onMessage: (msg: Record<string, unknown>) => void;
/**
 * Called after the helper's status changes. Not called when the new
 * status equals the current one.
 */
onStatusChange?: (s: WsStatus) => void;
/**
 * How long to wait for the broker's hello ack before giving up and
 * forcing a close. Defaults to 5s.
 */
helloAckTimeoutMs?: number;
/**
 * Reconnect backoff schedule in milliseconds. The consecutive-failure
 * count indexes into this list and is clamped to the last entry; the
 * count resets on each hello ack. Defaults to
 * [1s, 2s, 4s, 8s, 16s, 30s].
 */
backoffCapsMs?: readonly number[];
/**
 * Log sink. When omitted, the helper writes JSON lines to stdout
 * (info) or stderr (warn / error).
 */
log?: WsLog;
/**
 * Hook invoked on every socket close, BEFORE the helper decides
 * whether to schedule a reconnect — this includes closes caused by an
 * explicit `close()` on the handle. Used by DaemonBrokerClient to fail
 * its in-flight pendingAcks map with a "broker_disconnected_<code>"
 * reason. The helper passes the raw close code and the close reason
 * decoded as UTF-8 so the caller can shape its rejection text.
 *
 * Returns nothing — the return value is ignored.
 */
onBeforeReconnect?: (code: number, reason: string) => void;
}
/**
 * Handle returned by `connectWsWithBackoff` once the first hello ack
 * has been received. The same handle stays valid across automatic
 * reconnects: `status` and `ws` are live views of the helper's state.
 */
export interface WsLifecycle {
/** Current connection status. Updated synchronously before onStatusChange fires. */
readonly status: WsStatus;
/** Underlying socket of the most recent connection attempt. Exposed for
 * callers that need OPEN-state checks before sending (mirrors the
 * pre-refactor `this.ws.readyState` checks). */
readonly ws: WebSocket | null;
/** JSON.stringify `payload` and send it over the WS. Throws
 * `ws_not_open` if the socket is missing or not in the OPEN state —
 * callers that need queue-while-disconnected semantics should layer that
 * themselves (DaemonBrokerClient does, via its `opens` deferred-fn array). */
send(payload: unknown): void;
/** Close the WS, cancel any pending reconnect / hello-ack timers, and
 * stop reconnecting. Idempotent. */
close(): Promise<void>;
}
/** Hello-ack wait (ms) used when `helloAckTimeoutMs` is not supplied. */
const DEFAULT_HELLO_ACK_TIMEOUT_MS = 5_000;
/** Reconnect delays (ms) used when `backoffCapsMs` is not supplied; the last entry is the cap. */
const DEFAULT_BACKOFF_CAPS_MS: readonly number[] = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
/**
 * Fallback logger used when the caller supplies no `log` option.
 * Emits one JSON object per line: `info` goes to stdout, every other
 * level goes to stderr. Metadata keys are spread after `level` / `msg`
 * (so they can override them) and before `ts` (which always wins).
 */
const defaultLog: WsLog = (level, msg, meta) => {
  const record = { level, msg, ...meta, ts: new Date().toISOString() };
  const sink = level === "info" ? process.stdout : process.stderr;
  sink.write(JSON.stringify(record) + "\n");
};
/**
 * Connect a WebSocket with hello-handshake, ack-timeout, and reconnect
 * with backoff.
 *
 * Per attempt: open the socket, send `buildHello()`'s result, arm an ack
 * timer, and wait for a message matching `isHelloAck`. Every socket
 * close (unless `close()` was called on the handle) schedules another
 * attempt after the next delay in `backoffCapsMs`; the delay index
 * resets whenever a hello ack arrives.
 *
 * @param opts Handshake content, message routing, and tuning knobs.
 * @returns A promise that resolves with the lifecycle handle once the
 *   broker acks the FIRST hello. It rejects if that first attempt fails:
 *   `hello_ack_timeout`, `closed_before_hello_<code>`, or whatever
 *   `buildHello()` / the hello send threw.
 *
 * Subsequent automatic reconnects are silent — they surface only via
 * onStatusChange and the log sink.
 *
 * NOTE(review): when the first attempt fails the returned promise
 * rejects, but the background reconnect loop keeps running and the
 * caller never receives a handle to stop it. This matches the
 * pre-refactor "keep retrying" behaviour but leaves the loop
 * uncontrollable — confirm whether callers need a way to cancel it.
 */
export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecycle> {
  const helloAckTimeoutMs = opts.helloAckTimeoutMs ?? DEFAULT_HELLO_ACK_TIMEOUT_MS;
  const backoffCapsMs = opts.backoffCapsMs ?? DEFAULT_BACKOFF_CAPS_MS;
  const log: WsLog = opts.log ?? defaultLog;

  let ws: WebSocket | null = null;
  let status: WsStatus = "closed";
  let closed = false;
  let reconnectAttempt = 0;
  let reconnectTimer: NodeJS.Timeout | null = null;
  let helloTimer: NodeJS.Timeout | null = null;

  const setStatus = (s: WsStatus) => {
    if (status === s) return;
    status = s;
    opts.onStatusChange?.(s);
  };

  const clearHelloTimer = () => {
    if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
  };

  const clearReconnectTimer = () => {
    if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
  };

  /**
   * Open one WS attempt. Resolves on hello ack; rejects if the hello
   * cannot be built/sent, the ack times out, or the socket closes first.
   * Used by both the initial connect and the backoff timer (which only
   * logs the rejection — the close handler has already scheduled the
   * next attempt by then).
   */
  const openOnce = (): Promise<void> => {
    if (closed) return Promise.reject(new Error("client_closed"));
    setStatus("connecting");
    const sock = new WebSocket(opts.url);
    ws = sock;

    return new Promise<void>((resolve, reject) => {
      sock.on("open", () => {
        // Async IIFE so a throw or rejection from buildHello() fails this
        // attempt instead of escaping the event handler.
        void (async () => {
          try {
            const hello = await opts.buildHello();
            // The socket may have closed (or close() been called) while
            // the hello was being built. The close handler owns that
            // path; do not send or arm a timer for a dead socket.
            if (closed || sock.readyState !== sock.OPEN) return;
            sock.send(JSON.stringify(hello));
            helloTimer = setTimeout(() => {
              helloTimer = null;
              log("warn", "hello_ack_timeout", { url: opts.url });
              try { sock.close(); } catch { /* ignore */ }
              reject(new Error("hello_ack_timeout"));
            }, helloAckTimeoutMs);
          } catch (e) {
            reject(e instanceof Error ? e : new Error(String(e)));
            // Without a hello the broker will never ack; close the socket
            // so the normal backoff path runs instead of leaving an open
            // connection stuck in "connecting".
            try { sock.close(); } catch { /* ignore */ }
          }
        })();
      });

      sock.on("message", (raw) => {
        let msg: Record<string, unknown>;
        try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
        catch { return; }
        if (opts.isHelloAck(msg)) {
          clearHelloTimer();
          setStatus("open");
          reconnectAttempt = 0;
          resolve();
          // Don't forward hello_ack to onMessage — both pre-refactor
          // clients consumed it inline and never delegated.
          return;
        }
        opts.onMessage(msg);
      });

      sock.on("close", (code, reason) => {
        clearHelloTimer();
        const reasonStr = reason.toString("utf8");
        // The hook is documented as unable to interrupt close handling,
        // so a throwing hook must not prevent the reconnect below.
        try {
          opts.onBeforeReconnect?.(code, reasonStr);
        } catch (e) {
          log("error", "ws_on_before_reconnect_failed", { url: opts.url, err: String(e) });
        }
        if (closed) {
          setStatus("closed");
          return;
        }
        setStatus("reconnecting");
        const wait = backoffCapsMs[Math.min(reconnectAttempt, backoffCapsMs.length - 1)] ?? 30_000;
        reconnectAttempt++;
        log("info", "ws_reconnect_scheduled", { url: opts.url, wait_ms: wait, code, reason: reasonStr });
        reconnectTimer = setTimeout(() => {
          reconnectTimer = null;
          openOnce().catch((err) => log("warn", "ws_reconnect_failed", { url: opts.url, err: String(err) }));
        }, wait);
        // Fails the attempt if it had not been acked yet; a no-op when
        // the promise already settled (acked earlier, or timed out).
        reject(new Error(`closed_before_hello_${code}`));
      });

      sock.on("error", (err) => log("warn", "ws_error", { url: opts.url, err: err.message }));
    });
  };

  // One handle for the helper's lifetime; `status` / `ws` are live views
  // so they keep tracking across automatic reconnects.
  const handle: WsLifecycle = {
    get status() { return status; },
    get ws() { return ws; },
    send(payload: unknown) {
      if (!ws || ws.readyState !== ws.OPEN) {
        throw new Error("ws_not_open");
      }
      ws.send(JSON.stringify(payload));
    },
    async close() {
      closed = true;
      clearReconnectTimer();
      clearHelloTimer();
      try { ws?.close(); } catch { /* ignore */ }
      setStatus("closed");
    },
  };

  return openOnce().then(() => handle);
}