feat(cli): websocket client + MCP tool integration

broker-client: full WS client with hello handshake + ack, auto-reconnect
with exponential backoff (1s → 30s capped), in-memory outbound queue
(max 100) during reconnect, 500-entry push buffer for check_messages.

MCP tool integration:
- send_message: "slug:target" prefix or single-mesh fast path
- check_messages: drains push buffers across all clients
- set_status: fans manual override across all connected meshes
- set_summary: stubbed (broker protocol extension needed)
- list_peers: stubbed — lists connected mesh slugs + statuses

manager module holds Map<meshId, BrokerClient>, starts on MCP server
boot for every joined mesh in ~/.claudemesh/config.json.

new CLI command: seed-test-mesh injects a mesh row for dev testing.

also fixes a broker-side hello race: handleHello sent hello_ack before
the caller closure assigned presenceId, so clients sending right after
the ack hit the no_hello check. Fix: return presenceId, caller sets
closure var, THEN sends hello_ack. Queue drain is fire-and-forget now.

round-trip verified: two clients, A→B, push received with correct
senderPubkey + ciphertext. 44/44 broker tests still pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-04 22:30:11 +01:00
parent 8931296e82
commit 20d968f989
8 changed files with 709 additions and 79 deletions

View File

@@ -1,40 +1,335 @@
/**
* WS client to the broker (STUB).
* BrokerClient — WebSocket client connecting a CLI session to a claudemesh
* broker. Handles:
* - hello handshake + ack
* - send / ack / push message flow
* - auto-reconnect with exponential backoff (1s, 2s, 4s, ..., max 30s)
* - in-memory outbound queue while reconnecting
* - push buffer so the MCP check_messages tool can drain inbound history
*
* Final implementation in Step 15b — connects to broker, sends hello
* (with signed nonce), pumps messages to/from the MCP server, handles
* reconnect. For now just a placeholder type surface so the MCP
* server can depend on it.
* Encryption is deferred to Step 18 (libsodium). Until then, ciphertext
* is plaintext UTF-8, nonce is a random 24-byte base64 string (for
* future-compat layout only).
*/
import WebSocket from "ws";
import { randomBytes } from "node:crypto";
import type { JoinedMesh } from "../state/config";
export interface BrokerConnection {
export type Priority = "now" | "next" | "low";
export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting";
export interface InboundPush {
messageId: string;
meshId: string;
isConnected(): boolean;
sendMessage(args: {
targetSpec: string;
priority: "now" | "next" | "low";
nonce: string;
ciphertext: string;
}): Promise<{ ok: boolean; messageId?: string; error?: string }>;
close(): void;
senderPubkey: string;
priority: Priority;
nonce: string;
ciphertext: string;
createdAt: string;
receivedAt: string;
}
/**
* Stub broker connection. Returns "not implemented" errors on every
* call. Real implementation in 15b will connect to env.CLAUDEMESH_BROKER_URL.
*/
export function connectBroker(_mesh: JoinedMesh): BrokerConnection {
return {
meshId: _mesh.meshId,
isConnected: () => false,
sendMessage: async () => ({
ok: false,
error: "broker client not implemented (Step 15b)",
}),
close: () => {
/* noop */
},
};
type PushHandler = (msg: InboundPush) => void;
interface PendingSend {
id: string;
targetSpec: string;
priority: Priority;
nonce: string;
ciphertext: string;
resolve: (v: { ok: boolean; messageId?: string; error?: string }) => void;
}
const MAX_QUEUED = 100;
const HELLO_ACK_TIMEOUT_MS = 5_000;
const BACKOFF_CAPS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000];
export class BrokerClient {
private ws: WebSocket | null = null;
private _status: ConnStatus = "closed";
private pendingSends = new Map<string, PendingSend>();
private outbound: Array<() => void> = []; // closures that send once ws is open
private pushHandlers = new Set<PushHandler>();
private pushBuffer: InboundPush[] = [];
private closed = false;
private reconnectAttempt = 0;
private helloTimer: NodeJS.Timeout | null = null;
private reconnectTimer: NodeJS.Timeout | null = null;
constructor(
private mesh: JoinedMesh,
private opts: {
onStatusChange?: (status: ConnStatus) => void;
debug?: boolean;
} = {},
) {}
get status(): ConnStatus {
return this._status;
}
get meshId(): string {
return this.mesh.meshId;
}
get meshSlug(): string {
return this.mesh.slug;
}
get pushHistory(): readonly InboundPush[] {
return this.pushBuffer;
}
/** Open WS, send hello, resolve when hello_ack received. */
async connect(): Promise<void> {
if (this.closed) throw new Error("client is closed");
this.setStatus("connecting");
const ws = new WebSocket(this.mesh.brokerUrl);
this.ws = ws;
return new Promise<void>((resolve, reject) => {
const onOpen = (): void => {
this.debug("ws open → sending hello");
ws.send(
JSON.stringify({
type: "hello",
meshId: this.mesh.meshId,
memberId: this.mesh.memberId,
pubkey: this.mesh.pubkey,
sessionId: `${process.pid}-${Date.now()}`,
pid: process.pid,
cwd: process.cwd(),
signature: "stub", // libsodium sign_detached lands in Step 18
nonce: randomNonce(),
}),
);
// Arm the hello_ack timeout.
this.helloTimer = setTimeout(() => {
this.debug("hello_ack timeout");
ws.close();
reject(new Error("hello_ack timeout"));
}, HELLO_ACK_TIMEOUT_MS);
};
const onMessage = (raw: WebSocket.RawData): void => {
let msg: Record<string, unknown>;
try {
msg = JSON.parse(raw.toString());
} catch {
return;
}
if (msg.type === "hello_ack") {
if (this.helloTimer) clearTimeout(this.helloTimer);
this.helloTimer = null;
this.setStatus("open");
this.reconnectAttempt = 0;
this.flushOutbound();
resolve();
return;
}
this.handleServerMessage(msg);
};
const onClose = (): void => {
if (this.helloTimer) clearTimeout(this.helloTimer);
this.helloTimer = null;
this.ws = null;
if (this._status !== "open" && this._status !== "reconnecting") {
reject(new Error("ws closed before hello_ack"));
}
if (!this.closed) this.scheduleReconnect();
else this.setStatus("closed");
};
const onError = (err: Error): void => {
this.debug(`ws error: ${err.message}`);
};
ws.on("open", onOpen);
ws.on("message", onMessage);
ws.on("close", onClose);
ws.on("error", onError);
});
}
/** Fire-and-wait send: resolves when broker acks. */
async send(
targetSpec: string,
message: string,
priority: Priority = "next",
): Promise<{ ok: boolean; messageId?: string; error?: string }> {
const id = randomId();
const nonce = randomNonce();
const ciphertext = Buffer.from(message, "utf-8").toString("base64");
return new Promise((resolve) => {
if (this.pendingSends.size >= MAX_QUEUED) {
resolve({ ok: false, error: "outbound queue full" });
return;
}
this.pendingSends.set(id, {
id,
targetSpec,
priority,
nonce,
ciphertext,
resolve,
});
const dispatch = (): void => {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(
JSON.stringify({
type: "send",
id,
targetSpec,
priority,
nonce,
ciphertext,
}),
);
};
if (this._status === "open") dispatch();
else {
// Queue the dispatch closure; flushed on (re)connect.
if (this.outbound.length >= MAX_QUEUED) {
this.pendingSends.delete(id);
resolve({ ok: false, error: "outbound queue full" });
return;
}
this.outbound.push(dispatch);
}
// Ack timeout: 10s to hear back.
setTimeout(() => {
if (this.pendingSends.has(id)) {
this.pendingSends.delete(id);
resolve({ ok: false, error: "ack timeout" });
}
}, 10_000);
});
}
/** Subscribe to inbound pushes. Returns an unsubscribe function. */
onPush(handler: PushHandler): () => void {
this.pushHandlers.add(handler);
return () => this.pushHandlers.delete(handler);
}
/** Drain the buffered push history (used by check_messages tool). */
drainPushBuffer(): InboundPush[] {
const drained = this.pushBuffer.slice();
this.pushBuffer.length = 0;
return drained;
}
/** Send a manual status override. Fire-and-forget (no ack). */
async setStatus(status: "idle" | "working" | "dnd"): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "set_status", status }));
}
close(): void {
this.closed = true;
if (this.helloTimer) clearTimeout(this.helloTimer);
if (this.reconnectTimer) clearTimeout(this.reconnectTimer);
if (this.ws) {
try {
this.ws.close();
} catch {
/* ignore */
}
}
this.setStatus("closed");
}
// --- Internals ---
private handleServerMessage(msg: Record<string, unknown>): void {
if (msg.type === "ack") {
const pending = this.pendingSends.get(String(msg.id ?? ""));
if (pending) {
pending.resolve({
ok: true,
messageId: String(msg.messageId ?? ""),
});
this.pendingSends.delete(pending.id);
}
return;
}
if (msg.type === "push") {
const push: InboundPush = {
messageId: String(msg.messageId ?? ""),
meshId: String(msg.meshId ?? ""),
senderPubkey: String(msg.senderPubkey ?? ""),
priority: (msg.priority as Priority) ?? "next",
nonce: String(msg.nonce ?? ""),
ciphertext: String(msg.ciphertext ?? ""),
createdAt: String(msg.createdAt ?? ""),
receivedAt: new Date().toISOString(),
};
this.pushBuffer.push(push);
// Cap buffer at 500 entries to avoid unbounded growth.
if (this.pushBuffer.length > 500) this.pushBuffer.shift();
for (const h of this.pushHandlers) {
try {
h(push);
} catch {
/* handler errors are not the transport's problem */
}
}
return;
}
if (msg.type === "error") {
this.debug(`broker error: ${msg.code} ${msg.message}`);
const id = msg.id ? String(msg.id) : null;
if (id) {
const pending = this.pendingSends.get(id);
if (pending) {
pending.resolve({
ok: false,
error: `${msg.code}: ${msg.message}`,
});
this.pendingSends.delete(id);
}
}
}
}
private flushOutbound(): void {
const queued = this.outbound.slice();
this.outbound.length = 0;
for (const send of queued) send();
}
private scheduleReconnect(): void {
this.setStatus("reconnecting");
const delay =
BACKOFF_CAPS[Math.min(this.reconnectAttempt, BACKOFF_CAPS.length - 1)]!;
this.reconnectAttempt += 1;
this.debug(
`reconnect in ${delay}ms (attempt ${this.reconnectAttempt})`,
);
this.reconnectTimer = setTimeout(() => {
if (this.closed) return;
this.connect().catch((e) => {
this.debug(`reconnect failed: ${e instanceof Error ? e.message : e}`);
});
}, delay);
}
private setStatus(s: ConnStatus): void {
if (this._status === s) return;
this._status = s;
this.opts.onStatusChange?.(s);
}
private debug(msg: string): void {
if (this.opts.debug) console.error(`[broker-client] ${msg}`);
}
}
function randomId(): string {
return randomBytes(8).toString("hex");
}
function randomNonce(): string {
// 24-byte nonce layout (compatible with libsodium crypto_secretbox later)
return randomBytes(24).toString("base64");
}