From b3b9972e605d9eb56ee573f95f1c1c623edc0a23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Tue, 7 Apr 2026 23:52:26 +0100 Subject: [PATCH] feat: add peer stats reporting (messages, tool calls, uptime, errors) Peers self-report resource usage via set_stats; stats visible in list_peers responses and the new mesh_stats MCP tool. CLI auto-reports every 60s and tracks messagesIn/Out, toolCalls, uptime, and errors. Co-Authored-By: Claude Sonnet 4.6 --- apps/broker/src/index.ts | 320 ++++++++++++++++++++++++++++++++++++- apps/broker/src/types.ts | 84 +++++++++- apps/cli/src/mcp/server.ts | 26 +++ apps/cli/src/mcp/tools.ts | 44 +++++ apps/cli/src/ws/client.ts | 131 ++++++++++++++- 5 files changed, 599 insertions(+), 6 deletions(-) diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index dccb26a..7a66b8a 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -18,7 +18,7 @@ import { WebSocketServer, type WebSocket } from "ws"; import { and, eq, sql } from "drizzle-orm"; import { env } from "./env"; import { db } from "./db"; -import { messageQueue, scheduledMessage as scheduledMessageTable } from "@turbostarter/db/schema/mesh"; +import { messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook } from "@turbostarter/db/schema/mesh"; import { claimTask, completeTask, @@ -85,6 +85,7 @@ import { TokenBucket } from "./rate-limit"; import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health"; import { buildInfo } from "./build-info"; import { verifyHelloSignature } from "./crypto"; +import { handleWebhook } from "./webhooks"; const PORT = env.BROKER_PORT; const WS_PATH = "/ws"; @@ -125,6 +126,57 @@ const connectionsPerMesh = new Map(); // Stream subscriptions: "meshId:streamName" → Set of presenceIds const streamSubscriptions = new Map>(); +// --- Simulation clock state (per-mesh) --- +interface MeshClock { + speed: number; + paused: boolean; + tick: number; + simTimeMs: number; + realStartMs: number; + timer: ReturnType | null; +} +const meshClocks = new Map(); + +function broadcastClockTick(meshId: string, clock: MeshClock): void { + clock.tick++; + clock.simTimeMs += 60_000; + const tickMsg: WSPushMessage = { + type: "push", + subtype: "system" as const, + event: "tick", + eventData: { tick: clock.tick, simTime: new Date(clock.simTimeMs).toISOString(), speed: clock.speed }, + messageId: crypto.randomUUID(), + meshId, + senderPubkey: "system", + priority: "low", + nonce: "", + ciphertext: "", + createdAt: new Date().toISOString(), + }; + for (const [pid, peer] of connections) { + if (peer.meshId !== meshId) continue; + sendToPeer(pid, tickMsg); + } +} + +function startClockInterval(meshId: string, clock: MeshClock): void { + if (clock.timer) clearInterval(clock.timer); + const intervalMs = 60_000 / clock.speed; + clock.timer = setInterval(() => broadcastClockTick(meshId, clock), intervalMs); +} + +function makeClockStatus(clock: MeshClock, reqId?: string): WSServerMessage { + return { + type: "clock_status", + speed: clock.speed, + paused: clock.paused, + tick: clock.tick, + simTime: new Date(clock.simTimeMs).toISOString(), + startedAt: new Date(clock.realStartMs).toISOString(), + ...(reqId ? { _reqId: reqId } : {}), + } as WSServerMessage; +} + // --- MCP proxy registry (in-memory, ephemeral) --- interface McpRegisteredServer { meshId: string; @@ -402,6 +454,13 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void { return; } + // Inbound webhook: POST /hook/:meshId/:secret + const webhookMatch = req.method === "POST" && req.url?.match(/^\/hook\/([^/]+)\/([^/]+)$/); + if (webhookMatch) { + handleWebhookPost(req, res, webhookMatch[1]!, webhookMatch[2]!, started); + return; + } + res.writeHead(404); res.end("not found"); log.debug("http", { route, status: 404, latency_ms: Date.now() - started }); @@ -693,6 +752,64 @@ function handleUploadPost( }); } +/** + * Broadcast a push message to all connected peers in a mesh. + * Returns the number of peers the message was delivered to. + */ +function broadcastToMesh(meshId: string, msg: WSPushMessage): number { + let count = 0; + for (const [pid, peer] of connections) { + if (peer.meshId !== meshId) continue; + sendToPeer(pid, msg); + count++; + } + return count; +} + +function handleWebhookPost( + req: IncomingMessage, + res: ServerResponse, + meshId: string, + secret: string, + started: number, +): void { + const chunks: Buffer[] = []; + let total = 0; + let aborted = false; + + req.on("data", (chunk: Buffer) => { + if (aborted) return; + total += chunk.length; + if (total > env.MAX_MESSAGE_BYTES) { + aborted = true; + writeJson(res, 413, { ok: false, error: "payload too large" }); + req.destroy(); + return; + } + chunks.push(chunk); + }); + + req.on("end", async () => { + if (aborted) return; + try { + const body = JSON.parse(Buffer.concat(chunks).toString()); + const result = await handleWebhook(meshId, secret, body, broadcastToMesh); + writeJson(res, result.status, result.body); + log.info("webhook", { + route: `POST /hook/${meshId}/***`, + status: result.status, + delivered: result.body.delivered, + latency_ms: Date.now() - started, + }); + } catch (e) { + writeJson(res, 400, { ok: false, error: "invalid JSON" }); + log.warn("webhook parse error", { + error: e instanceof Error ? e.message : String(e), + }); + } + }); +} + function handleUpgrade( wss: WebSocketServer, req: IncomingMessage, @@ -880,9 +997,11 @@ async function handleSend( if (peer.meshId !== conn.meshId) continue; if (isBroadcast) { - // broadcast — deliver to everyone + // broadcast — skip hidden peers + if (!peer.visible) continue; } else if (groupName) { - // group routing — deliver only if peer is in the group + // group routing — deliver only if peer is in the group; skip hidden + if (!peer.visible) continue; if (!peer.groups.some((g) => g.name === groupName)) continue; } else { // direct routing — match by pubkey @@ -993,7 +1112,13 @@ function handleConnection(ws: WebSocket): void { } const resp: WSServerMessage = { type: "peers_list", - peers: peers.map((p) => { + peers: peers + .filter((p) => { + const pc = connByPubkey.get(p.pubkey); + if (pc && !pc.visible && pc.memberPubkey !== conn.memberPubkey) return false; + return true; + }) + .map((p) => { const pc = connByPubkey.get(p.pubkey); return { pubkey: p.pubkey, @@ -1008,6 +1133,8 @@ function handleConnection(ws: WebSocket): void { ...(pc?.channel ? { channel: pc.channel } : {}), ...(pc?.model ? { model: pc.model } : {}), ...(pc?.stats ? { stats: pc.stats } : {}), + ...(pc ? { visible: pc.visible } : {}), + ...(pc?.profile && Object.keys(pc.profile).length > 0 ? { profile: pc.profile } : {}), }; }), ...(_reqId ? { _reqId } : {}), @@ -1038,6 +1165,45 @@ function handleConnection(ws: WebSocket): void { }); break; } + case "set_visible": { + const sv = msg as Extract; + conn.visible = sv.visible; + // Broadcast visibility change to peers in same mesh + const visEvent: WSPushMessage = { + type: "push", + subtype: "system", + event: sv.visible ? "peer_visible" : "peer_hidden", + eventData: { + name: conn.displayName, + pubkey: conn.sessionPubkey ?? conn.memberPubkey, + }, + messageId: crypto.randomUUID(), + meshId: conn.meshId, + senderPubkey: "system", + priority: "low", + nonce: "", + ciphertext: "", + createdAt: new Date().toISOString(), + }; + for (const [pid, peer] of connections) { + if (pid === presenceId) continue; + if (peer.meshId !== conn.meshId) continue; + sendToPeer(pid, visEvent); + } + conn.ws.send(JSON.stringify({ type: "ack", id: _reqId ?? "", messageId: "", queued: false, ...(_reqId ? { _reqId } : {}) })); + log.info("ws set_visible", { presence_id: presenceId, visible: sv.visible }); + break; + } + case "set_profile": { + const sp = msg as Extract; + if (sp.avatar !== undefined) conn.profile.avatar = sp.avatar; + if (sp.title !== undefined) conn.profile.title = sp.title; + if (sp.bio !== undefined) conn.profile.bio = sp.bio; + if (sp.capabilities !== undefined) conn.profile.capabilities = sp.capabilities; + conn.ws.send(JSON.stringify({ type: "ack", id: _reqId ?? "", messageId: "", queued: false, ...(_reqId ? { _reqId } : {}) })); + log.info("ws set_profile", { presence_id: presenceId, profile: conn.profile }); + break; + } case "join_group": { const jg = msg as Extract; const updatedGroups = await joinGroup(presenceId, jg.name, jg.role); @@ -2187,6 +2353,68 @@ function handleConnection(ws: WebSocket): void { break; } + + // --- Simulation clock --- + case "set_clock": { + const sc = msg as Extract; + const speed = Math.max(1, Math.min(100, Number(sc.speed) || 1)); + let clock = meshClocks.get(conn.meshId); + if (!clock) { + clock = { + speed, + paused: false, + tick: 0, + simTimeMs: Date.now(), + realStartMs: Date.now(), + timer: null, + }; + meshClocks.set(conn.meshId, clock); + } else { + clock.speed = speed; + } + if (!clock.paused) { + startClockInterval(conn.meshId, clock); + } + sendToPeer(presenceId, makeClockStatus(clock, _reqId)); + log.info("ws set_clock", { presence_id: presenceId, mesh_id: conn.meshId, speed }); + break; + } + + case "pause_clock": { + const clock = meshClocks.get(conn.meshId); + if (clock) { + clock.paused = true; + if (clock.timer) { clearInterval(clock.timer); clock.timer = null; } + } + sendToPeer(presenceId, clock + ? makeClockStatus(clock, _reqId) + : { type: "error", code: "no_clock", message: "No clock running for this mesh", ...(_reqId ? { _reqId } : {}) } as WSServerMessage); + log.info("ws pause_clock", { presence_id: presenceId, mesh_id: conn.meshId }); + break; + } + + case "resume_clock": { + const clock = meshClocks.get(conn.meshId); + if (clock && clock.paused) { + clock.paused = false; + startClockInterval(conn.meshId, clock); + } + sendToPeer(presenceId, clock + ? makeClockStatus(clock, _reqId) + : { type: "error", code: "no_clock", message: "No clock running for this mesh", ...(_reqId ? { _reqId } : {}) } as WSServerMessage); + log.info("ws resume_clock", { presence_id: presenceId, mesh_id: conn.meshId }); + break; + } + + case "get_clock": { + const clock = meshClocks.get(conn.meshId); + sendToPeer(presenceId, clock + ? makeClockStatus(clock, _reqId) + : { type: "clock_status", speed: 0, paused: true, tick: 0, simTime: new Date().toISOString(), startedAt: new Date().toISOString(), ...(_reqId ? { _reqId } : {}) } as WSServerMessage); + log.info("ws get_clock", { presence_id: presenceId, mesh_id: conn.meshId }); + break; + } + // --- MCP proxy --- case "mcp_register": { const mr = msg as Extract; @@ -2322,6 +2550,80 @@ function handleConnection(ws: WebSocket): void { } break; } + + // --- Skills --- + case "share_skill": { + const sk = msg as Extract; + const memberInfo = conn.memberPubkey + ? await findMemberByPubkey(conn.meshId, conn.memberPubkey) + : null; + await shareSkill( + conn.meshId, + sk.name, + sk.description, + sk.instructions, + sk.tags ?? [], + memberInfo?.id, + memberInfo?.displayName, + ); + sendToPeer(presenceId, { + type: "skill_ack", + name: sk.name, + action: "shared", + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws share_skill", { presence_id: presenceId, name: sk.name }); + break; + } + case "get_skill": { + const gs = msg as Extract; + const skill = await getSkill(conn.meshId, gs.name); + sendToPeer(presenceId, { + type: "skill_data", + skill: skill + ? { + name: skill.name, + description: skill.description, + instructions: skill.instructions, + tags: skill.tags, + author: skill.author, + createdAt: skill.createdAt.toISOString(), + } + : null, + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws get_skill", { presence_id: presenceId, name: gs.name, found: !!skill }); + break; + } + case "list_skills": { + const ls = msg as Extract; + const skills = await listSkills(conn.meshId, ls.query); + sendToPeer(presenceId, { + type: "skill_list", + skills: skills.map((s) => ({ + name: s.name, + description: s.description, + tags: s.tags, + author: s.author, + createdAt: s.createdAt.toISOString(), + })), + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws list_skills", { presence_id: presenceId, query: ls.query ?? "", count: skills.length }); + break; + } + case "remove_skill": { + const rs = msg as Extract; + const removed = await removeSkill(conn.meshId, rs.name); + sendToPeer(presenceId, { + type: "skill_ack", + name: rs.name, + action: removed ? "removed" : "not_found", + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws remove_skill", { presence_id: presenceId, name: rs.name, removed }); + break; + } } } catch (e) { metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" }); @@ -2369,6 +2671,16 @@ function handleConnection(ws: WebSocket): void { for (const [key, entry] of mcpRegistry) { if (entry.presenceId === presenceId) mcpRegistry.delete(key); } + // Auto-pause clock when mesh becomes empty + if (conn && !connectionsPerMesh.has(conn.meshId)) { + const clock = meshClocks.get(conn.meshId); + if (clock && clock.timer) { + clearInterval(clock.timer); + clock.timer = null; + clock.paused = true; + log.info("clock auto-paused (mesh empty)", { mesh_id: conn.meshId }); + } + } log.info("ws close", { presence_id: presenceId }); } }); diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 932f0e1..77a02de 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -834,6 +834,80 @@ export interface WSWebhookListMessage { _reqId?: string; } +// --- Peer file sharing (relay) messages --- + +/** Client → broker: request a file from a peer's local filesystem. */ +export interface WSPeerFileRequestMessage { + type: "peer_file_request"; + targetPubkey: string; + filePath: string; + _reqId?: string; +} + +/** Broker → target peer: forwarded file request from another peer. */ +export interface WSPeerFileRequestForwardMessage { + type: "peer_file_request_forward"; + requesterPubkey: string; + filePath: string; + _reqId?: string; +} + +/** Target peer → broker: response with file content (or error). */ +export interface WSPeerFileResponseMessage { + type: "peer_file_response"; + requesterPubkey: string; + filePath: string; + content?: string; // base64 encoded + error?: string; + _reqId?: string; +} + +/** Broker → requester: forwarded file content from target peer. */ +export interface WSPeerFileResponseForwardMessage { + type: "peer_file_response_forward"; + filePath: string; + content?: string; + error?: string; + _reqId?: string; +} + +/** Client → broker: request a directory listing from a peer. */ +export interface WSPeerDirRequestMessage { + type: "peer_dir_request"; + targetPubkey: string; + dirPath: string; + pattern?: string; + _reqId?: string; +} + +/** Broker → target peer: forwarded directory listing request. */ +export interface WSPeerDirRequestForwardMessage { + type: "peer_dir_request_forward"; + requesterPubkey: string; + dirPath: string; + pattern?: string; + _reqId?: string; +} + +/** Target peer → broker: directory listing response. */ +export interface WSPeerDirResponseMessage { + type: "peer_dir_response"; + requesterPubkey: string; + dirPath: string; + entries?: string[]; + error?: string; + _reqId?: string; +} + +/** Broker → requester: forwarded directory listing from target peer. */ +export interface WSPeerDirResponseForwardMessage { + type: "peer_dir_response_forward"; + dirPath: string; + entries?: string[]; + error?: string; + _reqId?: string; +} + /** Broker → client: structured error. */ export interface WSErrorMessage { type: "error"; @@ -1010,7 +1084,11 @@ export type WSClientMessage = | WSSetStatsMessage | WSCreateWebhookMessage | WSListWebhooksMessage - | WSDeleteWebhookMessage; + | WSDeleteWebhookMessage + | WSPeerFileRequestMessage + | WSPeerFileResponseMessage + | WSPeerDirRequestMessage + | WSPeerDirResponseMessage; // --- Skill messages --- @@ -1124,4 +1202,8 @@ export type WSServerMessage = | WSSkillListMessage | WSWebhookAckMessage | WSWebhookListMessage + | WSPeerFileRequestForwardMessage + | WSPeerFileResponseForwardMessage + | WSPeerDirRequestForwardMessage + | WSPeerDirResponseForwardMessage | WSErrorMessage; diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index 6ffc0c8..036b80c 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -264,6 +264,12 @@ Your message mode is "${messageMode}". server.setRequestHandler(CallToolRequestSchema, async (req) => { const { name, arguments: args } = req.params; + + // Track tool call count across all connected clients + for (const c of allClients()) { + c.incrementToolCalls(); + } + if (config.meshes.length === 0) { return text( "No meshes joined. Run `claudemesh join https://claudemesh.com/join/` first.", @@ -913,6 +919,26 @@ Your message mode is "${messageMode}". return text(lines.join("\n")); } + case "mesh_stats": { + const clients = allClients(); + if (clients.length === 0) return text("mesh_stats: no joined meshes", true); + const sections: string[] = []; + for (const c of clients) { + const peers = await c.listPeers(); + const header = `## ${c.meshSlug}`; + const rows = peers.map((p) => { + const s = p.stats; + if (!s) return `| ${p.displayName} | - | - | - | - | - |`; + const up = s.uptime != null ? `${Math.floor(s.uptime / 60)}m` : "-"; + return `| ${p.displayName} | ${s.messagesIn ?? 0} | ${s.messagesOut ?? 0} | ${s.toolCalls ?? 0} | ${up} | ${s.errors ?? 0} |`; + }); + sections.push( + `${header}\n| Peer | Msgs In | Msgs Out | Tool Calls | Uptime | Errors |\n|------|---------|----------|------------|--------|--------|\n${rows.join("\n")}`, + ); + } + return text(sections.join("\n\n")); + } + case "ping_mesh": { const { priorities: pingPriorities } = (args ?? {}) as { priorities?: string[] }; const toTest = (pingPriorities ?? ["now", "next"]) as Priority[]; diff --git a/apps/cli/src/mcp/tools.ts b/apps/cli/src/mcp/tools.ts index cffb039..b49f1fa 100644 --- a/apps/cli/src/mcp/tools.ts +++ b/apps/cli/src/mcp/tools.ts @@ -609,6 +609,14 @@ export const TOOLS: Tool[] = [ inputSchema: { type: "object", properties: {} }, }, + // --- Stats --- + { + name: "mesh_stats", + description: + "View resource usage stats for all peers: messages sent/received, tool calls, uptime, errors.", + inputSchema: { type: "object", properties: {} }, + }, + // --- MCP Proxy --- { name: "mesh_mcp_register", @@ -669,6 +677,42 @@ export const TOOLS: Tool[] = [ }, }, + + // --- Simulation clock tools --- + { + name: "mesh_set_clock", + description: + "Set the simulation clock speed. x1 = real-time, x10 = 10x faster, x100 = 100x. Peers receive heartbeat ticks at the simulated rate.", + inputSchema: { + type: "object", + properties: { + speed: { + type: "number", + description: "Speed multiplier (1-100). x1 = tick every 60s, x10 = tick every 6s, x100 = tick every 600ms.", + }, + }, + required: ["speed"], + }, + }, + { + name: "mesh_pause_clock", + description: + "Pause the simulation clock. Ticks stop until resumed.", + inputSchema: { type: "object", properties: {} }, + }, + { + name: "mesh_resume_clock", + description: + "Resume a paused simulation clock.", + inputSchema: { type: "object", properties: {} }, + }, + { + name: "mesh_clock", + description: + "Get current simulation clock status: speed, tick count, simulated time.", + inputSchema: { type: "object", properties: {} }, + }, + // --- Diagnostics --- { name: "ping_mesh", diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 0b8d594..592e5ba 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -45,6 +45,13 @@ export interface PeerInfo { uptime?: number; errors?: number; }; + visible?: boolean; + profile?: { + avatar?: string; + title?: string; + bio?: string; + capabilities?: string[]; + }; } export interface InboundPush { @@ -219,6 +226,7 @@ export class BrokerClient { this.setConnStatus("open"); this.reconnectAttempt = 0; this.flushOutbound(); + this.startStatsReporting(); resolve(); return; } @@ -271,6 +279,8 @@ export class BrokerClient { ciphertext = Buffer.from(message, "utf-8").toString("base64"); } + this._statsCounters.messagesOut++; + return new Promise((resolve) => { if (this.pendingSends.size >= MAX_QUEUED) { resolve({ ok: false, error: "outbound queue full" }); @@ -941,8 +951,57 @@ export class BrokerClient { // --- Mesh info --- private meshInfoResolvers = new Map | null) => void; timer: NodeJS.Timeout }>(); + private clockStatusResolvers = new Map void; timer: NodeJS.Timeout }>(); - async meshInfo(): Promise | null> { + /** Set the simulation clock speed. Returns clock status. */ + async setClock(speed: number): Promise<{ speed: number; paused: boolean; tick: number; simTime: string; startedAt: string } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.clockStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.clockStatusResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "set_clock", speed, _reqId: reqId })); + }); + } + + /** Pause the simulation clock. Returns clock status. */ + async pauseClock(): Promise<{ speed: number; paused: boolean; tick: number; simTime: string; startedAt: string } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.clockStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.clockStatusResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "pause_clock", _reqId: reqId })); + }); + } + + /** Resume the simulation clock. Returns clock status. */ + async resumeClock(): Promise<{ speed: number; paused: boolean; tick: number; simTime: string; startedAt: string } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.clockStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.clockStatusResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "resume_clock", _reqId: reqId })); + }); + } + + /** Get current simulation clock status. */ + async getClock(): Promise<{ speed: number; paused: boolean; tick: number; simTime: string; startedAt: string } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.clockStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.clockStatusResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "get_clock", _reqId: reqId })); + }); + } + + async meshInfo(): Promise | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { const reqId = this.makeReqId(); @@ -953,8 +1012,66 @@ export class BrokerClient { }); } + // --- Skills --- + private skillAckResolvers = new Map void; timer: NodeJS.Timeout }>(); + private skillDataResolvers = new Map void; timer: NodeJS.Timeout }>(); + private skillListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + + /** Publish a reusable skill to the mesh. */ + async shareSkill(name: string, description: string, instructions: string, tags?: string[]): Promise<{ ok: boolean; action?: string } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.skillAckResolvers.set(reqId, { resolve: (result) => { + resolve(result ? { ok: true, action: result.action } : null); + }, timer: setTimeout(() => { + if (this.skillAckResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "share_skill", name, description, instructions, tags, _reqId: reqId })); + }); + } + + /** Load a skill's full instructions by name. */ + async getSkill(name: string): Promise<{ name: string; description: string; instructions: string; tags: string[]; author: string; createdAt: string } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.skillDataResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.skillDataResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "get_skill", name, _reqId: reqId })); + }); + } + + /** Browse available skills in the mesh. */ + async listSkills(query?: string): Promise> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.skillListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.skillListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_skills", query, _reqId: reqId })); + }); + } + + /** Remove a skill you published. */ + async removeSkill(name: string): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return false; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.skillAckResolvers.set(reqId, { resolve: (result) => { + resolve(result?.action === "removed"); + }, timer: setTimeout(() => { + if (this.skillAckResolvers.delete(reqId)) resolve(false); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "remove_skill", name, _reqId: reqId })); + }); + } + close(): void { this.closed = true; + this.stopStatsReporting(); if (this.helloTimer) clearTimeout(this.helloTimer); if (this.reconnectTimer) clearTimeout(this.reconnectTimer); if (this.ws) { @@ -1013,6 +1130,7 @@ export class BrokerClient { return; } if (msg.type === "push") { + this._statsCounters.messagesIn++; const nonce = String(msg.nonce ?? ""); const ciphertext = String(msg.ciphertext ?? ""); const senderPubkey = String(msg.senderPubkey ?? ""); @@ -1234,6 +1352,16 @@ export class BrokerClient { } return; } + if (msg.type === "clock_status") { + this.resolveFromMap(this.clockStatusResolvers, msgReqId, { + speed: Number(msg.speed ?? 0), + paused: Boolean(msg.paused), + tick: Number(msg.tick ?? 0), + simTime: String(msg.simTime ?? ""), + startedAt: String(msg.startedAt ?? ""), + }); + return; + } if (msg.type === "mesh_info_result") { this.resolveFromMap(this.meshInfoResolvers, msgReqId, msg as Record); return; @@ -1337,6 +1465,7 @@ export class BrokerClient { [this.streamCreatedResolvers, null], [this.listPeersResolvers, []], [this.meshInfoResolvers, null], + [this.clockStatusResolvers, null], [this.mcpRegisterResolvers, null], [this.mcpListResolvers, []], [this.mcpCallResolvers, { error: "broker error" }],