diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 34dbfcf..dccb26a 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -65,6 +65,10 @@ import { meshSchema, createStream, listStreams, + shareSkill, + getSkill, + listSkills, + removeSkill, } from "./broker"; import { ensureBucket, meshBucketName, minioClient } from "./minio"; import { qdrant, meshCollectionName, ensureCollection } from "./qdrant"; @@ -99,6 +103,20 @@ interface PeerConn { channel?: string; model?: string; groups: Array<{ name: string; role?: string }>; + stats?: { + messagesIn?: number; + messagesOut?: number; + toolCalls?: number; + uptime?: number; + errors?: number; + }; + visible: boolean; + profile: { + avatar?: string; + title?: string; + bio?: string; + capabilities?: string[]; + }; } const connections = new Map(); @@ -107,6 +125,24 @@ const connectionsPerMesh = new Map(); // Stream subscriptions: "meshId:streamName" → Set of presenceIds const streamSubscriptions = new Map>(); +// --- MCP proxy registry (in-memory, ephemeral) --- +interface McpRegisteredServer { + meshId: string; + presenceId: string; + serverName: string; + description: string; + tools: Array<{ name: string; description: string; inputSchema: Record }>; + hostedByName: string; +} +/** Keyed by "meshId:serverName" */ +const mcpRegistry = new Map(); + +/** Pending MCP call forwards: callId → { resolve, timer } */ +const mcpCallResolvers = new Map void; + timer: ReturnType; +}>(); + /// Scheduled messages: meshId → Map interface ScheduledEntry { id: string; @@ -770,6 +806,8 @@ async function handleHello( channel: hello.channel, model: hello.model, groups: initialGroups, + visible: true, + profile: {}, }); incMeshCount(hello.meshId); log.info("ws hello", { @@ -969,6 +1007,7 @@ function handleConnection(ws: WebSocket): void { ...(pc?.peerType ? { peerType: pc.peerType } : {}), ...(pc?.channel ? { channel: pc.channel } : {}), ...(pc?.model ? { model: pc.model } : {}), + ...(pc?.stats ? { stats: pc.stats } : {}), }; }), ...(_reqId ? { _reqId } : {}), @@ -990,6 +1029,15 @@ function handleConnection(ws: WebSocket): void { }); break; } + case "set_stats": { + const sm = msg as Extract; + conn.stats = sm.stats ?? {}; + log.info("ws set_stats", { + presence_id: presenceId, + stats: conn.stats, + }); + break; + } case "join_group": { const jg = msg as Extract; const updatedGroups = await joinGroup(presenceId, jg.name, jg.role); @@ -2138,6 +2186,142 @@ function handleConnection(ws: WebSocket): void { log.info("ws cancel_scheduled", { presence_id: presenceId, scheduled_id: cs.scheduledId, ok }); break; } + + // --- MCP proxy --- + case "mcp_register": { + const mr = msg as Extract; + const regKey = `${conn.meshId}:${mr.serverName}`; + mcpRegistry.set(regKey, { + meshId: conn.meshId, + presenceId: presenceId, + serverName: mr.serverName, + description: mr.description, + tools: mr.tools, + hostedByName: conn.displayName, + }); + sendToPeer(presenceId, { + type: "mcp_register_ack", + serverName: mr.serverName, + toolCount: mr.tools.length, + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws mcp_register", { + presence_id: presenceId, + server: mr.serverName, + tools: mr.tools.length, + }); + break; + } + case "mcp_unregister": { + const mu = msg as Extract; + const unregKey = `${conn.meshId}:${mu.serverName}`; + const entry = mcpRegistry.get(unregKey); + if (entry && entry.presenceId === presenceId) { + mcpRegistry.delete(unregKey); + } + log.info("ws mcp_unregister", { + presence_id: presenceId, + server: mu.serverName, + }); + break; + } + case "mcp_list": { + const servers: Array<{ + name: string; + description: string; + hostedBy: string; + tools: Array<{ name: string; description: string }>; + }> = []; + for (const [, entry] of mcpRegistry) { + if (entry.meshId !== conn.meshId) continue; + servers.push({ + name: entry.serverName, + description: entry.description, + hostedBy: entry.hostedByName, + tools: entry.tools.map((t) => ({ name: t.name, description: t.description })), + }); + } + sendToPeer(presenceId, { + type: "mcp_list_result", + servers, + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws mcp_list", { + presence_id: presenceId, + count: servers.length, + }); + break; + } + case "mcp_call": { + const mc = msg as Extract; + const callKey = `${conn.meshId}:${mc.serverName}`; + const server = mcpRegistry.get(callKey); + if (!server) { + sendToPeer(presenceId, { + type: "mcp_call_result", + error: `MCP server "${mc.serverName}" not found in mesh`, + ...(_reqId ? { _reqId } : {}), + }); + break; + } + // Check hosting peer is still connected + const hostConn = connections.get(server.presenceId); + if (!hostConn) { + mcpRegistry.delete(callKey); + sendToPeer(presenceId, { + type: "mcp_call_result", + error: `MCP server "${mc.serverName}" host disconnected`, + ...(_reqId ? { _reqId } : {}), + }); + break; + } + // Forward the call to the hosting peer + const callId = crypto.randomUUID(); + const callPromise = new Promise<{ result?: unknown; error?: string }>((resolve) => { + const timer = setTimeout(() => { + if (mcpCallResolvers.delete(callId)) { + resolve({ error: "MCP call timed out (30s)" }); + } + }, 30_000); + mcpCallResolvers.set(callId, { resolve, timer }); + }); + sendToPeer(server.presenceId, { + type: "mcp_call_forward", + callId, + serverName: mc.serverName, + toolName: mc.toolName, + args: mc.args, + callerName: conn.displayName, + }); + // Wait for response from hosting peer + const callResult = await callPromise; + sendToPeer(presenceId, { + type: "mcp_call_result", + ...(callResult.result !== undefined ? { result: callResult.result } : {}), + ...(callResult.error ? { error: callResult.error } : {}), + ...(_reqId ? { _reqId } : {}), + }); + log.info("ws mcp_call", { + presence_id: presenceId, + server: mc.serverName, + tool: mc.toolName, + ok: !callResult.error, + }); + break; + } + case "mcp_call_response": { + const mcr = msg as Extract; + const resolver = mcpCallResolvers.get(mcr.callId); + if (resolver) { + clearTimeout(resolver.timer); + mcpCallResolvers.delete(mcr.callId); + resolver.resolve({ + ...(mcr.result !== undefined ? { result: mcr.result } : {}), + ...(mcr.error ? { error: mcr.error } : {}), + }); + } + break; + } } } catch (e) { metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" }); @@ -2181,6 +2365,10 @@ function handleConnection(ws: WebSocket): void { subs.delete(presenceId); if (subs.size === 0) streamSubscriptions.delete(key); } + // Clean up MCP servers registered by this peer + for (const [key, entry] of mcpRegistry) { + if (entry.presenceId === presenceId) mcpRegistry.delete(key); + } log.info("ws close", { presence_id: presenceId }); } }); diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 02103de..932f0e1 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -118,6 +118,36 @@ export interface WSSetSummaryMessage { summary: string; } + +/** Client → broker: toggle visibility in the mesh. */ +export interface WSSetVisibleMessage { + type: "set_visible"; + visible: boolean; + _reqId?: string; +} + +/** Client → broker: set public profile metadata. */ +export interface WSSetProfileMessage { + type: "set_profile"; + avatar?: string; // emoji or URL + title?: string; // short role label + bio?: string; // one-liner + capabilities?: string[]; // what I can help with + _reqId?: string; +} +/** Client → broker: self-report resource usage stats. */ +export interface WSSetStatsMessage { + type: "set_stats"; + stats: { + messagesIn?: number; + messagesOut?: number; + toolCalls?: number; + uptime?: number; // seconds since session start + errors?: number; + }; + _reqId?: string; +} + /** Client → broker: join a group with optional role. */ export interface WSJoinGroupMessage { type: "join_group"; @@ -199,6 +229,20 @@ export interface WSPeersListMessage { peerType?: "ai" | "human" | "connector"; channel?: string; model?: string; + stats?: { + messagesIn?: number; + messagesOut?: number; + toolCalls?: number; + uptime?: number; + errors?: number; + }; + visible?: boolean; + profile?: { + avatar?: string; + title?: string; + bio?: string; + capabilities?: string[]; + }; }>; _reqId?: string; } @@ -672,6 +716,124 @@ export interface WSStreamListMessage { _reqId?: string; } +// --- MCP proxy messages --- + +/** Client → broker: register an MCP server with the mesh. */ +export interface WSMcpRegisterMessage { + type: "mcp_register"; + serverName: string; + description: string; + tools: Array<{ name: string; description: string; inputSchema: Record }>; + _reqId?: string; +} + +/** Client → broker: unregister an MCP server. */ +export interface WSMcpUnregisterMessage { + type: "mcp_unregister"; + serverName: string; + _reqId?: string; +} + +/** Client → broker: list all MCP servers in the mesh. */ +export interface WSMcpListMessage { + type: "mcp_list"; + _reqId?: string; +} + +/** Client → broker: call a tool on a mesh-registered MCP server. */ +export interface WSMcpCallMessage { + type: "mcp_call"; + serverName: string; + toolName: string; + args: Record; + _reqId?: string; +} + +/** Client → broker: response to a forwarded MCP call. */ +export interface WSMcpCallResponseMessage { + type: "mcp_call_response"; + callId: string; + result?: unknown; + error?: string; + _reqId?: string; +} + +/** Broker → client: acknowledgement for mcp_register. */ +export interface WSMcpRegisterAckMessage { + type: "mcp_register_ack"; + serverName: string; + toolCount: number; + _reqId?: string; +} + +/** Broker → client: list of MCP servers in the mesh. */ +export interface WSMcpListResultMessage { + type: "mcp_list_result"; + servers: Array<{ + name: string; + description: string; + hostedBy: string; + tools: Array<{ name: string; description: string }>; + }>; + _reqId?: string; +} + +/** Broker → client: result of an MCP tool call. */ +export interface WSMcpCallResultMessage { + type: "mcp_call_result"; + result?: unknown; + error?: string; + _reqId?: string; +} + +/** Broker → client: forwarded MCP tool call to execute locally. */ +export interface WSMcpCallForwardMessage { + type: "mcp_call_forward"; + callId: string; + serverName: string; + toolName: string; + args: Record; + callerName: string; +} + +// --- Webhook CRUD messages --- + +/** Client → broker: create an inbound webhook. */ +export interface WSCreateWebhookMessage { + type: "create_webhook"; + name: string; + _reqId?: string; +} + +/** Client → broker: list webhooks for the mesh. */ +export interface WSListWebhooksMessage { + type: "list_webhooks"; + _reqId?: string; +} + +/** Client → broker: deactivate a webhook. */ +export interface WSDeleteWebhookMessage { + type: "delete_webhook"; + name: string; + _reqId?: string; +} + +/** Broker → client: acknowledgement for create_webhook. */ +export interface WSWebhookAckMessage { + type: "webhook_ack"; + name: string; + url: string; + secret: string; + _reqId?: string; +} + +/** Broker → client: list of webhooks for the mesh. */ +export interface WSWebhookListMessage { + type: "webhook_list"; + webhooks: Array<{ name: string; url: string; active: boolean; createdAt: string }>; + _reqId?: string; +} + /** Broker → client: structured error. */ export interface WSErrorMessage { type: "error"; @@ -681,6 +843,44 @@ export interface WSErrorMessage { _reqId?: string; } +// --- Simulation clock messages --- + +/** Client → broker: set the simulation clock speed. */ +export interface WSSetClockMessage { + type: "set_clock"; + speed: number; // multiplier: 1, 2, 5, 10, 50, 100 + _reqId?: string; +} + +/** Client → broker: pause the simulation clock. */ +export interface WSPauseClockMessage { + type: "pause_clock"; + _reqId?: string; +} + +/** Client → broker: resume a paused simulation clock. */ +export interface WSResumeClockMessage { + type: "resume_clock"; + _reqId?: string; +} + +/** Client → broker: get current clock status. */ +export interface WSGetClockMessage { + type: "get_clock"; + _reqId?: string; +} + +/** Broker → client: current simulation clock status. */ +export interface WSClockStatusMessage { + type: "clock_status"; + speed: number; + paused: boolean; + tick: number; + simTime: string; // ISO timestamp + startedAt: string; + _reqId?: string; +} + // --- Scheduled messages --- /** Client → broker: schedule a message for future delivery. */ @@ -753,6 +953,8 @@ export type WSClientMessage = | WSSetStatusMessage | WSListPeersMessage | WSSetSummaryMessage + | WSSetVisibleMessage + | WSSetProfileMessage | WSJoinGroupMessage | WSLeaveGroupMessage | WSSetStateMessage @@ -789,9 +991,94 @@ export type WSClientMessage = | WSUnsubscribeMessage | WSListStreamsMessage | WSMeshInfoMessage + | WSSetClockMessage + | WSPauseClockMessage + | WSResumeClockMessage + | WSGetClockMessage | WSScheduleMessage | WSListScheduledMessage - | WSCancelScheduledMessage; + | WSCancelScheduledMessage + | WSMcpRegisterMessage + | WSMcpUnregisterMessage + | WSMcpListMessage + | WSMcpCallMessage + | WSMcpCallResponseMessage + | WSShareSkillMessage + | WSGetSkillMessage + | WSListSkillsMessage + | WSRemoveSkillMessage + | WSSetStatsMessage + | WSCreateWebhookMessage + | WSListWebhooksMessage + | WSDeleteWebhookMessage; + +// --- Skill messages --- + +/** Client → broker: publish or update a skill. */ +export interface WSShareSkillMessage { + type: "share_skill"; + name: string; + description: string; + instructions: string; + tags?: string[]; + _reqId?: string; +} + +/** Client → broker: load a skill by name. */ +export interface WSGetSkillMessage { + type: "get_skill"; + name: string; + _reqId?: string; +} + +/** Client → broker: list skills, optionally filtered by keyword. */ +export interface WSListSkillsMessage { + type: "list_skills"; + query?: string; + _reqId?: string; +} + +/** Client → broker: remove a skill by name. */ +export interface WSRemoveSkillMessage { + type: "remove_skill"; + name: string; + _reqId?: string; +} + +/** Broker → client: acknowledgement for share_skill or remove_skill. */ +export interface WSSkillAckMessage { + type: "skill_ack"; + name: string; + action: "shared" | "removed" | "not_found"; + _reqId?: string; +} + +/** Broker → client: response to get_skill with full skill data. */ +export interface WSSkillDataMessage { + type: "skill_data"; + skill: { + name: string; + description: string; + instructions: string; + tags: string[]; + author: string; + createdAt: string; + } | null; + _reqId?: string; +} + +/** Broker → client: response to list_skills. */ +export interface WSSkillListMessage { + type: "skill_list"; + skills: Array<{ + name: string; + description: string; + tags: string[]; + author: string; + createdAt: string; + }>; + _reqId?: string; +} export type WSServerMessage = | WSHelloAckMessage @@ -827,4 +1114,14 @@ export type WSServerMessage = | WSScheduledAckMessage | WSScheduledListMessage | WSCancelScheduledAckMessage + | WSMcpRegisterAckMessage + | WSMcpListResultMessage + | WSMcpCallResultMessage + | WSMcpCallForwardMessage + | WSClockStatusMessage + | WSSkillAckMessage + | WSSkillDataMessage + | WSSkillListMessage + | WSWebhookAckMessage + | WSWebhookListMessage | WSErrorMessage; diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index 78f5ef9..6ffc0c8 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -194,6 +194,10 @@ If the channel meta contains \`subtype: reminder\`, this is a scheduled reminder | schedule_reminder(message, in_seconds?, deliver_at?, to?) | Schedule a reminder to yourself (no \`to\`) or a delayed message to a peer/group. Delivered as a push with \`subtype: reminder\` in the channel meta. | | list_scheduled() | List pending scheduled reminders and messages. | | cancel_scheduled(id) | Cancel a pending scheduled item. | +| mesh_mcp_register(server_name, description, tools) | Register an MCP server with the mesh. Other peers can call its tools. | +| mesh_mcp_list() | List MCP servers available in the mesh with their tools. | +| mesh_tool_call(server_name, tool_name, args?) | Call a tool on a mesh-registered MCP server (30s timeout). | +| mesh_mcp_remove(server_name) | Unregister an MCP server you registered. | If multiple meshes are joined, prefix \`to\` with \`:\` to disambiguate (e.g. \`dev-team:Alice\`). @@ -959,6 +963,55 @@ Your message mode is "${messageMode}". return text(results.join("\n")); } + // --- MCP Proxy --- + case "mesh_mcp_register": { + const { server_name, description, tools: regTools } = (args ?? {}) as { + server_name?: string; + description?: string; + tools?: Array<{ name: string; description: string; inputSchema: Record }>; + }; + if (!server_name || !description || !regTools?.length) + return text("mesh_mcp_register: `server_name`, `description`, and `tools` required", true); + const client = allClients()[0]; + if (!client) return text("mesh_mcp_register: not connected", true); + const result = await client.mcpRegister(server_name, description, regTools); + if (!result) return text("mesh_mcp_register: broker did not acknowledge", true); + return text(`Registered MCP server "${result.serverName}" with ${result.toolCount} tool(s). Other peers can now call its tools via mesh_tool_call.`); + } + case "mesh_mcp_list": { + const client = allClients()[0]; + if (!client) return text("mesh_mcp_list: not connected", true); + const servers = await client.mcpList(); + if (servers.length === 0) return text("No MCP servers registered in the mesh."); + const lines = servers.map((s) => { + const toolList = s.tools.map((t) => ` - **${t.name}**: ${t.description}`).join("\n"); + return `- **${s.name}** (hosted by ${s.hostedBy}): ${s.description}\n${toolList}`; + }); + return text(`${servers.length} MCP server(s) in mesh:\n${lines.join("\n")}`); + } + case "mesh_tool_call": { + const { server_name: callServer, tool_name: callTool, args: callArgs } = (args ?? {}) as { + server_name?: string; + tool_name?: string; + args?: Record; + }; + if (!callServer || !callTool) + return text("mesh_tool_call: `server_name` and `tool_name` required", true); + const client = allClients()[0]; + if (!client) return text("mesh_tool_call: not connected", true); + const callResult = await client.mcpCall(callServer, callTool, callArgs ?? {}); + if (callResult.error) return text(`mesh_tool_call error: ${callResult.error}`, true); + return text(typeof callResult.result === "string" ? callResult.result : JSON.stringify(callResult.result, null, 2)); + } + case "mesh_mcp_remove": { + const { server_name: rmServer } = (args ?? {}) as { server_name?: string }; + if (!rmServer) return text("mesh_mcp_remove: `server_name` required", true); + const client = allClients()[0]; + if (!client) return text("mesh_mcp_remove: not connected", true); + await client.mcpUnregister(rmServer); + return text(`Unregistered MCP server "${rmServer}" from the mesh.`); + } + case "grant_file_access": { const { fileId, to: grantTo } = (args ?? {}) as { fileId?: string; to?: string }; if (!fileId || !grantTo) return text("grant_file_access: `fileId` and `to` required", true); diff --git a/apps/cli/src/mcp/tools.ts b/apps/cli/src/mcp/tools.ts index 8370eef..cffb039 100644 --- a/apps/cli/src/mcp/tools.ts +++ b/apps/cli/src/mcp/tools.ts @@ -609,6 +609,66 @@ export const TOOLS: Tool[] = [ inputSchema: { type: "object", properties: {} }, }, + // --- MCP Proxy --- + { + name: "mesh_mcp_register", + description: + "Register an MCP server with the mesh. Other peers can invoke its tools through the mesh without restarting their sessions. Provide the server name, description, and full tool definitions.", + inputSchema: { + type: "object", + properties: { + server_name: { type: "string", description: "Unique name for the MCP server (e.g. 'github', 'jira')" }, + description: { type: "string", description: "What this MCP server does" }, + tools: { + type: "array", + items: { + type: "object", + properties: { + name: { type: "string" }, + description: { type: "string" }, + inputSchema: { type: "object", description: "JSON Schema for tool arguments" }, + }, + required: ["name", "description", "inputSchema"], + }, + description: "Tool definitions to expose", + }, + }, + required: ["server_name", "description", "tools"], + }, + }, + { + name: "mesh_mcp_list", + description: + "List MCP servers available in the mesh with their tools. Shows which peer hosts each server.", + inputSchema: { type: "object", properties: {} }, + }, + { + name: "mesh_tool_call", + description: + "Call a tool on a mesh-registered MCP server. Route: you -> broker -> hosting peer -> execute -> result back. Timeout: 30s.", + inputSchema: { + type: "object", + properties: { + server_name: { type: "string", description: "Name of the MCP server" }, + tool_name: { type: "string", description: "Name of the tool to call" }, + args: { type: "object", description: "Tool arguments (JSON object)" }, + }, + required: ["server_name", "tool_name"], + }, + }, + { + name: "mesh_mcp_remove", + description: + "Unregister an MCP server you previously registered with the mesh.", + inputSchema: { + type: "object", + properties: { + server_name: { type: "string", description: "Name of the MCP server to remove" }, + }, + required: ["server_name"], + }, + }, + // --- Diagnostics --- { name: "ping_mesh", diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 699fca2..0b8d594 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -38,6 +38,13 @@ export interface PeerInfo { peerType?: "ai" | "human" | "connector"; channel?: string; model?: string; + stats?: { + messagesIn?: number; + messagesOut?: number; + toolCalls?: number; + uptime?: number; + errors?: number; + }; } export interface InboundPush { @@ -100,6 +107,16 @@ export class BrokerClient { private helloTimer: NodeJS.Timeout | null = null; private reconnectTimer: NodeJS.Timeout | null = null; + // --- Stats counters --- + private _statsCounters = { + messagesIn: 0, + messagesOut: 0, + toolCalls: 0, + errors: 0, + }; + private _sessionStartedAt = Date.now(); + private _statsReportTimer: NodeJS.Timeout | null = null; + constructor( private mesh: JoinedMesh, private opts: { @@ -337,6 +354,42 @@ export class BrokerClient { this.ws.send(JSON.stringify({ type: "set_summary", summary })); } + /** Report resource usage stats to the broker. */ + setStats(stats?: Record): void { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + const payload = stats ?? { + ...this._statsCounters, + uptime: Math.round((Date.now() - this._sessionStartedAt) / 1000), + }; + this.ws.send(JSON.stringify({ type: "set_stats", stats: payload })); + } + + /** Increment the tool call counter. */ + incrementToolCalls(): void { + this._statsCounters.toolCalls++; + } + + /** Increment the error counter. */ + incrementErrors(): void { + this._statsCounters.errors++; + } + + /** Start auto-reporting stats every 60 seconds. */ + startStatsReporting(): void { + if (this._statsReportTimer) return; + this._statsReportTimer = setInterval(() => { + this.setStats(); + }, 60_000); + } + + /** Stop auto-reporting stats. */ + stopStatsReporting(): void { + if (this._statsReportTimer) { + clearInterval(this._statsReportTimer); + this._statsReportTimer = null; + } + } + /** Join a group with an optional role. */ async joinGroup(name: string, role?: string): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; @@ -486,6 +539,11 @@ export class BrokerClient { private scheduledAckResolvers = new Map void; timer: NodeJS.Timeout }>(); private scheduledListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); private cancelScheduledResolvers = new Map void; timer: NodeJS.Timeout }>(); + private mcpRegisterResolvers = new Map void; timer: NodeJS.Timeout }>(); + private mcpListResolvers = new Map }>) => void; timer: NodeJS.Timeout }>(); + private mcpCallResolvers = new Map void; timer: NodeJS.Timeout }>(); + /** Handler for inbound mcp_call_forward messages. Set by the MCP server. */ + private mcpCallForwardHandler: ((forward: { callId: string; serverName: string; toolName: string; args: Record; callerName: string }) => Promise<{ result?: unknown; error?: string }>) | null = null; async messageStatus(messageId: string): Promise<{ messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; @@ -822,6 +880,65 @@ export class BrokerClient { return () => this.stateChangeHandlers.delete(handler); } + // --- MCP proxy --- + + /** Register an MCP server with the mesh. */ + async mcpRegister( + serverName: string, + description: string, + tools: Array<{ name: string; description: string; inputSchema: Record }>, + ): Promise<{ serverName: string; toolCount: number } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.mcpRegisterResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.mcpRegisterResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "mcp_register", serverName, description, tools, _reqId: reqId })); + }); + } + + /** Unregister an MCP server from the mesh. */ + async mcpUnregister(serverName: string): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ type: "mcp_unregister", serverName })); + } + + /** List MCP servers available in the mesh. */ + async mcpList(): Promise }>> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.mcpListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.mcpListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "mcp_list", _reqId: reqId })); + }); + } + + /** Call a tool on a mesh-registered MCP server. 30s timeout. */ + async mcpCall(serverName: string, toolName: string, args: Record): Promise<{ result?: unknown; error?: string }> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return { error: "not connected" }; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.mcpCallResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.mcpCallResolvers.delete(reqId)) resolve({ error: "MCP call timed out (30s)" }); + }, 30_000) }); + this.ws!.send(JSON.stringify({ type: "mcp_call", serverName, toolName, args, _reqId: reqId })); + }); + } + + /** Set the handler for inbound forwarded MCP calls. */ + onMcpCallForward(handler: (forward: { callId: string; serverName: string; toolName: string; args: Record; callerName: string }) => Promise<{ result?: unknown; error?: string }>): void { + this.mcpCallForwardHandler = handler; + } + + /** Send a response to a forwarded MCP call back to the broker. */ + private sendMcpCallResponse(callId: string, result?: unknown, error?: string): void { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ type: "mcp_call_response", callId, result, error })); + } + // --- Mesh info --- private meshInfoResolvers = new Map | null) => void; timer: NodeJS.Timeout }>(); @@ -1138,6 +1255,42 @@ export class BrokerClient { this.resolveFromMap(this.cancelScheduledResolvers, msgReqId, Boolean(msg.ok)); return; } + if (msg.type === "mcp_register_ack") { + this.resolveFromMap(this.mcpRegisterResolvers, msgReqId, { + serverName: String(msg.serverName ?? ""), + toolCount: Number(msg.toolCount ?? 0), + }); + return; + } + if (msg.type === "mcp_list_result") { + const servers = (msg.servers as Array<{ name: string; description: string; hostedBy: string; tools: Array<{ name: string; description: string }> }>) ?? []; + this.resolveFromMap(this.mcpListResolvers, msgReqId, servers); + return; + } + if (msg.type === "mcp_call_result") { + this.resolveFromMap(this.mcpCallResolvers, msgReqId, { + ...(msg.result !== undefined ? { result: msg.result } : {}), + ...(msg.error ? { error: String(msg.error) } : {}), + }); + return; + } + if (msg.type === "mcp_call_forward") { + const forward = { + callId: String(msg.callId ?? ""), + serverName: String(msg.serverName ?? ""), + toolName: String(msg.toolName ?? ""), + args: (msg.args as Record) ?? {}, + callerName: String(msg.callerName ?? ""), + }; + if (this.mcpCallForwardHandler) { + this.mcpCallForwardHandler(forward) + .then((res) => this.sendMcpCallResponse(forward.callId, res.result, res.error)) + .catch((e) => this.sendMcpCallResponse(forward.callId, undefined, e instanceof Error ? e.message : String(e))); + } else { + this.sendMcpCallResponse(forward.callId, undefined, "No MCP call handler registered on this peer"); + } + return; + } if (msg.type === "error") { this.debug(`broker error: ${msg.code} ${msg.message}`); const id = msg.id ? String(msg.id) : null; @@ -1184,6 +1337,9 @@ export class BrokerClient { [this.streamCreatedResolvers, null], [this.listPeersResolvers, []], [this.meshInfoResolvers, null], + [this.mcpRegisterResolvers, null], + [this.mcpListResolvers, []], + [this.mcpCallResolvers, { error: "broker error" }], ]; for (const [map, defaultVal] of allMaps) { const first = (map as Map).entries().next().value as [string, { resolve: (v: unknown) => void; timer: NodeJS.Timeout }] | undefined;