/**
 * BrokerClient — WebSocket client connecting a CLI session to a claudemesh
 * broker. Handles:
 *   - hello handshake + ack
 *   - send / ack / push message flow
 *   - auto-reconnect with exponential backoff (1s, 2s, 4s, ..., max 30s)
 *   - in-memory outbound queue while reconnecting
 *   - push buffer so the MCP check_messages tool can drain inbound history
 *
 * Encryption is deferred to Step 18 (libsodium). Until then, ciphertext
 * is plaintext UTF-8, nonce is a random 24-byte base64 string (for
 * future-compat layout only).
 */

import WebSocket from "ws";
import { randomBytes } from "node:crypto";
import type { JoinedMesh } from "../state/config";

export type Priority = "now" | "next" | "low";
export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting";

/** A message pushed by the broker, stamped with the local receive time. */
export interface InboundPush {
  messageId: string;
  meshId: string;
  senderPubkey: string;
  priority: Priority;
  nonce: string;
  ciphertext: string;
  createdAt: string;
  receivedAt: string;
}

type PushHandler = (msg: InboundPush) => void;

/** Outcome reported to the caller of `send()`. */
type SendResult = { ok: boolean; messageId?: string; error?: string };

/** Book-keeping for a send that has not been acked, rejected or timed out yet. */
interface PendingSend {
  id: string;
  targetSpec: string;
  priority: Priority;
  nonce: string;
  ciphertext: string;
  resolve: (v: SendResult) => void;
  /** Ack-timeout timer; cleared as soon as the send settles. */
  timer: NodeJS.Timeout;
}

const MAX_QUEUED = 100;
const HELLO_ACK_TIMEOUT_MS = 5_000;
const ACK_TIMEOUT_MS = 10_000;
const MAX_PUSH_BUFFER = 500;
const BACKOFF_CAPS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];

export class BrokerClient {
  private ws: WebSocket | null = null;
  private _status: ConnStatus = "closed";
  private pendingSends = new Map<string, PendingSend>();
  private outbound: Array<() => void> = []; // closures that send once ws is open
  private pushHandlers = new Set<PushHandler>();
  private pushBuffer: InboundPush[] = [];
  private closed = false;
  private reconnectAttempt = 0;
  private helloTimer: NodeJS.Timeout | null = null;
  private reconnectTimer: NodeJS.Timeout | null = null;

  constructor(
    private mesh: JoinedMesh,
    private opts: {
      onStatusChange?: (status: ConnStatus) => void;
      debug?: boolean;
    } = {},
  ) {}

  /** Current connection state. */
  get status(): ConnStatus {
    return this._status;
  }

  get meshId(): string {
    return this.mesh.meshId;
  }

  get meshSlug(): string {
    return this.mesh.slug;
  }

  /** Live, read-only view of the buffered pushes (not a copy). */
  get pushHistory(): readonly InboundPush[] {
    return this.pushBuffer;
  }

  /**
   * Open WS, send hello, resolve when hello_ack received.
   *
   * Rejects when the socket closes, or the hello_ack timeout fires, before
   * the handshake completes. Unless `close()` was called, a socket close
   * always schedules a reconnect attempt.
   *
   * @throws Error if the client has already been closed.
   */
  async connect(): Promise<void> {
    if (this.closed) throw new Error("client is closed");
    this.setConnStatus("connecting");
    const ws = new WebSocket(this.mesh.brokerUrl);
    this.ws = ws;

    return new Promise<void>((resolve, reject) => {
      const onOpen = (): void => {
        this.debug("ws open → sending hello");
        ws.send(
          JSON.stringify({
            type: "hello",
            meshId: this.mesh.meshId,
            memberId: this.mesh.memberId,
            pubkey: this.mesh.pubkey,
            sessionId: `${process.pid}-${Date.now()}`,
            pid: process.pid,
            cwd: process.cwd(),
            signature: "stub", // libsodium sign_detached lands in Step 18
            nonce: randomNonce(),
          }),
        );
        // Arm the hello_ack timeout.
        this.helloTimer = setTimeout(() => {
          this.debug("hello_ack timeout");
          ws.close();
          reject(new Error("hello_ack timeout"));
        }, HELLO_ACK_TIMEOUT_MS);
      };

      const onMessage = (raw: WebSocket.RawData): void => {
        const msg = parseServerMessage(raw);
        if (!msg) return; // not JSON, or not a JSON object — ignore the frame
        if (msg.type === "hello_ack") {
          this.clearHelloTimer();
          this.setConnStatus("open");
          this.reconnectAttempt = 0;
          this.flushOutbound();
          resolve();
          return;
        }
        this.handleServerMessage(msg);
      };

      const onClose = (): void => {
        this.clearHelloTimer();
        // Only forget the socket if it is still the one that just closed.
        if (this.ws === ws) this.ws = null;
        if (this._status !== "open" && this._status !== "reconnecting") {
          // No-op if the promise already settled (e.g. hello_ack timeout).
          reject(new Error("ws closed before hello_ack"));
        }
        if (!this.closed) this.scheduleReconnect();
        else this.setConnStatus("closed");
      };

      const onError = (err: Error): void => {
        this.debug(`ws error: ${err.message}`);
      };

      ws.on("open", onOpen);
      ws.on("message", onMessage);
      ws.on("close", onClose);
      ws.on("error", onError);
    });
  }

  /**
   * Fire-and-wait send: resolves when the broker acks.
   *
   * While the connection is not open the message is queued and dispatched
   * after the next successful handshake. The result is `ok: false` when the
   * queue is full, when the broker answers with an error for this send, or
   * when nothing is heard back within the ack timeout (which starts when
   * `send()` is called, not when the message is actually dispatched).
   *
   * @param targetSpec - Recipient specification, passed to the broker as-is.
   * @param message - Plaintext payload; transmitted base64-encoded.
   * @param priority - Delivery priority, defaults to `"next"`.
   */
  async send(
    targetSpec: string,
    message: string,
    priority: Priority = "next",
  ): Promise<{ ok: boolean; messageId?: string; error?: string }> {
    const id = randomId();
    const nonce = randomNonce();
    const ciphertext = Buffer.from(message, "utf-8").toString("base64");

    return new Promise<SendResult>((resolve) => {
      const connected = this._status === "open";
      if (
        this.pendingSends.size >= MAX_QUEUED ||
        (!connected && this.outbound.length >= MAX_QUEUED)
      ) {
        resolve({ ok: false, error: "outbound queue full" });
        return;
      }

      const dispatch = (): void => {
        // Never transmit a send the caller was already told has failed.
        if (!this.pendingSends.has(id)) return;
        const ws = this.ws;
        if (!ws || ws.readyState !== WebSocket.OPEN) return;
        ws.send(
          JSON.stringify({
            type: "send",
            id,
            targetSpec,
            priority,
            nonce,
            ciphertext,
          }),
        );
      };

      // Ack timeout: give up if the broker has not answered in time.
      const timer = setTimeout(() => {
        if (!this.pendingSends.delete(id)) return;
        // Drop the queued closure too, so dead entries cannot fill the queue.
        const queuedAt = this.outbound.indexOf(dispatch);
        if (queuedAt !== -1) this.outbound.splice(queuedAt, 1);
        resolve({ ok: false, error: "ack timeout" });
      }, ACK_TIMEOUT_MS);

      this.pendingSends.set(id, {
        id,
        targetSpec,
        priority,
        nonce,
        ciphertext,
        resolve,
        timer,
      });

      if (connected) dispatch();
      else this.outbound.push(dispatch); // flushed on (re)connect
    });
  }

  /** Subscribe to inbound pushes. Returns an unsubscribe function. */
  onPush(handler: PushHandler): () => void {
    this.pushHandlers.add(handler);
    return () => {
      this.pushHandlers.delete(handler);
    };
  }

  /** Drain the buffered push history (used by check_messages tool). */
  drainPushBuffer(): InboundPush[] {
    const drained = this.pushBuffer.slice();
    this.pushBuffer.length = 0;
    return drained;
  }

  /**
   * Send a manual status override. Fire-and-forget (no ack).
   * Silently does nothing while the socket is not open.
   */
  async setStatus(status: "idle" | "working" | "dnd"): Promise<void> {
    const ws = this.ws;
    if (!ws || ws.readyState !== WebSocket.OPEN) return;
    ws.send(JSON.stringify({ type: "set_status", status }));
  }

  /** Permanently shut the client down: cancel timers, close the socket, stop reconnecting. */
  close(): void {
    this.closed = true;
    this.clearHelloTimer();
    if (this.reconnectTimer) clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    if (this.ws) {
      try {
        this.ws.close();
      } catch {
        /* ignore */
      }
    }
    this.setConnStatus("closed");
  }

  // --- Internals ---

  /** Route a post-handshake broker message: ack, push or error. Other types are ignored. */
  private handleServerMessage(msg: Record<string, unknown>): void {
    if (msg.type === "ack") {
      this.settleSend(String(msg.id ?? ""), {
        ok: true,
        messageId: String(msg.messageId ?? ""),
      });
      return;
    }

    if (msg.type === "push") {
      const push: InboundPush = {
        messageId: String(msg.messageId ?? ""),
        meshId: String(msg.meshId ?? ""),
        senderPubkey: String(msg.senderPubkey ?? ""),
        priority: isPriority(msg.priority) ? msg.priority : "next",
        nonce: String(msg.nonce ?? ""),
        ciphertext: String(msg.ciphertext ?? ""),
        createdAt: String(msg.createdAt ?? ""),
        receivedAt: new Date().toISOString(),
      };
      this.pushBuffer.push(push);
      // Cap the buffer to avoid unbounded growth; the oldest entry goes first.
      if (this.pushBuffer.length > MAX_PUSH_BUFFER) this.pushBuffer.shift();
      for (const h of this.pushHandlers) {
        try {
          h(push);
        } catch {
          /* handler errors are not the transport's problem */
        }
      }
      return;
    }

    if (msg.type === "error") {
      this.debug(`broker error: ${msg.code} ${msg.message}`);
      const id = msg.id ? String(msg.id) : null;
      if (id) {
        this.settleSend(id, {
          ok: false,
          error: `${msg.code}: ${msg.message}`,
        });
      }
    }
  }

  /** Resolve a pending send exactly once and cancel its ack timer. Unknown ids are ignored. */
  private settleSend(id: string, result: SendResult): void {
    const pending = this.pendingSends.get(id);
    if (!pending) return;
    clearTimeout(pending.timer);
    this.pendingSends.delete(id);
    pending.resolve(result);
  }

  /** Dispatch everything queued while the connection was down. */
  private flushOutbound(): void {
    const queued = this.outbound.slice();
    this.outbound.length = 0;
    for (const dispatch of queued) dispatch();
  }

  /** Arm the next reconnect attempt using the backoff table (last entry is the cap). */
  private scheduleReconnect(): void {
    this.setConnStatus("reconnecting");
    const step = Math.min(this.reconnectAttempt, BACKOFF_CAPS.length - 1);
    const delay = BACKOFF_CAPS[step] ?? 30_000;
    this.reconnectAttempt += 1;
    this.debug(
      `reconnect in ${delay}ms (attempt ${this.reconnectAttempt})`,
    );
    this.reconnectTimer = setTimeout(() => {
      if (this.closed) return;
      this.connect().catch((e) => {
        this.debug(`reconnect failed: ${e instanceof Error ? e.message : e}`);
      });
    }, delay);
  }

  /**
   * Update the connection state and notify `onStatusChange` on actual changes.
   * Deliberately not named `setStatus`: that name belongs to the public
   * peer-status override, and two methods with one name shadow each other.
   */
  private setConnStatus(s: ConnStatus): void {
    if (this._status === s) return;
    this._status = s;
    this.opts.onStatusChange?.(s);
  }

  /** Cancel the hello_ack timeout, if armed. */
  private clearHelloTimer(): void {
    if (this.helloTimer) clearTimeout(this.helloTimer);
    this.helloTimer = null;
  }

  /** Write a diagnostic line to stderr when the `debug` option is set. */
  private debug(msg: string): void {
    if (this.opts.debug) console.error(`[broker-client] ${msg}`);
  }
}

/** Type guard for the priority values the protocol defines. */
function isPriority(value: unknown): value is Priority {
  return value === "now" || value === "next" || value === "low";
}

/**
 * Decode a raw frame into a JSON object.
 * Returns null for invalid JSON and for JSON that is not a plain object
 * (null, primitives, arrays), so callers can read properties safely.
 */
function parseServerMessage(
  raw: WebSocket.RawData,
): Record<string, unknown> | null {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw.toString());
  } catch {
    return null;
  }
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return null;
  }
  return parsed as Record<string, unknown>;
}

/** Random 8-byte identifier, hex-encoded, used to correlate sends with acks. */
function randomId(): string {
  return randomBytes(8).toString("hex");
}

function randomNonce(): string {
  // 24-byte nonce layout (compatible with libsodium crypto_secretbox later)
  return randomBytes(24).toString("base64");
}