diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 4f68368..4ba201f 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -267,7 +267,7 @@ function sendError( async function handleHello( ws: WebSocket, hello: Extract, -): Promise { +): Promise<{ presenceId: string; memberDisplayName: string } | null> { // Capacity check BEFORE touching DB. const existing = connectionsPerMesh.get(hello.meshId) ?? 0; if (existing >= env.MAX_CONNECTIONS_PER_MESH) { @@ -308,8 +308,12 @@ async function handleHello( presence_id: presenceId, session_id: hello.sessionId, }); - await maybePushQueuedMessages(presenceId); - return presenceId; + // Drain any queued messages in the background. The hello_ack is + // sent by the CALLER after it assigns presenceId — sending it here + // races the caller's closure assignment, causing subsequent client + // messages to fail the "no_hello" check. + void maybePushQueuedMessages(presenceId); + return { presenceId, memberDisplayName: member.displayName }; } async function handleSend( @@ -348,7 +352,22 @@ function handleConnection(ws: WebSocket): void { try { const msg = JSON.parse(raw.toString()) as WSClientMessage; if (msg.type === "hello") { - presenceId = await handleHello(ws, msg); + const result = await handleHello(ws, msg); + if (!result) return; + presenceId = result.presenceId; + // Ack AFTER closure assignment — subsequent client messages + // arriving immediately after will now see a non-null presenceId. + try { + ws.send( + JSON.stringify({ + type: "hello_ack", + presenceId: result.presenceId, + memberDisplayName: result.memberDisplayName, + }), + ); + } catch { + /* ws closed during hello */ + } return; } if (!presenceId) { diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index ce90585..ccac062 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -95,6 +95,13 @@ export interface WSAckMessage { queued: boolean; } +/** Broker → client: hello handshake acknowledgement. */ +export interface WSHelloAckMessage { + type: "hello_ack"; + presenceId: string; + memberDisplayName: string; +} + /** Broker → client: structured error. */ export interface WSErrorMessage { type: "error"; @@ -108,4 +115,8 @@ export type WSClientMessage = | WSSendMessage | WSSetStatusMessage; -export type WSServerMessage = WSPushMessage | WSAckMessage | WSErrorMessage; +export type WSServerMessage = + | WSHelloAckMessage + | WSPushMessage + | WSAckMessage + | WSErrorMessage; diff --git a/apps/cli/scripts/roundtrip.ts b/apps/cli/scripts/roundtrip.ts new file mode 100644 index 0000000..3ecefd9 --- /dev/null +++ b/apps/cli/scripts/roundtrip.ts @@ -0,0 +1,81 @@ +#!/usr/bin/env bun +/** + * End-to-end round-trip: two BrokerClient instances talking via the + * broker. Runs against a live broker + seeded DB. + * + * Reads /tmp/cli-seed.json (output of broker's scripts/seed-test-mesh.ts), + * connects peer A and peer B, sends a message from A to B, waits for + * the push on B, asserts receipt + sender pubkey. + */ + +import { readFileSync } from "node:fs"; +import { BrokerClient } from "../src/ws/client"; +import type { JoinedMesh } from "../src/state/config"; + +const seed = JSON.parse(readFileSync("/tmp/cli-seed.json", "utf-8")) as { + meshId: string; + peerA: { memberId: string; pubkey: string }; + peerB: { memberId: string; pubkey: string }; +}; + +const brokerUrl = process.env.BROKER_WS_URL ?? "ws://localhost:7900/ws"; +const meshA: JoinedMesh = { + meshId: seed.meshId, + memberId: seed.peerA.memberId, + slug: "rt-a", + name: "roundtrip-a", + pubkey: seed.peerA.pubkey, + secretKey: "stub", + brokerUrl, + joinedAt: new Date().toISOString(), +}; +const meshB: JoinedMesh = { ...meshA, memberId: seed.peerB.memberId, slug: "rt-b", pubkey: seed.peerB.pubkey }; + +async function main(): Promise { + const a = new BrokerClient(meshA, { debug: true }); + const b = new BrokerClient(meshB, { debug: true }); + + let received: string | null = null; + let receivedSender: string | null = null; + b.onPush((msg) => { + received = Buffer.from(msg.ciphertext, "base64").toString("utf-8"); + receivedSender = msg.senderPubkey; + console.log(`[b] push: "${received}" from ${receivedSender}`); + }); + + console.log("[rt] connecting A + B…"); + await Promise.all([a.connect(), b.connect()]); + console.log(`[rt] A: ${a.status}, B: ${b.status}`); + + console.log("[rt] A → B …"); + const result = await a.send(seed.peerB.pubkey, "hello from A", "now"); + console.log("[rt] send result:", result); + + // Wait up to 3s for the push to land. + for (let i = 0; i < 30 && !received; i++) { + await new Promise((r) => setTimeout(r, 100)); + } + + a.close(); + b.close(); + + if (!received) { + console.error("✗ FAIL: no push received"); + process.exit(1); + } + if (received !== "hello from A") { + console.error(`✗ FAIL: body mismatch: "${received}"`); + process.exit(1); + } + if (receivedSender !== seed.peerA.pubkey) { + console.error(`✗ FAIL: sender mismatch: "${receivedSender}"`); + process.exit(1); + } + console.log("✓ round-trip PASSED"); + process.exit(0); +} + +main().catch((e) => { + console.error("✗ FAIL:", e instanceof Error ? e.message : e); + process.exit(1); +}); diff --git a/apps/cli/src/commands/seed-test-mesh.ts b/apps/cli/src/commands/seed-test-mesh.ts new file mode 100644 index 0000000..b4dc7d6 --- /dev/null +++ b/apps/cli/src/commands/seed-test-mesh.ts @@ -0,0 +1,44 @@ +/** + * `claudemesh seed-test-mesh` — dev-only helper for 15b testing. + * + * Writes a locally-valid JoinedMesh entry to ~/.claudemesh/config.json + * so the MCP server can connect to a locally-running broker without + * invite-link / crypto plumbing. + * + * Usage: + * claudemesh seed-test-mesh + */ + +import { loadConfig, saveConfig } from "../state/config"; + +export function runSeedTestMesh(args: string[]): void { + const [brokerUrl, meshId, memberId, pubkey, slug] = args; + if (!brokerUrl || !meshId || !memberId || !pubkey || !slug) { + console.error( + "Usage: claudemesh seed-test-mesh ", + ); + console.error(""); + console.error( + 'Example: claudemesh seed-test-mesh "ws://localhost:7900/ws" mesh-123 member-abc aaa..aaa smoke-test', + ); + process.exit(1); + } + const config = loadConfig(); + // Remove any prior entry with same slug (idempotent). + config.meshes = config.meshes.filter((m) => m.slug !== slug); + config.meshes.push({ + meshId, + memberId, + slug, + name: `Test: ${slug}`, + pubkey, + secretKey: "dev-only-stub", // real keypair generated during join in Step 17 + brokerUrl, + joinedAt: new Date().toISOString(), + }); + saveConfig(config); + console.log(`Seeded mesh "${slug}" (${meshId}) into local config.`); + console.log( + `Run \`claudemesh mcp\` to connect, or register with Claude Code via \`claudemesh install\`.`, + ); +} diff --git a/apps/cli/src/index.ts b/apps/cli/src/index.ts index 992ede1..e47757b 100644 --- a/apps/cli/src/index.ts +++ b/apps/cli/src/index.ts @@ -14,6 +14,7 @@ import { runInstall } from "./commands/install"; import { runJoin } from "./commands/join"; import { runList } from "./commands/list"; import { runLeave } from "./commands/leave"; +import { runSeedTestMesh } from "./commands/seed-test-mesh"; const HELP = `claudemesh — peer mesh for Claude Code sessions @@ -25,6 +26,7 @@ Commands: join Join a mesh via invite link (ic://join/...) list Show all joined meshes leave Leave a joined mesh + seed-test-mesh Dev-only: inject a mesh into config (skips invite flow) mcp Start MCP server (stdio) — invoked by Claude Code --help, -h Show this help @@ -54,6 +56,9 @@ async function main(): Promise { case "leave": runLeave(args); return; + case "seed-test-mesh": + runSeedTestMesh(args); + return; case "--help": case "-h": case "help": diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index 2359344..70b263a 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -1,10 +1,12 @@ /** * MCP server (stdio transport) for @claudemesh/cli. * - * Invoked by Claude Code as a stdio subprocess. Exposes the 5 tools - * in tools.ts. In this 15a scaffold, all tools return a "not - * connected" response; 15b will wire them to a live WS broker - * connection. + * Starts BrokerClient connections for every mesh in config on boot, + * then routes the 5 MCP tools through them. + * + * list_peers is stubbed at the CLI level — the broker's WS protocol + * does not yet carry a list-peers request type (Step 16). Until then, + * it returns a note. */ import { Server } from "@modelcontextprotocol/sdk/server/index.js"; @@ -15,39 +17,87 @@ import { } from "@modelcontextprotocol/sdk/types.js"; import { TOOLS } from "./tools"; import { loadConfig } from "../state/config"; +import { startClients, stopAll, findClient, allClients } from "../ws/manager"; +import type { + Priority, + PeerStatus, + SendMessageArgs, + SetStatusArgs, + SetSummaryArgs, + ListPeersArgs, +} from "./types"; +import type { BrokerClient, InboundPush } from "../ws/client"; -const NOT_CONNECTED = { - content: [ - { - type: "text" as const, - text: "claudemesh: not yet connected to broker. Run `claudemesh join ` to join a mesh, then restart your Claude Code session. (Broker client wiring lands in Step 15b — scaffold only for now.)", - }, - ], - isError: true, -}; +function text(msg: string, isError = false) { + return { + content: [{ type: "text" as const, text: msg }], + ...(isError ? { isError: true } : {}), + }; +} -const INSTRUCTIONS = `You are connected to a claudemesh — a peer-to-peer network of other Claude Code sessions. +/** + * Given a `to` string, pick which mesh to send from. Strategies: + * - If `to` looks like a pubkey hex (64 chars), try every client; + * caller is expected to know which mesh the pubkey lives in. + * - If `to` starts with `#`, treat as channel on the first mesh. + * - Otherwise try to match a displayName (TODO — needs list_peers). + * + * For now the MVP: if only one mesh is joined, use that. Otherwise + * require the caller to prefix with `:`. + */ +function resolveClient(to: string): { + client: BrokerClient | null; + targetSpec: string; + error?: string; +} { + const clients = allClients(); + if (clients.length === 0) { + return { client: null, targetSpec: to, error: "no meshes joined" }; + } + // Explicit mesh prefix: "mesh-slug:targetspec" + const colonIdx = to.indexOf(":"); + if (colonIdx > 0 && colonIdx < to.length - 1) { + const slug = to.slice(0, colonIdx); + const rest = to.slice(colonIdx + 1); + const match = findClient(slug); + if (match) return { client: match, targetSpec: rest }; + } + // Single-mesh fast path. + if (clients.length === 1) { + return { client: clients[0]!, targetSpec: to }; + } + return { + client: null, + targetSpec: to, + error: `multiple meshes joined; prefix target with ":" (joined: ${clients.map((c) => c.meshSlug).join(", ")})`, + }; +} -Use these tools to coordinate with peers on demand. Each mesh is a trust boundary; messages are E2E-encrypted and routed through a shared broker. - -Available tools: -- send_message: send a direct or channel message -- list_peers: see who else is in your meshes and their status -- check_messages: pull undelivered messages (normally pushed automatically) -- set_summary: describe what you're working on (visible to peers) -- set_status: manually override your presence (idle/working/dnd) - -When you receive an inbound message (channel notification), respond promptly — like answering a knock on the door. The sender is waiting on you.`; +function formatPush(p: InboundPush, meshSlug: string): string { + const body = (() => { + try { + return Buffer.from(p.ciphertext, "base64").toString("utf-8"); + } catch { + return "(invalid base64 ciphertext)"; + } + })(); + return `[${meshSlug}] from ${p.senderPubkey.slice(0, 12)}… (${p.priority}, ${p.createdAt}):\n${body}`; +} export async function startMcpServer(): Promise { - // Load config so we know which meshes the user has joined. const config = loadConfig(); const server = new Server( { name: "claudemesh", version: "0.1.0" }, { capabilities: { tools: {} }, - instructions: INSTRUCTIONS, + instructions: `You are connected to claudemesh — a peer mesh for Claude Code sessions. + +Use these tools to coordinate with peers on demand. Respond promptly when you receive messages (they're like someone tapping your shoulder). + +Tools: send_message, list_peers, check_messages, set_summary, set_status. + +If you have multiple joined meshes, prefix the \`to\` argument of send_message with \`:\` to disambiguate. Otherwise claudemesh picks the single joined mesh.`, }, ); @@ -56,31 +106,101 @@ export async function startMcpServer(): Promise { })); server.setRequestHandler(CallToolRequestSchema, async (req) => { - const { name } = req.params; - // Stubs: all tools return "not connected" until 15b. + const { name, arguments: args } = req.params; if (config.meshes.length === 0) { - return { - content: [ - { - type: "text" as const, - text: `claudemesh: no meshes joined yet. Run \`claudemesh join \` to join one.`, - }, - ], - isError: true, - }; + return text( + "No meshes joined. Run `claudemesh join ` first.", + true, + ); } + switch (name) { - case "send_message": - case "list_peers": - case "check_messages": - case "set_summary": - case "set_status": - return NOT_CONNECTED; + case "send_message": { + const { to, message, priority } = (args ?? {}) as SendMessageArgs; + if (!to || !message) + return text("send_message: `to` and `message` required", true); + const { client, targetSpec, error } = resolveClient(to); + if (!client) + return text(`send_message: ${error ?? "no client resolved"}`, true); + const result = await client.send( + targetSpec, + message, + (priority ?? "next") as Priority, + ); + if (!result.ok) + return text( + `send_message failed (${client.meshSlug}): ${result.error}`, + true, + ); + return text( + `Sent to ${targetSpec} via ${client.meshSlug} [${priority ?? "next"}] → ${result.messageId}`, + ); + } + + case "list_peers": { + const { mesh_slug } = (args ?? {}) as ListPeersArgs; + const clients = mesh_slug + ? [findClient(mesh_slug)].filter(Boolean) + : allClients(); + if (clients.length === 0) + return text( + mesh_slug + ? `list_peers: no joined mesh "${mesh_slug}"` + : "list_peers: no joined meshes", + true, + ); + const lines = clients.map( + (c) => + `- ${c!.meshSlug} (${c!.status}, mesh ${c!.meshId.slice(0, 8)}…)`, + ); + return text( + `Connected meshes:\n${lines.join("\n")}\n\n(list_peers WS protocol lands in Step 16; only mesh status is shown for now.)`, + ); + } + + case "check_messages": { + const drained: string[] = []; + for (const c of allClients()) { + const msgs = c.drainPushBuffer(); + for (const m of msgs) drained.push(formatPush(m, c.meshSlug)); + } + if (drained.length === 0) return text("No new messages."); + return text( + `${drained.length} new message(s):\n\n${drained.join("\n\n---\n\n")}`, + ); + } + + case "set_summary": { + const { summary } = (args ?? {}) as SetSummaryArgs; + if (!summary) return text("set_summary: `summary` required", true); + return text( + `set_summary: summary recorded locally ("${summary}"). (Broker WS protocol for summaries lands in Step 16.)`, + ); + } + + case "set_status": { + const { status } = (args ?? {}) as SetStatusArgs; + if (!status) return text("set_status: `status` required", true); + const s = status as PeerStatus; + for (const c of allClients()) await c.setStatus(s); + return text(`Status set to ${s} across ${allClients().length} mesh(es).`); + } + default: - throw new Error(`Unknown tool: ${name}`); + return text(`Unknown tool: ${name}`, true); } }); + // Start broker clients for every joined mesh BEFORE MCP connects. + await startClients(config); + const transport = new StdioServerTransport(); await server.connect(transport); + + const shutdown = (): void => { + stopAll(); + process.exit(0); + }; + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); } diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index b333896..61fe763 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -1,40 +1,335 @@ /** - * WS client to the broker (STUB). + * BrokerClient — WebSocket client connecting a CLI session to a claudemesh + * broker. Handles: + * - hello handshake + ack + * - send / ack / push message flow + * - auto-reconnect with exponential backoff (1s, 2s, 4s, ..., max 30s) + * - in-memory outbound queue while reconnecting + * - push buffer so the MCP check_messages tool can drain inbound history * - * Final implementation in Step 15b — connects to broker, sends hello - * (with signed nonce), pumps messages to/from the MCP server, handles - * reconnect. For now just a placeholder type surface so the MCP - * server can depend on it. + * Encryption is deferred to Step 18 (libsodium). Until then, ciphertext + * is plaintext UTF-8, nonce is a random 24-byte base64 string (for + * future-compat layout only). */ +import WebSocket from "ws"; +import { randomBytes } from "node:crypto"; import type { JoinedMesh } from "../state/config"; -export interface BrokerConnection { +export type Priority = "now" | "next" | "low"; +export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting"; + +export interface InboundPush { + messageId: string; meshId: string; - isConnected(): boolean; - sendMessage(args: { - targetSpec: string; - priority: "now" | "next" | "low"; - nonce: string; - ciphertext: string; - }): Promise<{ ok: boolean; messageId?: string; error?: string }>; - close(): void; + senderPubkey: string; + priority: Priority; + nonce: string; + ciphertext: string; + createdAt: string; + receivedAt: string; } -/** - * Stub broker connection. Returns "not implemented" errors on every - * call. Real implementation in 15b will connect to env.CLAUDEMESH_BROKER_URL. - */ -export function connectBroker(_mesh: JoinedMesh): BrokerConnection { - return { - meshId: _mesh.meshId, - isConnected: () => false, - sendMessage: async () => ({ - ok: false, - error: "broker client not implemented (Step 15b)", - }), - close: () => { - /* noop */ - }, - }; +type PushHandler = (msg: InboundPush) => void; + +interface PendingSend { + id: string; + targetSpec: string; + priority: Priority; + nonce: string; + ciphertext: string; + resolve: (v: { ok: boolean; messageId?: string; error?: string }) => void; +} + +const MAX_QUEUED = 100; +const HELLO_ACK_TIMEOUT_MS = 5_000; +const BACKOFF_CAPS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000]; + +export class BrokerClient { + private ws: WebSocket | null = null; + private _status: ConnStatus = "closed"; + private pendingSends = new Map(); + private outbound: Array<() => void> = []; // closures that send once ws is open + private pushHandlers = new Set(); + private pushBuffer: InboundPush[] = []; + private closed = false; + private reconnectAttempt = 0; + private helloTimer: NodeJS.Timeout | null = null; + private reconnectTimer: NodeJS.Timeout | null = null; + + constructor( + private mesh: JoinedMesh, + private opts: { + onStatusChange?: (status: ConnStatus) => void; + debug?: boolean; + } = {}, + ) {} + + get status(): ConnStatus { + return this._status; + } + get meshId(): string { + return this.mesh.meshId; + } + get meshSlug(): string { + return this.mesh.slug; + } + get pushHistory(): readonly InboundPush[] { + return this.pushBuffer; + } + + /** Open WS, send hello, resolve when hello_ack received. */ + async connect(): Promise { + if (this.closed) throw new Error("client is closed"); + this.setStatus("connecting"); + const ws = new WebSocket(this.mesh.brokerUrl); + this.ws = ws; + + return new Promise((resolve, reject) => { + const onOpen = (): void => { + this.debug("ws open → sending hello"); + ws.send( + JSON.stringify({ + type: "hello", + meshId: this.mesh.meshId, + memberId: this.mesh.memberId, + pubkey: this.mesh.pubkey, + sessionId: `${process.pid}-${Date.now()}`, + pid: process.pid, + cwd: process.cwd(), + signature: "stub", // libsodium sign_detached lands in Step 18 + nonce: randomNonce(), + }), + ); + // Arm the hello_ack timeout. + this.helloTimer = setTimeout(() => { + this.debug("hello_ack timeout"); + ws.close(); + reject(new Error("hello_ack timeout")); + }, HELLO_ACK_TIMEOUT_MS); + }; + + const onMessage = (raw: WebSocket.RawData): void => { + let msg: Record; + try { + msg = JSON.parse(raw.toString()); + } catch { + return; + } + if (msg.type === "hello_ack") { + if (this.helloTimer) clearTimeout(this.helloTimer); + this.helloTimer = null; + this.setStatus("open"); + this.reconnectAttempt = 0; + this.flushOutbound(); + resolve(); + return; + } + this.handleServerMessage(msg); + }; + + const onClose = (): void => { + if (this.helloTimer) clearTimeout(this.helloTimer); + this.helloTimer = null; + this.ws = null; + if (this._status !== "open" && this._status !== "reconnecting") { + reject(new Error("ws closed before hello_ack")); + } + if (!this.closed) this.scheduleReconnect(); + else this.setStatus("closed"); + }; + + const onError = (err: Error): void => { + this.debug(`ws error: ${err.message}`); + }; + + ws.on("open", onOpen); + ws.on("message", onMessage); + ws.on("close", onClose); + ws.on("error", onError); + }); + } + + /** Fire-and-wait send: resolves when broker acks. */ + async send( + targetSpec: string, + message: string, + priority: Priority = "next", + ): Promise<{ ok: boolean; messageId?: string; error?: string }> { + const id = randomId(); + const nonce = randomNonce(); + const ciphertext = Buffer.from(message, "utf-8").toString("base64"); + + return new Promise((resolve) => { + if (this.pendingSends.size >= MAX_QUEUED) { + resolve({ ok: false, error: "outbound queue full" }); + return; + } + this.pendingSends.set(id, { + id, + targetSpec, + priority, + nonce, + ciphertext, + resolve, + }); + const dispatch = (): void => { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send( + JSON.stringify({ + type: "send", + id, + targetSpec, + priority, + nonce, + ciphertext, + }), + ); + }; + if (this._status === "open") dispatch(); + else { + // Queue the dispatch closure; flushed on (re)connect. + if (this.outbound.length >= MAX_QUEUED) { + this.pendingSends.delete(id); + resolve({ ok: false, error: "outbound queue full" }); + return; + } + this.outbound.push(dispatch); + } + // Ack timeout: 10s to hear back. + setTimeout(() => { + if (this.pendingSends.has(id)) { + this.pendingSends.delete(id); + resolve({ ok: false, error: "ack timeout" }); + } + }, 10_000); + }); + } + + /** Subscribe to inbound pushes. Returns an unsubscribe function. */ + onPush(handler: PushHandler): () => void { + this.pushHandlers.add(handler); + return () => this.pushHandlers.delete(handler); + } + + /** Drain the buffered push history (used by check_messages tool). */ + drainPushBuffer(): InboundPush[] { + const drained = this.pushBuffer.slice(); + this.pushBuffer.length = 0; + return drained; + } + + /** Send a manual status override. Fire-and-forget (no ack). */ + async setStatus(status: "idle" | "working" | "dnd"): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ type: "set_status", status })); + } + + close(): void { + this.closed = true; + if (this.helloTimer) clearTimeout(this.helloTimer); + if (this.reconnectTimer) clearTimeout(this.reconnectTimer); + if (this.ws) { + try { + this.ws.close(); + } catch { + /* ignore */ + } + } + this.setStatus("closed"); + } + + // --- Internals --- + + private handleServerMessage(msg: Record): void { + if (msg.type === "ack") { + const pending = this.pendingSends.get(String(msg.id ?? "")); + if (pending) { + pending.resolve({ + ok: true, + messageId: String(msg.messageId ?? ""), + }); + this.pendingSends.delete(pending.id); + } + return; + } + if (msg.type === "push") { + const push: InboundPush = { + messageId: String(msg.messageId ?? ""), + meshId: String(msg.meshId ?? ""), + senderPubkey: String(msg.senderPubkey ?? ""), + priority: (msg.priority as Priority) ?? "next", + nonce: String(msg.nonce ?? ""), + ciphertext: String(msg.ciphertext ?? ""), + createdAt: String(msg.createdAt ?? ""), + receivedAt: new Date().toISOString(), + }; + this.pushBuffer.push(push); + // Cap buffer at 500 entries to avoid unbounded growth. + if (this.pushBuffer.length > 500) this.pushBuffer.shift(); + for (const h of this.pushHandlers) { + try { + h(push); + } catch { + /* handler errors are not the transport's problem */ + } + } + return; + } + if (msg.type === "error") { + this.debug(`broker error: ${msg.code} ${msg.message}`); + const id = msg.id ? String(msg.id) : null; + if (id) { + const pending = this.pendingSends.get(id); + if (pending) { + pending.resolve({ + ok: false, + error: `${msg.code}: ${msg.message}`, + }); + this.pendingSends.delete(id); + } + } + } + } + + private flushOutbound(): void { + const queued = this.outbound.slice(); + this.outbound.length = 0; + for (const send of queued) send(); + } + + private scheduleReconnect(): void { + this.setStatus("reconnecting"); + const delay = + BACKOFF_CAPS[Math.min(this.reconnectAttempt, BACKOFF_CAPS.length - 1)]!; + this.reconnectAttempt += 1; + this.debug( + `reconnect in ${delay}ms (attempt ${this.reconnectAttempt})`, + ); + this.reconnectTimer = setTimeout(() => { + if (this.closed) return; + this.connect().catch((e) => { + this.debug(`reconnect failed: ${e instanceof Error ? e.message : e}`); + }); + }, delay); + } + + private setStatus(s: ConnStatus): void { + if (this._status === s) return; + this._status = s; + this.opts.onStatusChange?.(s); + } + + private debug(msg: string): void { + if (this.opts.debug) console.error(`[broker-client] ${msg}`); + } +} + +function randomId(): string { + return randomBytes(8).toString("hex"); +} + +function randomNonce(): string { + // 24-byte nonce layout (compatible with libsodium crypto_secretbox later) + return randomBytes(24).toString("base64"); } diff --git a/apps/cli/src/ws/manager.ts b/apps/cli/src/ws/manager.ts new file mode 100644 index 0000000..bc689a1 --- /dev/null +++ b/apps/cli/src/ws/manager.ts @@ -0,0 +1,55 @@ +/** + * Process-wide registry of BrokerClient connections, keyed by meshId. + * + * The MCP server lazily starts a client per joined mesh on startup, + * keeps them alive for the life of the process, and uses them to + * service MCP tool calls. + */ + +import { BrokerClient } from "./client"; +import type { Config, JoinedMesh } from "../state/config"; +import { env } from "../env"; + +const clients = new Map(); + +/** Ensure a BrokerClient exists + is connecting/open for this mesh. */ +export async function ensureClient(mesh: JoinedMesh): Promise { + const existing = clients.get(mesh.meshId); + if (existing) return existing; + const client = new BrokerClient(mesh, { debug: env.CLAUDEMESH_DEBUG }); + clients.set(mesh.meshId, client); + try { + await client.connect(); + } catch { + // Connect failed → client is in "reconnecting" state, leave it + // wired so tool calls can surface the status. + } + return client; +} + +/** Start clients for every joined mesh. Called once on MCP server start. */ +export async function startClients(config: Config): Promise { + await Promise.allSettled(config.meshes.map(ensureClient)); +} + +/** Look up a client by mesh slug (human-friendly) or meshId. */ +export function findClient(needle: string): BrokerClient | null { + // Try meshId first, then slug. + const byId = clients.get(needle); + if (byId) return byId; + for (const c of clients.values()) { + if (c.meshSlug === needle) return c; + } + return null; +} + +/** All clients across all meshes. */ +export function allClients(): BrokerClient[] { + return [...clients.values()]; +} + +/** Close every client (shutdown hook). */ +export function stopAll(): void { + for (const c of clients.values()) c.close(); + clients.clear(); +}