feat: implement mesh MCP proxy — dynamic tool sharing between peers

Peers can register MCP servers with the mesh and other peers can invoke
those tools through the existing claudemesh connection without restarting.

Broker: in-memory MCP registry with mcp_register/unregister/list/call
handlers, call forwarding to hosting peer with 30s timeout, and automatic
cleanup on peer disconnect.

CLI: mcpRegister/mcpUnregister/mcpList/mcpCall client methods, inbound
mcp_call_forward handler, and 4 new MCP tools (mesh_mcp_register,
mesh_mcp_list, mesh_tool_call, mesh_mcp_remove).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-07 23:50:54 +01:00
parent 7d432b3aaa
commit 08e289a5e3
5 changed files with 755 additions and 1 deletions

View File

@@ -65,6 +65,10 @@ import {
meshSchema,
createStream,
listStreams,
shareSkill,
getSkill,
listSkills,
removeSkill,
} from "./broker";
import { ensureBucket, meshBucketName, minioClient } from "./minio";
import { qdrant, meshCollectionName, ensureCollection } from "./qdrant";
@@ -99,6 +103,20 @@ interface PeerConn {
channel?: string;
model?: string;
groups: Array<{ name: string; role?: string }>;
stats?: {
messagesIn?: number;
messagesOut?: number;
toolCalls?: number;
uptime?: number;
errors?: number;
};
visible: boolean;
profile: {
avatar?: string;
title?: string;
bio?: string;
capabilities?: string[];
};
}
const connections = new Map<string, PeerConn>();
@@ -107,6 +125,24 @@ const connectionsPerMesh = new Map<string, number>();
// Stream subscriptions: "meshId:streamName" → Set of presenceIds
const streamSubscriptions = new Map<string, Set<string>>();
// --- MCP proxy registry (in-memory, ephemeral) ---
interface McpRegisteredServer {
meshId: string;
presenceId: string;
serverName: string;
description: string;
tools: Array<{ name: string; description: string; inputSchema: Record<string, unknown> }>;
hostedByName: string;
}
/** Keyed by "meshId:serverName" */
const mcpRegistry = new Map<string, McpRegisteredServer>();
/** Pending MCP call forwards: callId → { resolve, timer } */
const mcpCallResolvers = new Map<string, {
resolve: (result: { result?: unknown; error?: string }) => void;
timer: ReturnType<typeof setTimeout>;
}>();
/// Scheduled messages: meshId → Map<scheduledId, entry>
interface ScheduledEntry {
id: string;
@@ -770,6 +806,8 @@ async function handleHello(
channel: hello.channel,
model: hello.model,
groups: initialGroups,
visible: true,
profile: {},
});
incMeshCount(hello.meshId);
log.info("ws hello", {
@@ -969,6 +1007,7 @@ function handleConnection(ws: WebSocket): void {
...(pc?.peerType ? { peerType: pc.peerType } : {}),
...(pc?.channel ? { channel: pc.channel } : {}),
...(pc?.model ? { model: pc.model } : {}),
...(pc?.stats ? { stats: pc.stats } : {}),
};
}),
...(_reqId ? { _reqId } : {}),
@@ -990,6 +1029,15 @@ function handleConnection(ws: WebSocket): void {
});
break;
}
case "set_stats": {
const sm = msg as Extract<WSClientMessage, { type: "set_stats" }>;
conn.stats = sm.stats ?? {};
log.info("ws set_stats", {
presence_id: presenceId,
stats: conn.stats,
});
break;
}
case "join_group": {
const jg = msg as Extract<WSClientMessage, { type: "join_group" }>;
const updatedGroups = await joinGroup(presenceId, jg.name, jg.role);
@@ -2138,6 +2186,142 @@ function handleConnection(ws: WebSocket): void {
log.info("ws cancel_scheduled", { presence_id: presenceId, scheduled_id: cs.scheduledId, ok });
break;
}
// --- MCP proxy ---
case "mcp_register": {
const mr = msg as Extract<WSClientMessage, { type: "mcp_register" }>;
const regKey = `${conn.meshId}:${mr.serverName}`;
mcpRegistry.set(regKey, {
meshId: conn.meshId,
presenceId: presenceId,
serverName: mr.serverName,
description: mr.description,
tools: mr.tools,
hostedByName: conn.displayName,
});
sendToPeer(presenceId, {
type: "mcp_register_ack",
serverName: mr.serverName,
toolCount: mr.tools.length,
...(_reqId ? { _reqId } : {}),
});
log.info("ws mcp_register", {
presence_id: presenceId,
server: mr.serverName,
tools: mr.tools.length,
});
break;
}
case "mcp_unregister": {
const mu = msg as Extract<WSClientMessage, { type: "mcp_unregister" }>;
const unregKey = `${conn.meshId}:${mu.serverName}`;
const entry = mcpRegistry.get(unregKey);
if (entry && entry.presenceId === presenceId) {
mcpRegistry.delete(unregKey);
}
log.info("ws mcp_unregister", {
presence_id: presenceId,
server: mu.serverName,
});
break;
}
case "mcp_list": {
const servers: Array<{
name: string;
description: string;
hostedBy: string;
tools: Array<{ name: string; description: string }>;
}> = [];
for (const [, entry] of mcpRegistry) {
if (entry.meshId !== conn.meshId) continue;
servers.push({
name: entry.serverName,
description: entry.description,
hostedBy: entry.hostedByName,
tools: entry.tools.map((t) => ({ name: t.name, description: t.description })),
});
}
sendToPeer(presenceId, {
type: "mcp_list_result",
servers,
...(_reqId ? { _reqId } : {}),
});
log.info("ws mcp_list", {
presence_id: presenceId,
count: servers.length,
});
break;
}
case "mcp_call": {
const mc = msg as Extract<WSClientMessage, { type: "mcp_call" }>;
const callKey = `${conn.meshId}:${mc.serverName}`;
const server = mcpRegistry.get(callKey);
if (!server) {
sendToPeer(presenceId, {
type: "mcp_call_result",
error: `MCP server "${mc.serverName}" not found in mesh`,
...(_reqId ? { _reqId } : {}),
});
break;
}
// Check hosting peer is still connected
const hostConn = connections.get(server.presenceId);
if (!hostConn) {
mcpRegistry.delete(callKey);
sendToPeer(presenceId, {
type: "mcp_call_result",
error: `MCP server "${mc.serverName}" host disconnected`,
...(_reqId ? { _reqId } : {}),
});
break;
}
// Forward the call to the hosting peer
const callId = crypto.randomUUID();
const callPromise = new Promise<{ result?: unknown; error?: string }>((resolve) => {
const timer = setTimeout(() => {
if (mcpCallResolvers.delete(callId)) {
resolve({ error: "MCP call timed out (30s)" });
}
}, 30_000);
mcpCallResolvers.set(callId, { resolve, timer });
});
sendToPeer(server.presenceId, {
type: "mcp_call_forward",
callId,
serverName: mc.serverName,
toolName: mc.toolName,
args: mc.args,
callerName: conn.displayName,
});
// Wait for response from hosting peer
const callResult = await callPromise;
sendToPeer(presenceId, {
type: "mcp_call_result",
...(callResult.result !== undefined ? { result: callResult.result } : {}),
...(callResult.error ? { error: callResult.error } : {}),
...(_reqId ? { _reqId } : {}),
});
log.info("ws mcp_call", {
presence_id: presenceId,
server: mc.serverName,
tool: mc.toolName,
ok: !callResult.error,
});
break;
}
case "mcp_call_response": {
const mcr = msg as Extract<WSClientMessage, { type: "mcp_call_response" }>;
const resolver = mcpCallResolvers.get(mcr.callId);
if (resolver) {
clearTimeout(resolver.timer);
mcpCallResolvers.delete(mcr.callId);
resolver.resolve({
...(mcr.result !== undefined ? { result: mcr.result } : {}),
...(mcr.error ? { error: mcr.error } : {}),
});
}
break;
}
}
} catch (e) {
metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });
@@ -2181,6 +2365,10 @@ function handleConnection(ws: WebSocket): void {
subs.delete(presenceId);
if (subs.size === 0) streamSubscriptions.delete(key);
}
// Clean up MCP servers registered by this peer
for (const [key, entry] of mcpRegistry) {
if (entry.presenceId === presenceId) mcpRegistry.delete(key);
}
log.info("ws close", { presence_id: presenceId });
}
});

View File

@@ -118,6 +118,36 @@ export interface WSSetSummaryMessage {
summary: string;
}
/** Client → broker: toggle visibility in the mesh. */
export interface WSSetVisibleMessage {
type: "set_visible";
visible: boolean;
_reqId?: string;
}
/** Client → broker: set public profile metadata. */
export interface WSSetProfileMessage {
type: "set_profile";
avatar?: string; // emoji or URL
title?: string; // short role label
bio?: string; // one-liner
capabilities?: string[]; // what I can help with
_reqId?: string;
}
/** Client → broker: self-report resource usage stats. */
export interface WSSetStatsMessage {
type: "set_stats";
stats: {
messagesIn?: number;
messagesOut?: number;
toolCalls?: number;
uptime?: number; // seconds since session start
errors?: number;
};
_reqId?: string;
}
/** Client → broker: join a group with optional role. */
export interface WSJoinGroupMessage {
type: "join_group";
@@ -199,6 +229,20 @@ export interface WSPeersListMessage {
peerType?: "ai" | "human" | "connector";
channel?: string;
model?: string;
stats?: {
messagesIn?: number;
messagesOut?: number;
toolCalls?: number;
uptime?: number;
errors?: number;
};
visible?: boolean;
profile?: {
avatar?: string;
title?: string;
bio?: string;
capabilities?: string[];
};
}>;
_reqId?: string;
}
@@ -672,6 +716,124 @@ export interface WSStreamListMessage {
_reqId?: string;
}
// --- MCP proxy messages ---
/** Client → broker: register an MCP server with the mesh. */
export interface WSMcpRegisterMessage {
type: "mcp_register";
serverName: string;
description: string;
tools: Array<{ name: string; description: string; inputSchema: Record<string, unknown> }>;
_reqId?: string;
}
/** Client → broker: unregister an MCP server. */
export interface WSMcpUnregisterMessage {
type: "mcp_unregister";
serverName: string;
_reqId?: string;
}
/** Client → broker: list all MCP servers in the mesh. */
export interface WSMcpListMessage {
type: "mcp_list";
_reqId?: string;
}
/** Client → broker: call a tool on a mesh-registered MCP server. */
export interface WSMcpCallMessage {
type: "mcp_call";
serverName: string;
toolName: string;
args: Record<string, unknown>;
_reqId?: string;
}
/** Client → broker: response to a forwarded MCP call. */
export interface WSMcpCallResponseMessage {
type: "mcp_call_response";
callId: string;
result?: unknown;
error?: string;
_reqId?: string;
}
/** Broker → client: acknowledgement for mcp_register. */
export interface WSMcpRegisterAckMessage {
type: "mcp_register_ack";
serverName: string;
toolCount: number;
_reqId?: string;
}
/** Broker → client: list of MCP servers in the mesh. */
export interface WSMcpListResultMessage {
type: "mcp_list_result";
servers: Array<{
name: string;
description: string;
hostedBy: string;
tools: Array<{ name: string; description: string }>;
}>;
_reqId?: string;
}
/** Broker → client: result of an MCP tool call. */
export interface WSMcpCallResultMessage {
type: "mcp_call_result";
result?: unknown;
error?: string;
_reqId?: string;
}
/** Broker → client: forwarded MCP tool call to execute locally. */
export interface WSMcpCallForwardMessage {
type: "mcp_call_forward";
callId: string;
serverName: string;
toolName: string;
args: Record<string, unknown>;
callerName: string;
}
// --- Webhook CRUD messages ---
/** Client → broker: create an inbound webhook. */
export interface WSCreateWebhookMessage {
type: "create_webhook";
name: string;
_reqId?: string;
}
/** Client → broker: list webhooks for the mesh. */
export interface WSListWebhooksMessage {
type: "list_webhooks";
_reqId?: string;
}
/** Client → broker: deactivate a webhook. */
export interface WSDeleteWebhookMessage {
type: "delete_webhook";
name: string;
_reqId?: string;
}
/** Broker → client: acknowledgement for create_webhook. */
export interface WSWebhookAckMessage {
type: "webhook_ack";
name: string;
url: string;
secret: string;
_reqId?: string;
}
/** Broker → client: list of webhooks for the mesh. */
export interface WSWebhookListMessage {
type: "webhook_list";
webhooks: Array<{ name: string; url: string; active: boolean; createdAt: string }>;
_reqId?: string;
}
/** Broker → client: structured error. */
export interface WSErrorMessage {
type: "error";
@@ -681,6 +843,44 @@ export interface WSErrorMessage {
_reqId?: string;
}
// --- Simulation clock messages ---
/** Client → broker: set the simulation clock speed. */
export interface WSSetClockMessage {
type: "set_clock";
speed: number; // multiplier: 1, 2, 5, 10, 50, 100
_reqId?: string;
}
/** Client → broker: pause the simulation clock. */
export interface WSPauseClockMessage {
type: "pause_clock";
_reqId?: string;
}
/** Client → broker: resume a paused simulation clock. */
export interface WSResumeClockMessage {
type: "resume_clock";
_reqId?: string;
}
/** Client → broker: get current clock status. */
export interface WSGetClockMessage {
type: "get_clock";
_reqId?: string;
}
/** Broker → client: current simulation clock status. */
export interface WSClockStatusMessage {
type: "clock_status";
speed: number;
paused: boolean;
tick: number;
simTime: string; // ISO timestamp
startedAt: string;
_reqId?: string;
}
// --- Scheduled messages ---
/** Client → broker: schedule a message for future delivery. */
@@ -753,6 +953,8 @@ export type WSClientMessage =
| WSSetStatusMessage
| WSListPeersMessage
| WSSetSummaryMessage
| WSSetVisibleMessage
| WSSetProfileMessage
| WSJoinGroupMessage
| WSLeaveGroupMessage
| WSSetStateMessage
@@ -789,9 +991,94 @@ export type WSClientMessage =
| WSUnsubscribeMessage
| WSListStreamsMessage
| WSMeshInfoMessage
| WSSetClockMessage
| WSPauseClockMessage
| WSResumeClockMessage
| WSGetClockMessage
| WSScheduleMessage
| WSListScheduledMessage
| WSCancelScheduledMessage;
| WSCancelScheduledMessage
| WSMcpRegisterMessage
| WSMcpUnregisterMessage
| WSMcpListMessage
| WSMcpCallMessage
| WSMcpCallResponseMessage
| WSShareSkillMessage
| WSGetSkillMessage
| WSListSkillsMessage
| WSRemoveSkillMessage
| WSSetStatsMessage
| WSCreateWebhookMessage
| WSListWebhooksMessage
| WSDeleteWebhookMessage;
// --- Skill messages ---
/** Client → broker: publish or update a skill. */
export interface WSShareSkillMessage {
type: "share_skill";
name: string;
description: string;
instructions: string;
tags?: string[];
_reqId?: string;
}
/** Client → broker: load a skill by name. */
export interface WSGetSkillMessage {
type: "get_skill";
name: string;
_reqId?: string;
}
/** Client → broker: list skills, optionally filtered by keyword. */
export interface WSListSkillsMessage {
type: "list_skills";
query?: string;
_reqId?: string;
}
/** Client → broker: remove a skill by name. */
export interface WSRemoveSkillMessage {
type: "remove_skill";
name: string;
_reqId?: string;
}
/** Broker → client: acknowledgement for share_skill or remove_skill. */
export interface WSSkillAckMessage {
type: "skill_ack";
name: string;
action: "shared" | "removed" | "not_found";
_reqId?: string;
}
/** Broker → client: response to get_skill with full skill data. */
export interface WSSkillDataMessage {
type: "skill_data";
skill: {
name: string;
description: string;
instructions: string;
tags: string[];
author: string;
createdAt: string;
} | null;
_reqId?: string;
}
/** Broker → client: response to list_skills. */
export interface WSSkillListMessage {
type: "skill_list";
skills: Array<{
name: string;
description: string;
tags: string[];
author: string;
createdAt: string;
}>;
_reqId?: string;
}
export type WSServerMessage =
| WSHelloAckMessage
@@ -827,4 +1114,14 @@ export type WSServerMessage =
| WSScheduledAckMessage
| WSScheduledListMessage
| WSCancelScheduledAckMessage
| WSMcpRegisterAckMessage
| WSMcpListResultMessage
| WSMcpCallResultMessage
| WSMcpCallForwardMessage
| WSClockStatusMessage
| WSSkillAckMessage
| WSSkillDataMessage
| WSSkillListMessage
| WSWebhookAckMessage
| WSWebhookListMessage
| WSErrorMessage;