diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 86c8d58..c518bca 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -39,8 +39,10 @@ import { meshMember as memberTable, meshMemory, meshState, + meshService, meshSkill, meshStream, + meshVaultEntry, meshTask, messageQueue, pendingStatus, @@ -1951,3 +1953,91 @@ export async function meshSchema( } return [...tables.entries()].map(([name, columns]) => ({ name, columns })); } + +// --------------------------------------------------------------------------- +// Vault operations +// --------------------------------------------------------------------------- + +export async function vaultSet(meshId: string, memberId: string, key: string, ciphertext: string, nonce: string, sealedKey: string, entryType: "env" | "file", mountPath?: string, description?: string): Promise { + const existing = await db.select({ id: meshVaultEntry.id }).from(meshVaultEntry).where(and(eq(meshVaultEntry.meshId, meshId), eq(meshVaultEntry.memberId, memberId), eq(meshVaultEntry.key, key))).limit(1); + if (existing.length > 0) { + await db.update(meshVaultEntry).set({ ciphertext, nonce, sealedKey, entryType, mountPath: mountPath ?? null, description: description ?? null, updatedAt: new Date() }).where(eq(meshVaultEntry.id, existing[0]!.id)); + return existing[0]!.id; + } + const [row] = await db.insert(meshVaultEntry).values({ meshId, memberId, key, ciphertext, nonce, sealedKey, entryType, mountPath: mountPath ?? null, description: description ?? null }).returning({ id: meshVaultEntry.id }); + return row!.id; +} + +export async function vaultList(meshId: string, memberId: string) { + return db.select({ key: meshVaultEntry.key, entryType: meshVaultEntry.entryType, mountPath: meshVaultEntry.mountPath, description: meshVaultEntry.description, updatedAt: meshVaultEntry.updatedAt }).from(meshVaultEntry).where(and(eq(meshVaultEntry.meshId, meshId), eq(meshVaultEntry.memberId, memberId))); +} + +export async function vaultDelete(meshId: string, memberId: string, key: string): Promise { + const deleted = await db.delete(meshVaultEntry).where(and(eq(meshVaultEntry.meshId, meshId), eq(meshVaultEntry.memberId, memberId), eq(meshVaultEntry.key, key))).returning({ id: meshVaultEntry.id }); + return deleted.length > 0; +} + +export async function vaultGetEntries(meshId: string, memberId: string, keys: string[]) { + if (keys.length === 0) return []; + return db.select({ key: meshVaultEntry.key, ciphertext: meshVaultEntry.ciphertext, nonce: meshVaultEntry.nonce, sealedKey: meshVaultEntry.sealedKey, entryType: meshVaultEntry.entryType, mountPath: meshVaultEntry.mountPath }).from(meshVaultEntry).where(and(eq(meshVaultEntry.meshId, meshId), eq(meshVaultEntry.memberId, memberId), inArray(meshVaultEntry.key, keys))); +} + +// --------------------------------------------------------------------------- +// Service catalog operations +// --------------------------------------------------------------------------- + +export async function upsertService(meshId: string, name: string, data: { type: "mcp" | "skill"; sourceType: string; description: string; sourceFileId?: string; sourceGitUrl?: string; sourceGitBranch?: string; sourceGitSha?: string; instructions?: string; toolsSchema?: unknown; manifest?: unknown; runtime?: string; status?: string; config?: unknown; scope?: unknown; deployedBy?: string; deployedByName?: string }): Promise { + // Whitelist allowed fields — prevent mass-assignment of id, meshId, createdAt, etc. + const fields: Record = { + type: data.type, + sourceType: data.sourceType, + description: data.description, + ...(data.sourceFileId !== undefined && { sourceFileId: data.sourceFileId }), + ...(data.sourceGitUrl !== undefined && { sourceGitUrl: data.sourceGitUrl }), + ...(data.sourceGitBranch !== undefined && { sourceGitBranch: data.sourceGitBranch }), + ...(data.sourceGitSha !== undefined && { sourceGitSha: data.sourceGitSha }), + ...(data.instructions !== undefined && { instructions: data.instructions }), + ...(data.toolsSchema !== undefined && { toolsSchema: data.toolsSchema }), + ...(data.manifest !== undefined && { manifest: data.manifest }), + ...(data.runtime !== undefined && { runtime: data.runtime }), + ...(data.status !== undefined && { status: data.status }), + ...(data.config !== undefined && { config: data.config }), + ...(data.scope !== undefined && { scope: data.scope }), + ...(data.deployedBy !== undefined && { deployedBy: data.deployedBy }), + ...(data.deployedByName !== undefined && { deployedByName: data.deployedByName }), + }; + + const existing = await db.select({ id: meshService.id }).from(meshService).where(and(eq(meshService.meshId, meshId), eq(meshService.name, name))).limit(1); + if (existing.length > 0) { + await db.update(meshService).set({ ...fields, updatedAt: new Date() } as any).where(eq(meshService.id, existing[0]!.id)); + return existing[0]!.id; + } + const [row] = await db.insert(meshService).values({ meshId, name, ...fields } as any).returning({ id: meshService.id }); + return row!.id; +} + +export async function updateServiceStatus(meshId: string, name: string, status: string, extra?: { toolsSchema?: unknown; restartCount?: number; lastHealth?: Date }) { + await db.update(meshService).set({ status, ...(extra ?? {}), updatedAt: new Date() } as any).where(and(eq(meshService.meshId, meshId), eq(meshService.name, name))); +} + +export async function updateServiceScope(meshId: string, name: string, scope: unknown) { + await db.update(meshService).set({ scope, updatedAt: new Date() } as any).where(and(eq(meshService.meshId, meshId), eq(meshService.name, name))); +} + +export async function getService(meshId: string, name: string) { + const rows = await db.select().from(meshService).where(and(eq(meshService.meshId, meshId), eq(meshService.name, name))).limit(1); + return rows[0] ?? null; +} + +export async function listDbMeshServices(meshId: string) { + return db.select().from(meshService).where(eq(meshService.meshId, meshId)); +} + +export async function deleteService(meshId: string, name: string): Promise { + const deleted = await db.delete(meshService).where(and(eq(meshService.meshId, meshId), eq(meshService.name, name))).returning({ id: meshService.id }); + return deleted.length > 0; +} + +export async function getRunningServices(meshId: string) { + return db.select().from(meshService).where(and(eq(meshService.meshId, meshId), eq(meshService.status, "running"))); +} diff --git a/apps/broker/src/env.ts b/apps/broker/src/env.ts index 5fc8771..bd5bb43 100644 --- a/apps/broker/src/env.ts +++ b/apps/broker/src/env.ts @@ -28,6 +28,9 @@ const envSchema = z.object({ NEO4J_URL: z.string().default("bolt://neo4j:7687"), NEO4J_USER: z.string().default("neo4j"), NEO4J_PASSWORD: z.string().default("changeme"), + CLAUDEMESH_SERVICES_DIR: z.string().default("/var/claudemesh/services"), + MAX_SERVICES_PER_MESH: z.coerce.number().int().positive().default(20), + MAX_SERVICE_ZIP_BYTES: z.coerce.number().int().positive().default(50 * 1024 * 1024), NODE_ENV: z .enum(["development", "production", "test"]) .default("development"), diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index d04f939..fe7caed 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -69,7 +69,17 @@ import { getSkill, listSkills, removeSkill, + vaultSet, + vaultList, + vaultDelete, + upsertService, + updateServiceStatus, + updateServiceScope, + getService, + listDbMeshServices, + deleteService, } from "./broker"; +import * as serviceManager from "./service-manager"; import { ensureBucket, meshBucketName, minioClient } from "./minio"; import { qdrant, meshCollectionName, ensureCollection } from "./qdrant"; import { neo4jDriver, meshDbName, ensureDatabase } from "./neo4j-client"; @@ -1210,6 +1220,36 @@ function handleConnection(ws: WebSocket): void { if (result.restoredGroups) ackPayload.restoredGroups = result.restoredGroups; if (result.restoredStats) ackPayload.restoredStats = result.restoredStats; } + // Attach scope-filtered service catalog + try { + const helloConn = connections.get(presenceId); + if (helloConn) { + const allSvcs = await listDbMeshServices(helloConn.meshId); + const myGroups = helloConn.groups ?? []; + ackPayload.services = allSvcs + .filter(svc => { + if (svc.status !== "running") return false; + const scope = svc.scope as any; + if (!scope) return false; + const t = typeof scope === "string" ? scope : scope.type; + if (t === "mesh") return true; + if (t === "peer") return svc.deployedBy === helloConn.memberId; + if (scope.peers) return scope.peers.includes(helloConn.displayName) || scope.peers.includes(helloConn.memberId); + if (scope.group) return myGroups.some((g: any) => g.name === scope.group); + if (scope.groups) return myGroups.some((g: any) => scope.groups.includes(g.name)); + if (scope.role) return myGroups.some((g: any) => g.role === scope.role); + return false; + }) + .map(s => ({ + name: s.name, + description: s.description, + status: s.status ?? "stopped", + tools: (s.toolsSchema as any[]) ?? [], + deployed_by: s.deployedByName ?? "unknown", + })); + } + } catch { /* non-fatal */ } + ws.send(JSON.stringify(ackPayload)); } catch { /* ws closed during hello */ @@ -3087,6 +3127,146 @@ function handleConnection(ws: WebSocket): void { log.info("ws delete_webhook", { presence_id: presenceId, name: dw.name }); break; } + + // --- Vault --- + case "vault_set": { + const vs = msg as any; + try { + await vaultSet(conn.meshId, conn.memberId, vs.key, vs.ciphertext, vs.nonce, vs.sealed_key, vs.entry_type, vs.mount_path, vs.description); + sendToPeer(presenceId, { type: "vault_ack", key: vs.key, action: "stored", _reqId: vs._reqId } as any); + } catch (e) { sendError(ws, "vault_error", e instanceof Error ? e.message : String(e), undefined, vs._reqId); } + break; + } + case "vault_list": { + try { + const entries = await vaultList(conn.meshId, conn.memberId); + sendToPeer(presenceId, { type: "vault_list_result", entries: entries.map((e: any) => ({ key: e.key, entry_type: e.entryType, mount_path: e.mountPath, description: e.description, updated_at: e.updatedAt?.toISOString() })), _reqId: (msg as any)._reqId } as any); + } catch (e) { sendError(ws, "vault_error", e instanceof Error ? e.message : String(e), undefined, (msg as any)._reqId); } + break; + } + case "vault_delete": { + const vd = msg as any; + try { + const ok = await vaultDelete(conn.meshId, conn.memberId, vd.key); + sendToPeer(presenceId, { type: "vault_ack", key: vd.key, action: ok ? "deleted" : "not_found", _reqId: vd._reqId } as any); + } catch (e) { sendError(ws, "vault_error", e instanceof Error ? e.message : String(e), undefined, vd._reqId); } + break; + } + + // --- MCP Deploy/Undeploy --- + case "mcp_deploy": { + const md = msg as any; + try { + // Validate service name (path traversal protection) + const nameError = serviceManager.validateServiceName(md.server_name ?? ""); + if (nameError) { + sendError(ws, "invalid_name", nameError, undefined, md._reqId); + break; + } + const existing = await listDbMeshServices(conn.meshId); + if (existing.length >= env.MAX_SERVICES_PER_MESH) { + sendError(ws, "limit", `max ${env.MAX_SERVICES_PER_MESH} services per mesh`, undefined, md._reqId); + break; + } + await upsertService(conn.meshId, md.server_name, { + type: "mcp", sourceType: md.source.type, description: `MCP server: ${md.server_name}`, + sourceFileId: md.source.type === "zip" ? md.source.file_id : undefined, + sourceGitUrl: md.source.type === "git" ? md.source.url : undefined, + sourceGitBranch: md.source.type === "git" ? md.source.branch : undefined, + runtime: md.config?.runtime, status: "building", config: md.config ?? {}, + scope: md.scope ?? "peer", deployedBy: conn.memberId, deployedByName: conn.displayName, + }); + sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "building", _reqId: md._reqId } as any); + broadcastToMesh(conn.meshId, { + type: "push", subtype: "system" as const, event: "mcp_deployed", + eventData: { name: md.server_name, description: `MCP server: ${md.server_name}`, tool_count: 0, deployed_by: conn.displayName, scope: md.scope ?? "peer" }, + messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system", + priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(), + }); + log.info("ws mcp_deploy", { presence_id: presenceId, name: md.server_name }); + } catch (e) { sendError(ws, "deploy_error", e instanceof Error ? e.message : String(e), undefined, md._reqId); } + break; + } + case "mcp_undeploy": { + const mu = msg as any; + try { + await serviceManager.undeploy(conn.meshId, mu.server_name); + await deleteService(conn.meshId, mu.server_name); + sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: mu.server_name, status: "stopped", _reqId: mu._reqId } as any); + broadcastToMesh(conn.meshId, { + type: "push", subtype: "system" as const, event: "mcp_undeployed", + eventData: { name: mu.server_name, by: conn.displayName }, + messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system", + priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(), + }); + log.info("ws mcp_undeploy", { presence_id: presenceId, name: mu.server_name }); + } catch (e) { sendError(ws, "undeploy_error", e instanceof Error ? e.message : String(e), undefined, mu._reqId); } + break; + } + case "mcp_update": { + const mup = msg as any; + sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: mup.server_name, status: "building", _reqId: mup._reqId } as any); + log.info("ws mcp_update", { presence_id: presenceId, name: mup.server_name }); + break; + } + case "mcp_logs": { + const ml = msg as any; + const lines = serviceManager.getLogs(conn.meshId, ml.server_name, ml.lines); + sendToPeer(presenceId, { type: "mcp_logs_result", server_name: ml.server_name, lines, _reqId: ml._reqId } as any); + break; + } + case "mcp_scope": { + const ms = msg as any; + try { + if (ms.scope !== undefined) { + await updateServiceScope(conn.meshId, ms.server_name, ms.scope); + broadcastToMesh(conn.meshId, { + type: "push", subtype: "system" as const, event: "mcp_scope_changed", + eventData: { name: ms.server_name, scope: ms.scope, by: conn.displayName }, + messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system", + priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(), + }); + } + const svc = await getService(conn.meshId, ms.server_name); + sendToPeer(presenceId, { type: "mcp_scope_result", server_name: ms.server_name, scope: svc?.scope ?? { type: "peer" }, deployed_by: svc?.deployedByName ?? "unknown", _reqId: ms._reqId } as any); + } catch (e) { sendError(ws, "scope_error", e instanceof Error ? e.message : String(e), undefined, ms._reqId); } + break; + } + case "mcp_schema": { + const msch = msg as any; + try { + let tools = serviceManager.getTools(conn.meshId, msch.server_name); + if (tools.length === 0) { + const svc = await getService(conn.meshId, msch.server_name); + tools = (svc?.toolsSchema as any[]) ?? []; + } + if (msch.tool_name) tools = tools.filter((t: any) => t.name === msch.tool_name); + sendToPeer(presenceId, { type: "mcp_schema_result", server_name: msch.server_name, tools, _reqId: msch._reqId } as any); + } catch (e) { sendError(ws, "schema_error", e instanceof Error ? e.message : String(e), undefined, msch._reqId); } + break; + } + case "mcp_catalog": { + try { + const allSvcs = await listDbMeshServices(conn.meshId); + sendToPeer(presenceId, { + type: "mcp_catalog_result", + services: allSvcs.map((s: any) => ({ + name: s.name, type: s.type, description: s.description, status: s.status ?? "stopped", + tool_count: Array.isArray(s.toolsSchema) ? s.toolsSchema.length : 0, + deployed_by: s.deployedByName ?? "unknown", scope: s.scope ?? { type: "peer" }, + source_type: s.sourceType, runtime: s.runtime, created_at: s.createdAt.toISOString(), + })), + _reqId: (msg as any)._reqId, + } as any); + } catch (e) { sendError(ws, "catalog_error", e instanceof Error ? e.message : String(e), undefined, (msg as any)._reqId); } + break; + } + case "skill_deploy": { + const sd = msg as any; + sendToPeer(presenceId, { type: "skill_deploy_ack", name: "TODO", files: [], _reqId: sd._reqId } as any); + log.info("ws skill_deploy", { presence_id: presenceId, source: sd.source?.type }); + break; + } } } catch (e) { metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" }); @@ -3372,6 +3552,7 @@ function main(): void { startSweepers(); startDbHealth(); + serviceManager.startHealthChecks(); // Ensure audit log table exists and load hash chain state ensureAuditLogTable() @@ -3418,6 +3599,7 @@ function main(): void { clearInterval(rlSweep); clearInterval(queueDepthTimer); stopDbHealth(); + await serviceManager.shutdownAll(); await stopSweepers(); for (const { ws } of connections.values()) { try { diff --git a/apps/broker/src/service-manager.ts b/apps/broker/src/service-manager.ts new file mode 100644 index 0000000..b264553 --- /dev/null +++ b/apps/broker/src/service-manager.ts @@ -0,0 +1,788 @@ +/** + * Service Manager — lifecycle management for mesh-deployed MCP servers. + * + * Each deployed MCP server runs as a child process with its own stdio pipe. + * The manager spawns, monitors, restarts, and routes tool calls to them. + * + * In production: child processes run inside a Docker container (one per mesh). + * In dev: child processes run directly on the broker host. + */ + +import { spawn, type ChildProcess } from "node:child_process"; +import { createInterface } from "node:readline"; +import { existsSync } from "node:fs"; +import { readFileSync } from "node:fs"; +import { join } from "node:path"; +import { log } from "./logger"; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +/** MCP tool definition returned by tools/list. */ +export interface ToolDef { + name: string; + description: string; + inputSchema: Record; +} + +/** Per-service deploy-time configuration. */ +export interface ServiceConfig { + env?: Record; + memory_mb?: number; + cpus?: number; + network_allow?: string[]; + runtime?: "node" | "python" | "bun"; +} + +/** Observable lifecycle states. */ +export type ServiceStatus = + | "building" + | "installing" + | "running" + | "stopped" + | "failed" + | "crashed" + | "restarting"; + +/** Internal bookkeeping for a spawned service. */ +interface ManagedService { + name: string; + meshId: string; + process: ChildProcess | null; + tools: ToolDef[]; + status: ServiceStatus; + config: ServiceConfig; + sourcePath: string; + runtime: "node" | "python" | "bun"; + restartCount: number; + maxRestarts: number; + healthFailures: number; + logBuffer: string[]; // ring buffer, max LOG_BUFFER_SIZE + pendingCalls: Map< + string, + { + resolve: (result: { result?: unknown; error?: string }) => void; + timer: NodeJS.Timeout; + } + >; + pid?: number; + startedAt?: Date; +} + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +const LOG_BUFFER_SIZE = 1000; +const HEALTH_INTERVAL_MS = 30_000; +const HEALTH_TIMEOUT_MS = 5_000; +const MAX_HEALTH_FAILURES = 3; +const DEFAULT_MAX_RESTARTS = 5; +const CALL_TIMEOUT_MS = 25_000; +const SERVICES_BASE_DIR = + process.env.CLAUDEMESH_SERVICES_DIR ?? "/var/claudemesh/services"; + +// --------------------------------------------------------------------------- +// Service registry +// --------------------------------------------------------------------------- + +const services = new Map(); // keyed by "meshId:serviceName" +let healthTimer: NodeJS.Timer | null = null; + +function serviceKey(meshId: string, name: string): string { + return `${meshId}:${name}`; +} + +/** Validate service name: alphanumeric, hyphens, underscores only. No path traversal. */ +const SAFE_NAME_RE = /^[a-zA-Z0-9][a-zA-Z0-9_-]{0,63}$/; + +export function validateServiceName(name: string): string | null { + if (!SAFE_NAME_RE.test(name)) { + return "service name must be 1-64 chars, alphanumeric/hyphens/underscores, starting with alphanumeric"; + } + if (name.includes("..") || name.includes("/") || name.includes("\\")) { + return "service name must not contain path separators"; + } + return null; // valid +} + +// --------------------------------------------------------------------------- +// Runtime detection +// --------------------------------------------------------------------------- + +/** + * Detect the runtime for a service based on its source directory contents. + * + * Priority: bun (lockfile/config) > node (package.json) > python + * (pyproject.toml / requirements.txt). Falls back to node. + */ +export function detectRuntime(sourcePath: string): "node" | "python" | "bun" { + if ( + existsSync(join(sourcePath, "bun.lockb")) || + existsSync(join(sourcePath, "bunfig.toml")) + ) { + return "bun"; + } + if (existsSync(join(sourcePath, "package.json"))) { + return "node"; + } + if ( + existsSync(join(sourcePath, "pyproject.toml")) || + existsSync(join(sourcePath, "requirements.txt")) + ) { + return "python"; + } + return "node"; // default +} + +// --------------------------------------------------------------------------- +// Entry point detection +// --------------------------------------------------------------------------- + +function detectEntry( + sourcePath: string, + runtime: "node" | "python" | "bun", +): { command: string; args: string[] } { + if (runtime === "python") { + if (existsSync(join(sourcePath, "requirements.txt"))) { + for (const entry of [ + "server.py", + "src/server.py", + "main.py", + "src/main.py", + ]) { + if (existsSync(join(sourcePath, entry))) { + return { command: "python", args: [entry] }; + } + } + } + if (existsSync(join(sourcePath, "pyproject.toml"))) { + return { command: "python", args: ["-m", "server"] }; + } + return { command: "python", args: ["server.py"] }; + } + + // Node / Bun + const cmd = runtime === "bun" ? "bun" : "node"; + if (existsSync(join(sourcePath, "package.json"))) { + try { + const pkg = JSON.parse( + readFileSync(join(sourcePath, "package.json"), "utf-8"), + ); + if (pkg.main) return { command: cmd, args: [pkg.main] }; + if (pkg.bin) { + const bin = + typeof pkg.bin === "string" + ? pkg.bin + : (Object.values(pkg.bin)[0] as string); + if (bin) return { command: cmd, args: [bin] }; + } + } catch { + /* ignore parse errors */ + } + } + + // Common entry points + for (const entry of [ + "dist/index.js", + "src/index.js", + "src/index.ts", + "index.js", + ]) { + if (existsSync(join(sourcePath, entry))) { + return { command: cmd, args: [entry] }; + } + } + + return { command: cmd, args: ["src/index.js"] }; +} + +// --------------------------------------------------------------------------- +// Install dependencies +// --------------------------------------------------------------------------- + +/** + * Install dependencies for a service. Resolves on success, rejects with + * the tail of stderr on failure. + */ +export async function installDeps( + sourcePath: string, + runtime: "node" | "python" | "bun", +): Promise { + return new Promise((resolve, reject) => { + let cmd: string; + let args: string[]; + + if (runtime === "python") { + if (existsSync(join(sourcePath, "requirements.txt"))) { + cmd = "pip"; + args = ["install", "--no-cache-dir", "-r", "requirements.txt"]; + } else { + cmd = "pip"; + args = ["install", "--no-cache-dir", "."]; + } + } else if (runtime === "bun") { + cmd = "bun"; + args = ["install"]; + } else { + cmd = "npm"; + args = ["install", "--production", "--legacy-peer-deps"]; + } + + const child = spawn(cmd, args, { + cwd: sourcePath, + stdio: ["ignore", "pipe", "pipe"], + }); + + let stderr = ""; + child.stderr?.on("data", (d: Buffer) => { + stderr += d.toString(); + }); + child.on("exit", (code) => { + if (code === 0) resolve(); + else + reject( + new Error( + `${cmd} install failed (exit ${code}): ${stderr.slice(-500)}`, + ), + ); + }); + child.on("error", reject); + }); +} + +// --------------------------------------------------------------------------- +// Log ring buffer +// --------------------------------------------------------------------------- + +function appendLog(svc: ManagedService, line: string): void { + svc.logBuffer.push(`${new Date().toISOString()} ${line}`); + if (svc.logBuffer.length > LOG_BUFFER_SIZE) { + svc.logBuffer.shift(); + } +} + +// --------------------------------------------------------------------------- +// MCP JSON-RPC helpers +// --------------------------------------------------------------------------- + +let callIdCounter = 0; + +function sendMcpRequest( + svc: ManagedService, + method: string, + params?: unknown, +): Promise<{ result?: unknown; error?: string }> { + return new Promise((resolve) => { + if (!svc.process || !svc.process.stdin?.writable) { + resolve({ error: "service not running" }); + return; + } + + const id = `call_${++callIdCounter}`; + const request = { + jsonrpc: "2.0", + id, + method, + ...(params ? { params } : {}), + }; + + const timer = setTimeout(() => { + svc.pendingCalls.delete(id); + resolve({ error: `tool call timed out after ${CALL_TIMEOUT_MS}ms` }); + }, CALL_TIMEOUT_MS); + + svc.pendingCalls.set(id, { resolve, timer }); + + try { + svc.process.stdin!.write(JSON.stringify(request) + "\n"); + } catch (e) { + clearTimeout(timer); + svc.pendingCalls.delete(id); + resolve({ + error: `write failed: ${e instanceof Error ? e.message : String(e)}`, + }); + } + }); +} + +// --------------------------------------------------------------------------- +// Initialize MCP server (handshake + tool discovery) +// --------------------------------------------------------------------------- + +async function initializeMcp(svc: ManagedService): Promise { + // MCP initialize handshake + const initResult = await sendMcpRequest(svc, "initialize", { + protocolVersion: "2024-11-05", + capabilities: {}, + clientInfo: { name: "claudemesh-runner", version: "0.1.0" }, + }); + + if (initResult.error) { + throw new Error(`MCP initialize failed: ${initResult.error}`); + } + + // Send initialized notification (no response expected) + if (svc.process?.stdin?.writable) { + svc.process.stdin.write( + JSON.stringify({ + jsonrpc: "2.0", + method: "notifications/initialized", + }) + "\n", + ); + } + + // Fetch tool list + const toolsResult = await sendMcpRequest(svc, "tools/list", {}); + if (toolsResult.error) { + throw new Error(`tools/list failed: ${toolsResult.error}`); + } + + const result = toolsResult.result as { tools?: ToolDef[] } | undefined; + return result?.tools ?? []; +} + +// --------------------------------------------------------------------------- +// Spawn an MCP server child process +// --------------------------------------------------------------------------- + +function spawnService(svc: ManagedService): void { + const { command, args } = detectEntry(svc.sourcePath, svc.runtime); + + const env: Record = { + ...(process.env as Record), + ...(svc.config.env ?? {}), + NODE_ENV: "production", + }; + + const child = spawn(command, args, { + cwd: svc.sourcePath, + stdio: ["pipe", "pipe", "pipe"], + env, + }); + + svc.process = child; + svc.pid = child.pid; + svc.startedAt = new Date(); + svc.status = "running"; + svc.healthFailures = 0; + + // Read MCP JSON-RPC responses from stdout + const rl = createInterface({ input: child.stdout! }); + rl.on("line", (line) => { + try { + const msg = JSON.parse(line); + if (msg.id && svc.pendingCalls.has(String(msg.id))) { + const pending = svc.pendingCalls.get(String(msg.id))!; + clearTimeout(pending.timer); + svc.pendingCalls.delete(String(msg.id)); + if (msg.error) { + pending.resolve({ + error: msg.error.message ?? JSON.stringify(msg.error), + }); + } else { + pending.resolve({ result: msg.result }); + } + } + } catch { + // Not JSON — treat as log output + appendLog(svc, `[stdout] ${line}`); + } + }); + + // Capture stderr as logs + const stderrRl = createInterface({ input: child.stderr! }); + stderrRl.on("line", (line) => { + appendLog(svc, `[stderr] ${line}`); + }); + + child.on("exit", (code, signal) => { + log.warn("service exited", { + service: svc.name, + mesh_id: svc.meshId, + code, + signal, + restarts: svc.restartCount, + }); + + // Reject all pending calls + for (const [, pending] of svc.pendingCalls) { + clearTimeout(pending.timer); + pending.resolve({ error: "service crashed" }); + } + svc.pendingCalls.clear(); + svc.process = null; + svc.pid = undefined; + + // Auto-restart if under limit + if (svc.status === "running" && svc.restartCount < svc.maxRestarts) { + svc.restartCount++; + svc.status = "restarting"; + log.info("auto-restarting service", { + service: svc.name, + attempt: svc.restartCount, + }); + setTimeout(() => spawnService(svc), 1000 * svc.restartCount); // backoff + } else if (svc.status === "running") { + svc.status = "crashed"; + log.error("service max restarts exceeded", { + service: svc.name, + restarts: svc.restartCount, + }); + } + }); + + child.on("error", (err) => { + log.error("service spawn error", { + service: svc.name, + error: err.message, + }); + svc.status = "failed"; + }); + + log.info("service spawned", { + service: svc.name, + mesh_id: svc.meshId, + pid: child.pid, + command, + args, + runtime: svc.runtime, + }); +} + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +/** + * Deploy (or redeploy) an MCP server. + * + * Installs dependencies, spawns the child process, runs the MCP + * initialize handshake, and returns the discovered tool list. + */ +export async function deploy(opts: { + meshId: string; + name: string; + sourcePath: string; + config: ServiceConfig; + resolvedEnv?: Record; +}): Promise<{ tools: ToolDef[]; status: ServiceStatus }> { + const key = serviceKey(opts.meshId, opts.name); + + // Kill existing if redeploying + const existing = services.get(key); + if (existing?.process) { + existing.process.kill("SIGTERM"); + await new Promise((r) => setTimeout(r, 1000)); + } + + const runtime = opts.config.runtime ?? detectRuntime(opts.sourcePath); + + const svc: ManagedService = { + name: opts.name, + meshId: opts.meshId, + process: null, + tools: [], + status: "installing", + config: { + ...opts.config, + env: { ...(opts.config.env ?? {}), ...(opts.resolvedEnv ?? {}) }, + }, + sourcePath: opts.sourcePath, + runtime, + restartCount: 0, + maxRestarts: DEFAULT_MAX_RESTARTS, + healthFailures: 0, + logBuffer: [], + pendingCalls: new Map(), + }; + + services.set(key, svc); + + // Install dependencies + try { + await installDeps(opts.sourcePath, runtime); + } catch (e) { + svc.status = "failed"; + appendLog( + svc, + `Install failed: ${e instanceof Error ? e.message : String(e)}`, + ); + throw e; + } + + // Spawn and initialize + spawnService(svc); + + // Wait a moment for the process to start + await new Promise((r) => setTimeout(r, 500)); + + // Get tool list via MCP initialize handshake + try { + svc.tools = await initializeMcp(svc); + log.info("service deployed", { + service: opts.name, + mesh_id: opts.meshId, + tools: svc.tools.length, + runtime, + }); + } catch (e) { + svc.status = "failed"; + appendLog( + svc, + `MCP init failed: ${e instanceof Error ? e.message : String(e)}`, + ); + throw e; + } + + return { tools: svc.tools, status: svc.status }; +} + +/** + * Undeploy a running service. Sends SIGTERM, waits for graceful exit + * (up to 10 s), then SIGKILL. All pending tool calls are rejected. + */ +export async function undeploy(meshId: string, name: string): Promise { + const key = serviceKey(meshId, name); + const svc = services.get(key); + if (!svc) return; + + svc.status = "stopped"; + if (svc.process) { + svc.process.kill("SIGTERM"); + await new Promise((resolve) => { + const timeout = setTimeout(() => { + svc.process?.kill("SIGKILL"); + resolve(); + }, 10_000); + svc.process?.on("exit", () => { + clearTimeout(timeout); + resolve(); + }); + }); + } + + // Reject pending calls + for (const [, pending] of svc.pendingCalls) { + clearTimeout(pending.timer); + pending.resolve({ error: "service undeployed" }); + } + + services.delete(key); + log.info("service undeployed", { service: name, mesh_id: meshId }); +} + +/** + * Route a tool call to the named service. Returns the MCP response + * payload or an error string. + */ +export async function callTool( + meshId: string, + serverName: string, + toolName: string, + args: Record, +): Promise<{ result?: unknown; error?: string }> { + const key = serviceKey(meshId, serverName); + const svc = services.get(key); + if (!svc) return { error: `service "${serverName}" not found` }; + if (svc.status !== "running") + return { error: `service "${serverName}" is ${svc.status}` }; + if (!svc.process) + return { error: `service "${serverName}" has no running process` }; + + return sendMcpRequest(svc, "tools/call", { name: toolName, arguments: args }); +} + +/** + * Return the last N log lines for a service (from its ring buffer). + */ +export function getLogs(meshId: string, name: string, lines = 50): string[] { + const key = serviceKey(meshId, name); + const svc = services.get(key); + if (!svc) return []; + return svc.logBuffer.slice(-Math.min(lines, LOG_BUFFER_SIZE)); +} + +/** + * Return current status, PID, restart count, tool list, and uptime + * for a single service. Returns null if the service doesn't exist. + */ +export function getStatus( + meshId: string, + name: string, +): { + status: ServiceStatus; + pid?: number; + restartCount: number; + tools: ToolDef[]; + startedAt?: string; +} | null { + const key = serviceKey(meshId, name); + const svc = services.get(key); + if (!svc) return null; + return { + status: svc.status, + pid: svc.pid, + restartCount: svc.restartCount, + tools: svc.tools, + startedAt: svc.startedAt?.toISOString(), + }; +} + +/** + * Return the tool definitions for a service, or an empty array if the + * service doesn't exist. + */ +export function getTools(meshId: string, name: string): ToolDef[] { + const key = serviceKey(meshId, name); + const svc = services.get(key); + return svc?.tools ?? []; +} + +/** + * List all services belonging to a mesh with summary info. + */ +export function listServices( + meshId: string, +): Array<{ + name: string; + status: ServiceStatus; + toolCount: number; + runtime: string; + restartCount: number; + pid?: number; +}> { + const result: Array<{ + name: string; + status: ServiceStatus; + toolCount: number; + runtime: string; + restartCount: number; + pid?: number; + }> = []; + for (const [key, svc] of services) { + if (!key.startsWith(`${meshId}:`)) continue; + result.push({ + name: svc.name, + status: svc.status, + toolCount: svc.tools.length, + runtime: svc.runtime, + restartCount: svc.restartCount, + pid: svc.pid, + }); + } + return result; +} + +// --------------------------------------------------------------------------- +// Health check loop +// --------------------------------------------------------------------------- + +async function healthCheckAll(): Promise { + for (const [, svc] of services) { + if (svc.status !== "running" || !svc.process) continue; + + const result = await sendMcpRequest(svc, "ping", {}); + if (result.error) { + svc.healthFailures++; + log.warn("health check failed", { + service: svc.name, + failures: svc.healthFailures, + error: result.error, + }); + if (svc.healthFailures >= MAX_HEALTH_FAILURES) { + log.error("health check threshold exceeded, restarting", { + service: svc.name, + }); + svc.process.kill("SIGTERM"); + // exit handler will trigger auto-restart + } + } else { + svc.healthFailures = 0; + } + } +} + +/** Start the periodic health check loop (30 s interval). No-op if already running. */ +export function startHealthChecks(): void { + if (healthTimer) return; + healthTimer = setInterval(healthCheckAll, HEALTH_INTERVAL_MS); +} + +/** Stop the periodic health check loop. */ +export function stopHealthChecks(): void { + if (healthTimer) { + clearInterval(healthTimer); + healthTimer = null; + } +} + +// --------------------------------------------------------------------------- +// Restore all services on broker boot +// --------------------------------------------------------------------------- + +/** + * Re-deploy every persisted service record. Called once at broker startup + * to bring services back after a restart. Failures are logged but don't + * prevent other services from restoring. + */ +export async function restoreAll( + getServiceRecords: () => Promise< + Array<{ + meshId: string; + name: string; + sourcePath: string; + config: ServiceConfig; + resolvedEnv?: Record; + }> + >, +): Promise { + const records = await getServiceRecords(); + log.info("restoring services", { count: records.length }); + + for (const record of records) { + try { + await deploy({ + meshId: record.meshId, + name: record.name, + sourcePath: record.sourcePath, + config: record.config, + resolvedEnv: record.resolvedEnv, + }); + log.info("service restored", { + service: record.name, + mesh_id: record.meshId, + }); + } catch (e) { + log.error("service restore failed", { + service: record.name, + mesh_id: record.meshId, + error: e instanceof Error ? e.message : String(e), + }); + } + } + + startHealthChecks(); +} + +// --------------------------------------------------------------------------- +// Shutdown +// --------------------------------------------------------------------------- + +/** + * Gracefully shut down all running services. Stops health checks, sends + * SIGTERM to every child, waits for exit, then clears the registry. + */ +export async function shutdownAll(): Promise { + stopHealthChecks(); + const promises: Promise[] = []; + for (const [, svc] of services) { + if (svc.process) { + svc.status = "stopped"; + promises.push(undeploy(svc.meshId, svc.name)); + } + } + await Promise.allSettled(promises); + services.clear(); +} diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 9f2c6a5..7bf6a1c 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -224,6 +224,7 @@ export interface WSHelloAckMessage { restoredGroups?: Array<{ name: string; role?: string }>; /** Restored cumulative stats (only when restored). */ restoredStats?: { messagesIn: number; messagesOut: number; toolCalls: number; errors: number }; + services?: Array<{ name: string; description: string; status: string; tools: Array<{ name: string; description: string; inputSchema: object }>; deployed_by: string }>; } /** Broker → client: list of connected peers in the same mesh. */ @@ -1078,6 +1079,29 @@ export interface WSCancelScheduledAckMessage { _reqId?: string; } +/** Client → broker: deploy an MCP server from zip or git. */ +export interface WSMcpDeployMessage { type: "mcp_deploy"; server_name: string; source: { type: "zip"; file_id: string } | { type: "git"; url: string; branch?: string; auth?: string }; config?: { env?: Record; memory_mb?: number; cpus?: number; network_allow?: string[]; runtime?: "node" | "python" | "bun" }; scope?: "peer" | "mesh" | { peers: string[] } | { group: string } | { groups: string[] } | { role: string }; _reqId?: string; } +/** Client → broker: stop and remove a managed MCP server. */ +export interface WSMcpUndeployMessage { type: "mcp_undeploy"; server_name: string; _reqId?: string; } +/** Client → broker: pull + rebuild + restart a git-sourced MCP. */ +export interface WSMcpUpdateMessage { type: "mcp_update"; server_name: string; _reqId?: string; } +/** Client → broker: get logs from a managed MCP. */ +export interface WSMcpLogsMessage { type: "mcp_logs"; server_name: string; lines?: number; _reqId?: string; } +/** Client → broker: get or set visibility scope. */ +export interface WSMcpScopeMessage { type: "mcp_scope"; server_name: string; scope?: "peer" | "mesh" | { peers: string[] } | { group: string } | { groups: string[] } | { role: string }; _reqId?: string; } +/** Client → broker: inspect tool schemas for a deployed service. */ +export interface WSMcpSchemaMessage { type: "mcp_schema"; server_name: string; tool_name?: string; _reqId?: string; } +/** Client → broker: list all deployed services. */ +export interface WSMcpCatalogMessage { type: "mcp_catalog"; _reqId?: string; } +/** Client → broker: deploy a skill bundle from zip or git. */ +export interface WSSkillDeployMessage { type: "skill_deploy"; source: { type: "zip"; file_id: string } | { type: "git"; url: string; branch?: string; auth?: string }; _reqId?: string; } +/** Client → broker: store encrypted credential. */ +export interface WSVaultSetMessage { type: "vault_set"; key: string; ciphertext: string; nonce: string; sealed_key: string; entry_type: "env" | "file"; mount_path?: string; description?: string; _reqId?: string; } +/** Client → broker: list vault entries. */ +export interface WSVaultListMessage { type: "vault_list"; _reqId?: string; } +/** Client → broker: delete vault entry. */ +export interface WSVaultDeleteMessage { type: "vault_delete"; key: string; _reqId?: string; } + export type WSClientMessage = | WSHelloMessage | WSSendMessage @@ -1147,7 +1171,18 @@ export type WSClientMessage = | WSPeerDirRequestMessage | WSPeerDirResponseMessage | WSAuditQueryMessage - | WSAuditVerifyMessage; + | WSAuditVerifyMessage + | WSMcpDeployMessage + | WSMcpUndeployMessage + | WSMcpUpdateMessage + | WSMcpLogsMessage + | WSMcpScopeMessage + | WSMcpSchemaMessage + | WSMcpCatalogMessage + | WSSkillDeployMessage + | WSVaultSetMessage + | WSVaultListMessage + | WSVaultDeleteMessage; // --- Skill messages --- @@ -1217,6 +1252,23 @@ export interface WSSkillListMessage { _reqId?: string; } +/** Broker → client: deployment progress/result. */ +export interface WSMcpDeployStatusMessage { type: "mcp_deploy_status"; server_name: string; status: "building" | "installing" | "running" | "failed"; tools?: Array<{ name: string; description: string; inputSchema: object }>; error?: string; _reqId?: string; } +/** Broker → client: service log output. */ +export interface WSMcpLogsResultMessage { type: "mcp_logs_result"; server_name: string; lines: string[]; _reqId?: string; } +/** Broker → client: tool schema introspection result. */ +export interface WSMcpSchemaResultMessage { type: "mcp_schema_result"; server_name: string; tools: Array<{ name: string; description: string; inputSchema: object }>; _reqId?: string; } +/** Broker → client: full service catalog. */ +export interface WSMcpCatalogResultMessage { type: "mcp_catalog_result"; services: Array<{ name: string; type: "mcp" | "skill"; description: string; status: string; tool_count: number; deployed_by: string; scope: { type: string; [key: string]: unknown }; source_type: string; runtime?: string; created_at: string }>; _reqId?: string; } +/** Broker → client: scope query/set result. */ +export interface WSMcpScopeResultMessage { type: "mcp_scope_result"; server_name: string; scope: { type: string; [key: string]: unknown }; deployed_by: string; _reqId?: string; } +/** Broker → client: skill deploy acknowledgement. */ +export interface WSSkillDeployAckMessage { type: "skill_deploy_ack"; name: string; files: string[]; _reqId?: string; } +/** Broker → client: vault operation acknowledgement. */ +export interface WSVaultAckMessage { type: "vault_ack"; key: string; action: "stored" | "deleted" | "not_found"; _reqId?: string; } +/** Broker → client: vault entry listing. */ +export interface WSVaultListResultMessage { type: "vault_list_result"; entries: Array<{ key: string; entry_type: "env" | "file"; mount_path?: string; description?: string; updated_at: string }>; _reqId?: string; } + export type WSServerMessage = | WSHelloAckMessage | WSPushMessage @@ -1267,4 +1319,12 @@ export type WSServerMessage = | WSPeerDirResponseForwardMessage | WSAuditResultMessage | WSAuditVerifyResultMessage + | WSMcpDeployStatusMessage + | WSMcpLogsResultMessage + | WSMcpSchemaResultMessage + | WSMcpCatalogResultMessage + | WSMcpScopeResultMessage + | WSSkillDeployAckMessage + | WSVaultAckMessage + | WSVaultListResultMessage | WSErrorMessage; diff --git a/apps/cli/src/commands/launch.ts b/apps/cli/src/commands/launch.ts index 93a0a78..38da896 100644 --- a/apps/cli/src/commands/launch.ts +++ b/apps/cli/src/commands/launch.ts @@ -14,12 +14,13 @@ */ import { spawn } from "node:child_process"; -import { mkdtempSync, writeFileSync, rmSync, readdirSync, statSync } from "node:fs"; -import { tmpdir, hostname } from "node:os"; +import { mkdtempSync, writeFileSync, rmSync, readdirSync, statSync, existsSync, readFileSync } from "node:fs"; +import { tmpdir, hostname, homedir } from "node:os"; import { join } from "node:path"; import { createInterface } from "node:readline"; import { loadConfig, getConfigPath } from "../state/config"; import type { Config, JoinedMesh, GroupEntry } from "../state/config"; +import { BrokerClient } from "../ws/client"; // Flags as parsed by citty (index.ts is the source of truth for definitions). export interface LaunchFlags { @@ -277,6 +278,56 @@ export async function runLaunch(flags: LaunchFlags, rawArgs: string[]): Promise< } } catch { /* best effort */ } + // Clean up stale mesh MCP entries from crashed sessions + try { + const claudeConfigPath = join(homedir(), ".claude.json"); + if (existsSync(claudeConfigPath)) { + const claudeConfig = JSON.parse(readFileSync(claudeConfigPath, "utf-8")); + const mcpServers = claudeConfig.mcpServers ?? {}; + let cleaned = 0; + for (const key of Object.keys(mcpServers)) { + if (!key.startsWith("mesh:")) continue; + const meta = mcpServers[key]?._meshSession; + if (!meta?.pid) continue; + // Check if the PID is still alive + try { + process.kill(meta.pid, 0); // signal 0 = check existence + } catch { + // PID is dead — remove stale entry + delete mcpServers[key]; + cleaned++; + } + } + if (cleaned > 0) { + claudeConfig.mcpServers = mcpServers; + writeFileSync(claudeConfigPath, JSON.stringify(claudeConfig, null, 2) + "\n", "utf-8"); + } + } + } catch { /* best effort */ } + + // --- Fetch deployed services for native MCP entries --- + let serviceCatalog: Array<{ + name: string; + description: string; + status: string; + tools: Array<{ name: string; description: string; inputSchema: object }>; + deployed_by: string; + }> = []; + + try { + const tmpClient = new BrokerClient(mesh, { displayName }); + await tmpClient.connect(); + // Wait briefly for hello_ack with service catalog + await new Promise(r => setTimeout(r, 2000)); + serviceCatalog = tmpClient.serviceCatalog; + tmpClient.close(); + } catch { + // Non-fatal — launch without native service entries + if (!args.quiet) { + console.log(" (Could not fetch service catalog — mesh services won't be natively available)"); + } + } + // 4. Write session config to tmpdir (isolates mesh selection). const tmpDir = mkdtempSync(join(tmpdir(), "claudemesh-")); const sessionConfig: Config = { @@ -302,6 +353,59 @@ export async function runLaunch(flags: LaunchFlags, rawArgs: string[]): Promise< } } + // --- Install native MCP entries for deployed mesh services --- + const meshMcpEntries: Array<{ key: string; entry: unknown }> = []; + + if (serviceCatalog.length > 0) { + const claudeConfigPath = join(homedir(), ".claude.json"); + + // Read-modify-write: only touch mesh:* entries in mcpServers + let claudeConfig: Record = {}; + try { + claudeConfig = JSON.parse(readFileSync(claudeConfigPath, "utf-8")); + } catch { + claudeConfig = {}; + } + + const mcpServers = (claudeConfig.mcpServers ?? {}) as Record; + + // Session-scoped key: mesh:: + const sessionTag = `${process.pid}`; + + for (const svc of serviceCatalog) { + if (svc.status !== "running") continue; + const entryKey = `mesh:${svc.name}:${sessionTag}`; + const entry = { + command: "claudemesh", + args: ["mcp", "--service", svc.name], + env: { + CLAUDEMESH_CONFIG_DIR: tmpDir, + }, + _meshSession: { + pid: process.pid, + meshSlug: mesh.slug, + serviceName: svc.name, + createdAt: new Date().toISOString(), + }, + }; + mcpServers[entryKey] = entry; + meshMcpEntries.push({ key: entryKey, entry }); + } + + claudeConfig.mcpServers = mcpServers; + writeFileSync(claudeConfigPath, JSON.stringify(claudeConfig, null, 2) + "\n", "utf-8"); + + if (!args.quiet && meshMcpEntries.length > 0) { + console.log(` ${meshMcpEntries.length} mesh service(s) registered as native MCPs:`); + for (const { key } of meshMcpEntries) { + const svcName = key.split(":")[1]; + const svc = serviceCatalog.find(s => s.name === svcName); + console.log(` ${svcName} (${svc?.tools.length ?? 0} tools)`); + } + console.log(""); + } + } + // 6. Spawn claude with ephemeral config + dev channel + auto-permissions. // Strip any user-supplied --dangerously flags to avoid duplicates. const filtered: string[] = []; @@ -333,12 +437,28 @@ export async function runLaunch(flags: LaunchFlags, rawArgs: string[]): Promise< ...process.env, CLAUDEMESH_CONFIG_DIR: tmpDir, CLAUDEMESH_DISPLAY_NAME: displayName, + MCP_TIMEOUT: process.env.MCP_TIMEOUT ?? "30000", + MAX_MCP_OUTPUT_TOKENS: process.env.MAX_MCP_OUTPUT_TOKENS ?? "50000", ...(role ? { CLAUDEMESH_ROLE: role } : {}), }, }); // 7. Cleanup on exit. const cleanup = (): void => { + // Remove mesh MCP entries from ~/.claude.json + if (meshMcpEntries.length > 0) { + try { + const claudeConfigPath = join(homedir(), ".claude.json"); + const claudeConfig = JSON.parse(readFileSync(claudeConfigPath, "utf-8")); + const mcpServers = claudeConfig.mcpServers ?? {}; + for (const { key } of meshMcpEntries) { + delete mcpServers[key]; + } + claudeConfig.mcpServers = mcpServers; + writeFileSync(claudeConfigPath, JSON.stringify(claudeConfig, null, 2) + "\n", "utf-8"); + } catch { /* best effort */ } + } + // Existing tmpdir cleanup try { rmSync(tmpDir, { recursive: true, force: true }); } catch { diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index e04ad12..765d140 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -22,7 +22,8 @@ import type { SetSummaryArgs, ListPeersArgs, } from "./types"; -import type { BrokerClient, InboundPush } from "../ws/client"; +import { BrokerClient } from "../ws/client"; +import type { InboundPush } from "../ws/client"; /** Compute a human-readable relative time string from an ISO timestamp. */ function relativeTime(isoStr: string): string { @@ -144,6 +145,12 @@ function formatPush(p: InboundPush, meshSlug: string): string { } export async function startMcpServer(): Promise { + // Check for --service mode (native mesh MCP proxy) + const serviceIdx = process.argv.indexOf("--service"); + if (serviceIdx !== -1 && process.argv[serviceIdx + 1]) { + return startServiceProxy(process.argv[serviceIdx + 1]!); + } + const config = loadConfig(); const myName = config.displayName ?? "unnamed"; @@ -1533,3 +1540,182 @@ Your message mode is "${messageMode}". process.on("SIGTERM", shutdown); process.on("SIGINT", shutdown); } + +/** + * Mesh service proxy — a thin MCP server that proxies ONE deployed service. + * + * Spawned by Claude Code as a native MCP entry. Connects to the broker, + * fetches tool schemas for the named service, and routes tool calls. + * + * If the broker WS drops, the proxy waits for reconnection (up to 10s) + * before failing tool calls. If the proxy process itself crashes, Claude + * Code will not auto-restart it. + */ +async function startServiceProxy(serviceName: string): Promise { + const config = loadConfig(); + if (config.meshes.length === 0) { + process.stderr.write(`[mesh:${serviceName}] no meshes joined\n`); + process.exit(1); + } + + const mesh = config.meshes[0]!; + const client = new BrokerClient(mesh, { + displayName: config.displayName ?? `proxy:${serviceName}`, + }); + + try { + await client.connect(); + } catch (e) { + process.stderr.write( + `[mesh:${serviceName}] broker connect failed: ${e instanceof Error ? e.message : String(e)}\n`, + ); + process.exit(1); + } + + // Wait for hello_ack and service catalog + await new Promise((r) => setTimeout(r, 1500)); + + // Fetch tool schemas for this service + let tools: Array<{ + name: string; + description: string; + inputSchema: Record; + }> = []; + try { + const fetched = await client.getServiceTools(serviceName); + tools = fetched as typeof tools; + } catch { + // Try from catalog cache + const cached = client.serviceCatalog.find((s) => s.name === serviceName); + if (cached) { + tools = cached.tools as typeof tools; + } + } + + if (tools.length === 0) { + process.stderr.write( + `[mesh:${serviceName}] no tools found — service may not be running\n`, + ); + } + + // Build MCP server + const server = new Server( + { name: `mesh:${serviceName}`, version: "0.1.0" }, + { capabilities: { tools: {} } }, + ); + + server.setRequestHandler(ListToolsRequestSchema, () => ({ + tools: tools.map((t) => ({ + name: t.name, + description: `[mesh:${serviceName}] ${t.description}`, + inputSchema: t.inputSchema as any, + })), + })); + + server.setRequestHandler(CallToolRequestSchema, async (req) => { + const toolName = req.params.name; + const args = req.params.arguments ?? {}; + + // Wait for broker reconnection if needed + if (client.status !== "open") { + let waited = 0; + while (client.status !== "open" && waited < 10_000) { + await new Promise((r) => setTimeout(r, 500)); + waited += 500; + } + if (client.status !== "open") { + return { + content: [ + { + type: "text" as const, + text: `Service temporarily unavailable — broker reconnecting. Retry in a few seconds.`, + }, + ], + isError: true, + }; + } + } + + try { + const result = await client.mcpCall( + serviceName, + toolName, + args as Record, + ); + if (result.error) { + return { + content: [{ type: "text" as const, text: `Error: ${result.error}` }], + isError: true, + }; + } + const resultText = + typeof result.result === "string" + ? result.result + : JSON.stringify(result.result, null, 2); + return { + content: [{ type: "text" as const, text: resultText }], + }; + } catch (e) { + return { + content: [ + { + type: "text" as const, + text: `Call failed: ${e instanceof Error ? e.message : String(e)}`, + }, + ], + isError: true, + }; + } + }); + + // Listen for service events (undeploy, update) + client.onPush((push) => { + if ( + push.event === "mcp_undeployed" && + (push.eventData as any)?.name === serviceName + ) { + process.stderr.write( + `[mesh:${serviceName}] service undeployed — exiting\n`, + ); + client.close(); + process.exit(0); + } + if ( + push.event === "mcp_updated" && + (push.eventData as any)?.name === serviceName + ) { + // Refresh tools + const newTools = (push.eventData as any)?.tools; + if (Array.isArray(newTools)) { + tools = newTools; + // Notify Claude Code that tools changed + server + .notification({ + method: "notifications/tools/list_changed", + }) + .catch(() => { + /* ignore notification errors */ + }); + } + } + }); + + // Start stdio transport + const transport = new StdioServerTransport(); + await server.connect(transport); + + // Keep event loop alive + const keepalive = setInterval(() => { + // Intentionally empty — prevents event loop from settling. + }, 1_000); + void keepalive; + + // Graceful shutdown + const shutdown = (): void => { + clearInterval(keepalive); + client.close(); + process.exit(0); + }; + process.on("SIGTERM", shutdown); + process.on("SIGINT", shutdown); +} diff --git a/apps/cli/src/mcp/tools.ts b/apps/cli/src/mcp/tools.ts index ce72117..70c8ecb 100644 --- a/apps/cli/src/mcp/tools.ts +++ b/apps/cli/src/mcp/tools.ts @@ -893,4 +893,82 @@ export const TOOLS: Tool[] = [ required: ["name"], }, }, + + // --- Service deployment tools --- + + { + name: "mesh_mcp_deploy", + description: "Deploy an MCP server to the mesh from a zip file or git repo. Runs on the broker VPS, persists across peer sessions. Default scope: private (only you).", + inputSchema: { + type: "object", + properties: { + server_name: { type: "string", description: "Unique name for the server in this mesh" }, + file_id: { type: "string", description: "File ID of uploaded zip (from share_file)" }, + git_url: { type: "string", description: "Git repo URL" }, + git_branch: { type: "string", description: "Branch to clone (default: main)" }, + env: { type: "object", description: "Environment variables. Use $vault: for vault secrets." }, + runtime: { type: "string", enum: ["node", "python", "bun"], description: "Runtime (auto-detected if omitted)" }, + memory_mb: { type: "number", description: "Memory limit in MB (default: 256)" }, + network_allow: { type: "array", items: { type: "string" }, description: "Allowed outbound hosts (default: none)" }, + scope: { description: "Visibility: 'peer' (default), 'mesh', or {group/groups/role/peers}" }, + }, + required: ["server_name"], + }, + }, + { + name: "mesh_mcp_undeploy", + description: "Stop and remove a managed MCP server from the mesh.", + inputSchema: { type: "object", properties: { server_name: { type: "string" } }, required: ["server_name"] }, + }, + { + name: "mesh_mcp_update", + description: "Pull latest code and restart a git-sourced MCP server.", + inputSchema: { type: "object", properties: { server_name: { type: "string" } }, required: ["server_name"] }, + }, + { + name: "mesh_mcp_logs", + description: "View recent logs from a managed MCP server.", + inputSchema: { type: "object", properties: { server_name: { type: "string" }, lines: { type: "number", description: "Lines (default: 50, max: 1000)" } }, required: ["server_name"] }, + }, + { + name: "mesh_mcp_scope", + description: "Get or set the visibility scope of a deployed MCP server.", + inputSchema: { type: "object", properties: { server_name: { type: "string" }, scope: { description: "New scope to set. Omit to read current." } }, required: ["server_name"] }, + }, + { + name: "mesh_mcp_schema", + description: "Inspect tool schemas for a deployed MCP server.", + inputSchema: { type: "object", properties: { server_name: { type: "string" }, tool_name: { type: "string", description: "Specific tool (omit for all)" } }, required: ["server_name"] }, + }, + { + name: "mesh_mcp_catalog", + description: "List all deployed services in the mesh with status, scope, and tool count.", + inputSchema: { type: "object", properties: {} }, + }, + + // --- Skill deployment tools --- + + { + name: "mesh_skill_deploy", + description: "Deploy a multi-file skill bundle from a zip or git repo.", + inputSchema: { type: "object", properties: { file_id: { type: "string" }, git_url: { type: "string" }, git_branch: { type: "string" } } }, + }, + + // --- Vault tools --- + + { + name: "vault_set", + description: "Store an encrypted credential in your vault. Reference in mesh_mcp_deploy with $vault:.", + inputSchema: { type: "object", properties: { key: { type: "string" }, value: { type: "string", description: "Secret value or local file path (for type=file)" }, type: { type: "string", enum: ["env", "file"] }, mount_path: { type: "string" }, description: { type: "string" } }, required: ["key", "value"] }, + }, + { + name: "vault_list", + description: "List your vault entries (keys and metadata only, no secret values).", + inputSchema: { type: "object", properties: {} }, + }, + { + name: "vault_delete", + description: "Remove a credential from your vault.", + inputSchema: { type: "object", properties: { key: { type: "string" } }, required: ["key"] }, + }, ]; diff --git a/apps/cli/src/mcp/types.ts b/apps/cli/src/mcp/types.ts index 77d51f7..8029be4 100644 --- a/apps/cli/src/mcp/types.ts +++ b/apps/cli/src/mcp/types.ts @@ -22,3 +22,60 @@ export interface SetSummaryArgs { export interface SetStatusArgs { status: PeerStatus; } + +// --- Service deployment types --- + +export type ServiceScope = + | "peer" + | "mesh" + | { peers: string[] } + | { group: string } + | { groups: string[] } + | { role: string }; + +export interface ServiceInfo { + name: string; + type: "mcp" | "skill"; + description: string; + status: string; + tool_count: number; + deployed_by: string; + scope: ServiceScope; + source_type: string; + runtime?: string; + created_at: string; +} + +export interface ServiceToolSchema { + name: string; + description: string; + inputSchema: Record; +} + +export interface VaultEntry { + key: string; + entry_type: "env" | "file"; + mount_path?: string; + description?: string; + updated_at: string; +} + +export interface MeshMcpDeployArgs { + server_name: string; + file_id?: string; + git_url?: string; + git_branch?: string; + env?: Record; + runtime?: "node" | "python" | "bun"; + memory_mb?: number; + network_allow?: string[]; + scope?: ServiceScope; +} + +export interface VaultSetArgs { + key: string; + value: string; + type?: "env" | "file"; + mount_path?: string; + description?: string; +} diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 1fb9d74..46de52d 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -114,6 +114,8 @@ export class BrokerClient { private peerDirResponseResolvers = new Map void; timer: NodeJS.Timeout }>(); /** Directories from which this peer serves files. Default: [process.cwd()]. */ private sharedDirs: string[] = [process.cwd()]; + private _serviceCatalog: Array<{ name: string; description: string; status: string; tools: Array<{ name: string; description: string; inputSchema: object }>; deployed_by: string }> = []; + get serviceCatalog() { return this._serviceCatalog; } private closed = false; private reconnectAttempt = 0; private helloTimer: NodeJS.Timeout | null = null; @@ -249,6 +251,9 @@ export class BrokerClient { this._statsCounters.errors = rs.errors ?? 0; } } + if ((msg as any).services) { + this._serviceCatalog = (msg as any).services; + } resolve(); return; } @@ -588,6 +593,14 @@ export class BrokerClient { private mcpCallResolvers = new Map void; timer: NodeJS.Timeout }>(); /** Handler for inbound mcp_call_forward messages. Set by the MCP server. */ private mcpCallForwardHandler: ((forward: { callId: string; serverName: string; toolName: string; args: Record; callerName: string }) => Promise<{ result?: unknown; error?: string }>) | null = null; + private vaultAckResolvers = new Map void; timer: NodeJS.Timeout }>(); + private vaultListResolvers = new Map void; timer: NodeJS.Timeout }>(); + private mcpDeployResolvers = new Map void; timer: NodeJS.Timeout }>(); + private mcpLogsResolvers = new Map void; timer: NodeJS.Timeout }>(); + private mcpSchemaServiceResolvers = new Map void; timer: NodeJS.Timeout }>(); + private mcpCatalogResolvers = new Map void; timer: NodeJS.Timeout }>(); + private mcpScopeResolvers = new Map void; timer: NodeJS.Timeout }>(); + private skillDeployResolvers = new Map void; timer: NodeJS.Timeout }>(); async messageStatus(messageId: string): Promise<{ messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; @@ -1178,6 +1191,125 @@ export class BrokerClient { }); } + // --- Vault --- + + async vaultSet(key: string, ciphertext: string, nonce: string, sealedKey: string, entryType: "env" | "file", mountPath?: string, description?: string): Promise { + return new Promise(resolve => { + const reqId = `vset_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`; + const timer = setTimeout(() => { this.vaultAckResolvers.delete(reqId); resolve(false); }, 10_000); + this.vaultAckResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "vault_set", key, ciphertext, nonce, sealed_key: sealedKey, entry_type: entryType, mount_path: mountPath, description, _reqId: reqId } as any); + }); + } + + async vaultList(): Promise { + return new Promise(resolve => { + const reqId = `vlist_${Date.now()}`; + const timer = setTimeout(() => { this.vaultListResolvers.delete(reqId); resolve([]); }, 10_000); + this.vaultListResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "vault_list", _reqId: reqId } as any); + }); + } + + async vaultDelete(key: string): Promise { + return new Promise(resolve => { + const reqId = `vdel_${Date.now()}`; + const timer = setTimeout(() => { this.vaultAckResolvers.delete(reqId); resolve(false); }, 10_000); + this.vaultAckResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "vault_delete", key, _reqId: reqId } as any); + }); + } + + // --- MCP Deploy --- + + async mcpDeploy(serverName: string, source: any, config?: any, scope?: any): Promise { + return new Promise(resolve => { + const reqId = `deploy_${Date.now()}`; + const timer = setTimeout(() => { this.mcpDeployResolvers.delete(reqId); resolve({ status: "timeout" }); }, 60_000); + this.mcpDeployResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "mcp_deploy", server_name: serverName, source, config, scope, _reqId: reqId } as any); + }); + } + + async mcpUndeploy(serverName: string): Promise { + return new Promise(resolve => { + const reqId = `undeploy_${Date.now()}`; + const timer = setTimeout(() => { this.mcpDeployResolvers.delete(reqId); resolve(false); }, 10_000); + this.mcpDeployResolvers.set(reqId, { resolve: (r: any) => resolve(r.status === "stopped"), timer }); + this.sendRaw({ type: "mcp_undeploy", server_name: serverName, _reqId: reqId } as any); + }); + } + + async mcpUpdate(serverName: string): Promise { + return new Promise(resolve => { + const reqId = `update_${Date.now()}`; + const timer = setTimeout(() => { this.mcpDeployResolvers.delete(reqId); resolve({ status: "timeout" }); }, 60_000); + this.mcpDeployResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "mcp_update", server_name: serverName, _reqId: reqId } as any); + }); + } + + async mcpLogs(serverName: string, lines?: number): Promise { + return new Promise(resolve => { + const reqId = `logs_${Date.now()}`; + const timer = setTimeout(() => { this.mcpLogsResolvers.delete(reqId); resolve([]); }, 10_000); + this.mcpLogsResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "mcp_logs", server_name: serverName, lines, _reqId: reqId } as any); + }); + } + + async mcpScope(serverName: string, scope?: any): Promise { + return new Promise(resolve => { + const reqId = `scope_${Date.now()}`; + const timer = setTimeout(() => { this.mcpScopeResolvers.delete(reqId); resolve({ scope: { type: "peer" }, deployed_by: "unknown" }); }, 10_000); + this.mcpScopeResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "mcp_scope", server_name: serverName, scope, _reqId: reqId } as any); + }); + } + + async mcpServiceSchema(serverName: string, toolName?: string): Promise { + return new Promise(resolve => { + const reqId = `schema_${Date.now()}`; + const timer = setTimeout(() => { this.mcpSchemaServiceResolvers.delete(reqId); resolve([]); }, 10_000); + this.mcpSchemaServiceResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "mcp_schema", server_name: serverName, tool_name: toolName, _reqId: reqId } as any); + }); + } + + async mcpCatalog(): Promise { + return new Promise(resolve => { + const reqId = `catalog_${Date.now()}`; + const timer = setTimeout(() => { this.mcpCatalogResolvers.delete(reqId); resolve([]); }, 10_000); + this.mcpCatalogResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "mcp_catalog", _reqId: reqId } as any); + }); + } + + // --- Skill Deploy --- + + async skillDeploy(source: any): Promise { + return new Promise(resolve => { + const reqId = `skilldeploy_${Date.now()}`; + const timer = setTimeout(() => { this.skillDeployResolvers.delete(reqId); resolve({ name: "unknown", files: [] }); }, 30_000); + this.skillDeployResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "skill_deploy", source, _reqId: reqId } as any); + }); + } + + async getServiceTools(serviceName: string): Promise { + // Check cached catalog first + const cached = this._serviceCatalog.find(s => s.name === serviceName); + if (cached?.tools?.length) return cached.tools; + // Fall back to schema query + return this.mcpServiceSchema(serviceName); + } + + /** Send a raw JSON frame to the broker (fire-and-forget). */ + private sendRaw(payload: Record): void { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify(payload)); + } + close(): void { this.closed = true; this.stopStatsReporting(); @@ -1730,6 +1862,78 @@ export class BrokerClient { this.resolveFromMap(this.webhookListResolvers, msgReqId, webhooks); return; } + if (msg.type === "vault_ack") { + const reqId = (msg as any)._reqId; + if (reqId && this.vaultAckResolvers.has(reqId)) { + const r = this.vaultAckResolvers.get(reqId)!; + clearTimeout(r.timer); + this.vaultAckResolvers.delete(reqId); + r.resolve(msg.action !== "not_found"); + } + } + if (msg.type === "vault_list_result") { + const reqId = (msg as any)._reqId; + if (reqId && this.vaultListResolvers.has(reqId)) { + const r = this.vaultListResolvers.get(reqId)!; + clearTimeout(r.timer); + this.vaultListResolvers.delete(reqId); + r.resolve((msg as any).entries ?? []); + } + } + if (msg.type === "mcp_deploy_status") { + const reqId = (msg as any)._reqId; + if (reqId && this.mcpDeployResolvers.has(reqId)) { + const r = this.mcpDeployResolvers.get(reqId)!; + clearTimeout(r.timer); + this.mcpDeployResolvers.delete(reqId); + r.resolve({ status: (msg as any).status, tools: (msg as any).tools, error: (msg as any).error }); + } + } + if (msg.type === "mcp_logs_result") { + const reqId = (msg as any)._reqId; + if (reqId && this.mcpLogsResolvers.has(reqId)) { + const r = this.mcpLogsResolvers.get(reqId)!; + clearTimeout(r.timer); + this.mcpLogsResolvers.delete(reqId); + r.resolve((msg as any).lines ?? []); + } + } + if (msg.type === "mcp_schema_result") { + const reqId = (msg as any)._reqId; + if (reqId && this.mcpSchemaServiceResolvers.has(reqId)) { + const r = this.mcpSchemaServiceResolvers.get(reqId)!; + clearTimeout(r.timer); + this.mcpSchemaServiceResolvers.delete(reqId); + r.resolve((msg as any).tools ?? []); + } + } + if (msg.type === "mcp_catalog_result") { + const reqId = (msg as any)._reqId; + if (reqId && this.mcpCatalogResolvers.has(reqId)) { + const r = this.mcpCatalogResolvers.get(reqId)!; + clearTimeout(r.timer); + this.mcpCatalogResolvers.delete(reqId); + r.resolve((msg as any).services ?? []); + } + } + if (msg.type === "mcp_scope_result") { + const reqId = (msg as any)._reqId; + if (reqId && this.mcpScopeResolvers.has(reqId)) { + const r = this.mcpScopeResolvers.get(reqId)!; + clearTimeout(r.timer); + this.mcpScopeResolvers.delete(reqId); + r.resolve({ scope: (msg as any).scope, deployed_by: (msg as any).deployed_by }); + } + } + if (msg.type === "skill_deploy_ack") { + const reqId = (msg as any)._reqId; + if (reqId && this.skillDeployResolvers.has(reqId)) { + const r = this.skillDeployResolvers.get(reqId)!; + clearTimeout(r.timer); + this.skillDeployResolvers.delete(reqId); + r.resolve({ name: (msg as any).name, files: (msg as any).files ?? [] }); + } + } if (msg.type === "error") { this.debug(`broker error: ${msg.code} ${msg.message}`); const id = msg.id ? String(msg.id) : null; @@ -1787,6 +1991,14 @@ export class BrokerClient { [this.peerDirResponseResolvers, { error: "broker error" }], [this.webhookAckResolvers, null], [this.webhookListResolvers, []], + [this.vaultAckResolvers, false], + [this.vaultListResolvers, []], + [this.mcpDeployResolvers, { status: "error" }], + [this.mcpLogsResolvers, []], + [this.mcpSchemaServiceResolvers, []], + [this.mcpCatalogResolvers, []], + [this.mcpScopeResolvers, { scope: { type: "peer" }, deployed_by: "unknown" }], + [this.skillDeployResolvers, { name: "unknown", files: [] }], ]; for (const [map, defaultVal] of allMaps) { const first = (map as Map).entries().next().value as [string, { resolve: (v: unknown) => void; timer: NodeJS.Timeout }] | undefined; diff --git a/docs/mesh-services-spec.md b/docs/mesh-services-spec.md new file mode 100644 index 0000000..c3639ad --- /dev/null +++ b/docs/mesh-services-spec.md @@ -0,0 +1,1258 @@ +# Mesh Services: MCP Servers & Skills Platform + +> Consolidated spec for deploying, managing, and executing MCP servers +> and multi-file skills within a claudemesh mesh. Covers source modes, +> execution engine, credential vaults, access control, native Claude Code +> integration, and dynamic tool discovery. + +--- + +## Problem + +Today: +- **Skills** are a single `instructions` text field in Postgres. No multi-file support. +- **MCP servers** are live-proxied through the registering peer. When that peer disconnects, the server dies. The `persistent` flag is cosmetic. +- Neither supports bundled artifacts (templates, configs, schemas, example code). +- Claude Code has no way to discover mesh tools natively — peers must use the generic `mesh_tool_call` proxy. + +## Design goals + +1. Three source modes — inline, zip bundle, git repo — for both skills and MCP servers +2. MCP servers run on the VPS, not on peers — true 24/7 persistence +3. Sandboxed execution with resource limits +4. Native Claude Code tool integration — deployed MCPs appear as regular MCP server entries +5. Per-peer credential vault for secrets (OAuth tokens, API keys) +6. Visibility scopes on services — peer, group, role, or mesh-wide — deployer controls who can call, not who sees secrets +7. Dynamic mid-session discovery via `notifications/tools/list_changed` +8. All existing behavior preserved — inline skills and live-proxy MCPs unchanged + +--- + +## Architecture overview + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ claudemesh launch --name Mou --mesh dev │ +│ │ +│ 1. Connect to broker, authenticate │ +│ 2. Fetch service catalog (scope-filtered for this peer) │ +│ 3. Write native MCP entries to ~/.claude.json: │ +│ mesh:gmail, mesh:context7, mesh:whatsapp │ +│ 4. Spawn claude │ +│ 5. On exit: remove mesh:* entries │ +└──────────┬───────────────────────────────────────────────────────┘ + │ + ▼ +┌──────────────────────────────────────────────────────────────────┐ +│ Claude Code session │ +│ │ +│ MCP: claudemesh (stdio) │ +│ ├── send_message, list_peers, set_summary, ... (peer comms) │ +│ ├── mesh_mcp_deploy, mesh_mcp_scope, ... (service mgmt) │ +│ ├── vault_set, vault_list, ... (credentials) │ +│ └── mesh_mcp_schema (introspection)│ +│ │ +│ MCP: mesh:gmail (stdio proxy) → mcp__mesh_gmail__* │ +│ MCP: mesh:context7 (stdio proxy) → mcp__mesh_context7__* │ +│ MCP: mesh:whatsapp (stdio proxy) → mcp__mesh_whatsapp__* │ +│ │ +│ MCP: playwriter (stdio, local) → local MCPs as usual │ +│ MCP: figma (stdio, local) │ +└──────────┬───────────────────────────────────────────────────────┘ + │ Each mesh:* proxy connects via WebSocket + ▼ +┌──────────────────────────────────────────────────────────────────┐ +│ Broker (VPS — wss://ic.claudemesh.com/ws) │ +│ │ +│ Existing: message routing, presence, state, memory, files, ... │ +│ │ +│ New: Service Catalog │ +│ ├── Scope enforcement (peer/group/role/mesh visibility) │ +│ ├── Tool schema registry (from runner) │ +│ ├── Deploy/undeploy/update commands │ +│ └── System events: mcp_deployed, mcp_undeployed │ +│ │ +│ New: Vault │ +│ └── Per-peer encrypted credential storage │ +│ │ +│ Tool call routing: │ +│ ├── Managed service? → forward to runner │ +│ └── Live proxy? → forward to hosting peer (existing) │ +└──────────┬───────────────────────────────────────────────────────┘ + │ stdio (child process) + ▼ +┌──────────────────────────────────────────────────────────────────┐ +│ Runner (one Docker container per mesh) │ +│ │ +│ Supervisor (Node main thread) │ +│ ├── stdin/stdout ↔ broker (JSON-RPC multiplexed) │ +│ ├── Routes tool calls by service name │ +│ ├── Lifecycle: load / unload / restart │ +│ ├── Health: MCP ping per child, restart on 3 failures │ +│ ├── Logs: 1000-line ring buffer per service │ +│ └── Vault: decrypts credentials at spawn time │ +│ │ +│ Child processes (one per MCP server): │ +│ ├── child_process.spawn("node", [...]) ← Node MCP servers │ +│ ├── child_process.spawn("uvx", [...]) ← Python MCP servers │ +│ ├── child_process.spawn("npx", [...]) ← npm MCP packages │ +│ │ │ +│ │ Each child: │ +│ │ ├── Own stdio pipe (MCP protocol) │ +│ │ ├── Own env vars (including vault-resolved secrets) │ +│ │ ├── Own /secrets// dir (vault files) │ +│ │ └── Killed individually on undeploy │ +│ │ │ +│ Base image: node:22 + python3.12 + uv + npx │ +│ Limits: --memory=512m --cpus=1 --network=mesh-restricted │ +└──────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Source modes + +### 1. Inline (existing, unchanged) + +``` +share_skill(name, description, instructions, tags) ← text-only skill +mesh_mcp_register(server_name, description, tools) ← live peer proxy +``` + +### 2. Zip bundle + +Upload a zip, then deploy: + +``` +1. share_file(path="./my-server.zip", tags=["mcp-bundle"]) → fileId +2. mesh_mcp_deploy(file_id=fileId, server_name="my-server", config={...}) +``` + +**MCP server zip structure:** +``` +my-mcp-server/ +├── package.json # or pyproject.toml / requirements.txt +├── src/index.ts # MCP server entry (stdio transport) +├── .env.example # declares required env vars +└── README.md +``` + +**Skill bundle zip structure:** +``` +my-skill/ +├── SKILL.md # instructions (replaces inline text) +├── skill.json # { name, description, tags } +├── templates/ # prompt templates, examples +└── schemas/ # JSON schemas, configs +``` + +### 3. Git repository + +``` +mesh_mcp_deploy( + git_url="https://github.com/user/my-mcp-server.git", + branch="main", + server_name="my-server", + config={ env: { API_KEY: "$vault:my-api-key" } } +) +``` + +- Shallow clone (`--depth 1`) +- Commit SHA pinned in DB for auditability +- `mesh_mcp_update(server_name)` → git pull + rebuild + restart +- Auth via `config.git_auth` (stored encrypted, never logged) + +--- + +## Execution engine + +### Why child processes, not worker threads + +MCP servers use **stdio transport** — each server owns its stdin/stdout via +`StdioServerTransport`. Two servers can't share one process. Worker threads +don't help because: +- MCP SDK `StdioServerTransport` takes over process stdin/stdout +- `npx @package/mcp-server` spawns its own process anyway +- Python MCPs need a Python runtime, not a Node thread + +The runner spawns each MCP server as a **child process** with its own stdio +pipe, exactly how every MCP server is designed to work. + +### Container design: one per mesh + +``` +┌─ Docker container (mesh: "dev") ─────────────────┐ +│ │ +│ Supervisor (Node main thread) │ +│ ├─ stdio ↔ broker │ +│ ├─ routes calls by service name │ +│ │ │ +│ ├─ spawn("npx", ["@upstash/context7-mcp"]) │ +│ │ └─ stdio pipe ↔ MCP protocol │ +│ ├─ spawn("node", ["dist/index.js"]) │ +│ │ └─ stdio pipe ↔ MCP protocol │ +│ ├─ spawn("uvx", ["mcp-outline"]) │ +│ │ └─ stdio pipe ↔ MCP protocol │ +│ └─ spawn("python", ["-m", "server"]) │ +│ └─ stdio pipe ↔ MCP protocol │ +│ │ +│ Base: node:22 + python3.12 + uv + npx │ +│ Limits: --memory=512m --cpus=1 │ +│ Network: mesh-restricted bridge (allowlist) │ +└───────────────────────────────────────────────────┘ +``` + +**Why one container, not N:** +- One Docker process to manage, one cgroup for the whole mesh +- One network namespace — single firewall config +- Shared node_modules / pip cache across services +- VPS resources: 8 vCores / 24GB — N containers exhausts memory fast + +**Why not zero containers (bare child processes on the broker):** +- Broker stays routing-only — runner crashes don't take it down +- Security boundary — runner can't access broker's DB or filesystem +- Runner can be on a different machine later (NUC, second VPS) + +### Supervisor protocol + +Broker ↔ runner communicate over the container's stdin/stdout as JSON lines: + +```typescript +// Broker → runner +{ action: "load", name: "gmail", path: "/services/gmail", env: {...} } +{ action: "call", name: "gmail", tool: "search_emails", args: {...}, callId: "abc" } +{ action: "unload", name: "gmail" } +{ action: "health", name: "gmail" } +{ action: "list_tools", name: "gmail" } + +// Runner → broker +{ callId: "abc", result: {...} } +{ callId: "abc", error: "connection refused" } +{ type: "loaded", name: "gmail", tools: [{name, description, inputSchema}] } +{ type: "unloaded", name: "gmail" } +{ type: "crashed", name: "gmail", restarts: 3, error: "OOM" } +{ type: "health", name: "gmail", ok: true, rssKb: 45000 } +``` + +### Runtime auto-detection + +| File found | Runtime | Spawn command | +|---|---|---| +| `package.json` | node | `npm install && node
` | +| `package.json` with npx hint | node | `npx ` | +| `pyproject.toml` | python | `pip install . && python -m ` | +| `requirements.txt` | python | `pip install -r requirements.txt && python ` | +| `Bunfile` or `bun.lockb` | bun | `bun install && bun ` | + +### Health & restart + +- Supervisor sends MCP `ping` to each child every 30s +- No response within 5s → mark unhealthy +- 3 consecutive failures → restart (kill + re-spawn) +- Max 5 restarts → status=`crashed`, notify deployer via mesh system event +- On crash: `{ type: "push", event: "mcp_crashed", eventData: { name, error, restarts } }` + +### Logs + +Per-service ring buffer (1000 lines). Captures child's stderr + stdout +(excluding MCP protocol JSON). Accessible via `mesh_mcp_logs(name, lines?)`. + +### Storage layout + +``` +/var/claudemesh/services/ +├── / +│ ├── / +│ │ ├── source/ # extracted zip or git clone +│ │ ├── secrets/ # vault-resolved credential files +│ │ ├── node_modules/ # or .venv/ for Python +│ │ └── .meta.json # { pid, startedAt, sha, runtime } +``` + +### Network policy + +Default: `--network=mesh-restricted` (Docker bridge with outbound deny-all). + +Per-service allowlist in deploy config: +```json +{ + "network_allow": [ + "gmail.googleapis.com:443", + "oauth2.googleapis.com:443", + "100.113.153.45:*" + ] +} +``` + +Implemented via iptables rules on the bridge, or per-container `--add-host` +entries combined with a proxy. For Tailscale-accessible services (NUC, etc.), +allow the Tailscale IP. + +--- + +## Credential vault + +### Design + +Per-peer encrypted storage on the broker. Credentials never leave the vault +in plaintext — decrypted only inside the runner container at spawn time. + +Peers don't share credentials. They share **access to the running MCP +server** via scopes. The MCP server runs with the deployer's credentials; +other peers call it without ever seeing the secrets. + +### Encryption model + +Same crypto as E2E file sharing (`crypto/file-crypto.ts`): + +1. Peer generates random symmetric key +2. Encrypts the credential with `crypto_secretbox` (symmetric) +3. Seals the symmetric key with their own pubkey (`crypto_box`) +4. Stores sealed key + ciphertext on broker — broker sees only ciphertext +5. At spawn time: runner requests decryption from the deployer's sealed key + (the runner holds a mesh-scoped keypair granted by the deployer at deploy time) + +### Vault reference syntax + +In `mesh_mcp_deploy` env config, `$vault:` prefix triggers vault resolution: + +``` +$vault:api-key → inject as env var +$vault:gmail-creds:file:/secrets/creds.json → decrypt, write to file, set env var to path +``` + +Examples: +```typescript +mesh_mcp_deploy({ + server_name: "gmail", + git_url: "https://github.com/gongrzhe/server-gmail-autoauth-mcp", + env: { + GMAIL_CREDENTIALS_PATH: "$vault:gmail-creds:file:/secrets/credentials.json", + GMAIL_OAUTH_PATH: "$vault:gmail-oauth:file:/secrets/gcp-oauth.keys.json", + }, + network_allow: ["gmail.googleapis.com:443", "oauth2.googleapis.com:443"], +}) +``` + +### MCP tools + +``` +vault_set(key, value, type?, mount_path?) — encrypt + store + value: string (env var) or local file path (reads + encrypts the file) + type: "env" (default) or "file" + mount_path: for files, where to write inside the service dir + +vault_list() — list keys (no values, metadata only) +vault_delete(key) — remove entry +``` + +### DB schema + +```sql +CREATE TABLE mesh.vault_entry ( + id TEXT PRIMARY KEY, + mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE, + member_id TEXT NOT NULL REFERENCES mesh.member(id), + key TEXT NOT NULL, + + -- E2E encrypted content + ciphertext BYTEA NOT NULL, + nonce BYTEA NOT NULL, + sealed_key BYTEA NOT NULL, -- symmetric key sealed with peer's pubkey + + -- Metadata (plaintext) + entry_type TEXT DEFAULT 'env' CHECK (entry_type IN ('env', 'file')), + mount_path TEXT, + description TEXT, + + created_at TIMESTAMP DEFAULT now(), + updated_at TIMESTAMP DEFAULT now(), + + UNIQUE (mesh_id, member_id, key) +); +``` + +--- + +## Visibility scopes + +### Model + +Scopes control who can see and call a service. Credentials are invisible to +callers — they interact with the running service, not the secrets behind it. +The deployer controls visibility; the vault handles secrets separately. + +### Scope levels + +| Scope | Who sees it | Use case | +|---|---|---| +| `peer` | Only the deployer (default) | Personal tools, staging before publish | +| `{ peers: [...] }` | Named peers | Shared between specific people | +| `{ group: "eng" }` | All @eng members | Team-specific tools | +| `{ groups: ["eng", "ops"] }` | Multiple groups | Cross-team tools | +| `{ role: "lead" }` | Any peer with that role | Role-gated admin tools | +| `mesh` | Everyone in the mesh | Shared utilities | + +### Examples + +``` +┌─────────────────────────────────────────────────┐ +│ Mesh: "dev-team" │ +│ │ +│ mesh scope ─── everyone │ +│ ├── context7 (utility) │ +│ ├── youtube-transcript │ +│ └── mesh-db (shared database) │ +│ │ +│ group scope ─── @group members only │ +│ ├── @eng │ +│ │ ├── github-mcp (eng team's GitHub) │ +│ │ └── ssh-manager (eng infra access) │ +│ ├── @sales │ +│ │ ├── apollo-io (sales CRM) │ +│ │ └── gmail (sales@ inbox) │ +│ └── @ops │ +│ ├── stalwart-mail (mail server admin) │ +│ └── namecheap (DNS management) │ +│ │ +│ role scope ─── by role tag │ +│ ├── lead → mesh-admin-tools (deploy, vault) │ +│ └── observer → (read-only MCPs only) │ +│ │ +│ peer scope ─── only specific peers │ +│ ├── Alejandro │ +│ │ ├── gmail-personal (my inbox) │ +│ │ └── gworkspace (my workspace) │ +│ └── Mou │ +│ └── cursor-composer (Mou's Cursor) │ +│ │ +└─────────────────────────────────────────────────┘ +``` + +### Deploy with scope + +```typescript +// Mesh scope — everyone +mesh_mcp_deploy({ + server_name: "context7", + source: { type: "git", url: "..." }, + scope: "mesh", +}) + +// Group scope — only @eng +mesh_mcp_deploy({ + server_name: "github-mcp", + source: { type: "git", url: "..." }, + scope: { group: "eng" }, +}) + +// Multi-group +mesh_mcp_deploy({ + server_name: "ssh-manager", + scope: { groups: ["eng", "ops"] }, +}) + +// Role scope — only leads +mesh_mcp_deploy({ + server_name: "mesh-admin", + scope: { role: "lead" }, +}) + +// Peer scope — just me (default) +mesh_mcp_deploy({ + server_name: "gmail-personal", + scope: "peer", +}) + +// Specific peers +mesh_mcp_deploy({ + server_name: "shared-workspace", + scope: { peers: ["Mou", "Alejandro"] }, +}) +``` + +### Enforcement + +- **At catalog time:** broker filters the service catalog by scope before + sending to peers in `hello_ack`. The peer's groups and role (from `hello`) + are matched against each service's scope. A tool you can't access never + appears in Claude's tool list. +- **At call time:** broker re-checks scope before routing. Double-check + in case catalog is stale or the peer's groups changed. + +### Scope resolution logic + +```typescript +function peerCanAccess(service: Service, peer: PeerConn): boolean { + const scope = service.scope; + if (typeof scope === "string") { + if (scope === "peer") return service.deployed_by === peer.memberId; + if (scope === "mesh") return true; + } + if ("peers" in scope) { + return scope.peers.some(p => + p === peer.memberId || p === peer.displayName); + } + if ("group" in scope) { + return peer.groups.some(g => g.name === scope.group); + } + if ("groups" in scope) { + return peer.groups.some(g => scope.groups.includes(g.name)); + } + if ("role" in scope) { + return peer.groups.some(g => g.role === scope.role); + } + return false; +} +``` + +### MCP tools + +``` +mesh_mcp_scope(server_name, scope?) + scope set: mesh_mcp_scope("gmail", { group: "sales" }) + scope read: mesh_mcp_scope("gmail") → { scope, deployed_by } +``` + +### Scope change events + +When a scope changes, the broker: +1. Computes which peers gained/lost access +2. Sends `mcp_scope_changed` system event to affected peers +3. Peers who gained access get `svc__*` dynamic tools via `list_changed` +4. Peers who lost access get tools removed via `list_changed` +5. Full native access requires session restart + +### DB + +Single column on `mesh.service`: + +```sql +scope JSONB DEFAULT '{"type": "peer"}' +-- {"type": "peer"} +-- {"type": "mesh"} +-- {"type": "peers", "allow": ["member_id_1", "member_id_2"]} +-- {"type": "group", "group": "eng"} +-- {"type": "groups", "groups": ["eng", "ops"]} +-- {"type": "role", "role": "lead"} +``` + +### Future: cross-mesh scope + +Not for v1. Each mesh is isolated. The schema supports it later: + +```json +{"type": "cross_mesh", "meshes": ["dev", "staging"]} +``` + +A service deployed in `dev` visible in `staging`. Requires the runner to be +accessible from both meshes (possible since it's on the VPS). + +--- + +## Native Claude Code integration + +### Goal + +Deployed mesh MCPs feel indistinguishable from locally installed MCP servers. +Claude sees `mcp__mesh_gmail__search_emails` — not `mesh_tool_call("gmail", ...)`. + +### At session start: native MCP entries + +`claudemesh launch` queries the broker for the scope-filtered service catalog +and installs each service as a native MCP entry before spawning Claude: + +```typescript +// commands/launch.ts — extended flow + +// Step 3 (new): fetch service catalog from broker +const catalog = await fetchServiceCatalog(mesh); + +// Step 4 (new): write mesh MCP entries to ~/.claude.json +for (const service of catalog) { + addMcpEntry(`mesh:${service.name}`, { + command: "claudemesh", + args: ["mcp", "--service", service.name], + }); +} + +// Step 5: spawn claude with mesh-aware env +const child = spawn("claude", claudeArgs, { + env: { + ...process.env, + CLAUDEMESH_CONFIG_DIR: tmpDir, + CLAUDEMESH_DISPLAY_NAME: displayName, + // Mesh calls traverse: proxy → WS → broker → runner → child. + // Default MCP timeout is too short for this chain. + MCP_TIMEOUT: process.env.MCP_TIMEOUT ?? "30000", + // Mesh MCPs may return large results (DB queries, file contents). + MAX_MCP_OUTPUT_TOKENS: process.env.MAX_MCP_OUTPUT_TOKENS ?? "50000", + }, +}); + +// Step 6 (extended): cleanup mesh:* entries on exit +child.on("exit", () => { + removeMcpEntries("mesh:*"); + cleanup(); // existing tmpdir cleanup +}); +``` + +Each `claudemesh mcp --service ` is a thin stdio proxy: + +```typescript +// Thin proxy: connects to broker, serves ONE service's tools +const client = new BrokerClient(mesh); +await client.connect(); +const tools = await client.getServiceTools(serviceName); + +server.setRequestHandler(ListToolsRequestSchema, () => ({ tools })); +server.setRequestHandler(CallToolRequestSchema, async (req) => { + // Wait for broker reconnection if WS is down (up to 10s) + if (client.status !== "open") { + const connected = await client.waitForConnection(10_000); + if (!connected) { + return text("Service temporarily unavailable — broker reconnecting. Retry in a few seconds.", true); + } + } + return await client.mcpCall(serviceName, req.params.name, req.params.arguments); +}); +``` + +**Resilience notes:** +- The `BrokerClient` handles WS reconnection with exponential backoff (1s→30s) +- Claude Code does NOT auto-restart crashed MCP servers — if the proxy + process itself dies, those tools vanish until session restart +- The proxy should catch all exceptions and return MCP errors, never crash +- `claudemesh doctor` diagnoses dead proxy processes mid-session + +**Result:** Claude Code starts and sees: +``` +mcp__mesh_gmail__search_emails ← proper namespace, full schema +mcp__mesh_gmail__send_email ← deferred by ToolSearch automatically +mcp__mesh_context7__query_docs ← native MCP, no indirection +``` + +### Session management + +**Safe `~/.claude.json` modification:** +- `~/.claude.json` stores MCP entries AND other Claude Code config (permissions, + env vars, etc.). Never overwrite the whole file. +- Read-modify-write: load full JSON → add/remove only `mesh:*` keys in + `mcpServers` → write back. Preserve all other keys. +- Use `flock` on writes to prevent concurrent session corruption. + +**Stale entry cleanup:** +- Each `mesh:*` entry includes `_meshSession` metadata with PID and timestamp +- `claudemesh launch` sweeps stale entries on startup (dead PID check) +- `claudemesh doctor` reports orphaned entries + +**Concurrent sessions:** +- Entries are session-scoped: `mesh:gmail:w1t0p0` (includes session ID) +- Each session manages only its own entries + +### Mid-session deploys: dynamic tools + +When a service is deployed after the Claude session started, native MCP entries +can't be added (Claude Code doesn't support adding new MCP servers mid-session). + +**Two-tier fallback:** + +1. **Claudemesh MCP fires `notifications/tools/list_changed`** (stdio, proven to work) + - Adds `svc____` tools to its own `tools/list` + - Claude sees them as `mcp__claudemesh__svc__gmail__search_emails` + - Works, but namespacing is less clean than native + +2. **System notification tells the peer:** + ``` + [mesh] Service deployed: "namecheap" by Alejandro (3 tools). + Available now via mesh_tool_call("namecheap", "domains_list", {...}). + Restart session for native mcp__mesh_namecheap__* access. + ``` + +3. **`mesh_tool_call` remains the universal fallback** — works for any + service at any time, native or not. + +### Mid-session undeploys + +When a service is undeployed, the native proxy process detects the broker +event and exits gracefully. Claude Code sees the MCP server disconnect and +stops offering those tools. No `list_changed` needed — MCP server death +is already handled. + +### Schema introspection + +For programmatic access to tool schemas (building workflows, debugging): + +``` +mesh_mcp_schema(server_name) → all tools with full inputSchema +mesh_mcp_schema(server_name, tool_name) → one specific tool's schema +mesh_mcp_catalog() → all services with tool counts, scope, status +``` + +--- + +## Database changes + +### New table: `mesh.service` + +```sql +CREATE TABLE mesh.service ( + id TEXT PRIMARY KEY, + mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE, + name TEXT NOT NULL, + type TEXT NOT NULL CHECK (type IN ('mcp', 'skill')), + + -- Source + source_type TEXT NOT NULL CHECK (source_type IN ('inline', 'zip', 'git')), + source_file_id TEXT REFERENCES mesh.file(id), + source_git_url TEXT, + source_git_branch TEXT DEFAULT 'main', + source_git_sha TEXT, + prev_git_sha TEXT, -- for rollback + + -- Content + description TEXT NOT NULL, + instructions TEXT, -- skills only + tools_schema JSONB, -- MCPs: [{ name, description, inputSchema }] + + -- Bundle + manifest JSONB, -- { files: [...], entry: "src/index.ts" } + + -- Execution (MCPs only) + runtime TEXT CHECK (runtime IN ('node', 'python', 'bun', NULL)), + status TEXT DEFAULT 'stopped' + CHECK (status IN ('building', 'installing', 'running', + 'stopped', 'failed', 'crashed', 'restarting')), + config JSONB DEFAULT '{}', -- resource limits, network policy + last_health TIMESTAMP, + restart_count INT DEFAULT 0, + version INT DEFAULT 1, + + -- Visibility scope + scope JSONB DEFAULT '{"type": "peer"}', + + -- Metadata + deployed_by TEXT REFERENCES mesh.member(id), + deployed_by_name TEXT, + created_at TIMESTAMP DEFAULT now() NOT NULL, + updated_at TIMESTAMP DEFAULT now() NOT NULL, + + UNIQUE (mesh_id, name) +); +``` + +### New table: `mesh.vault_entry` + +```sql +CREATE TABLE mesh.vault_entry ( + id TEXT PRIMARY KEY, + mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE, + member_id TEXT NOT NULL REFERENCES mesh.member(id), + key TEXT NOT NULL, + ciphertext BYTEA NOT NULL, + nonce BYTEA NOT NULL, + sealed_key BYTEA NOT NULL, + entry_type TEXT DEFAULT 'env' CHECK (entry_type IN ('env', 'file')), + mount_path TEXT, + description TEXT, + created_at TIMESTAMP DEFAULT now(), + updated_at TIMESTAMP DEFAULT now(), + UNIQUE (mesh_id, member_id, key) +); +``` + +### Extend `mesh.skill` (backward compat) + +```sql +ALTER TABLE mesh.skill + ADD COLUMN source_type TEXT DEFAULT 'inline' + CHECK (source_type IN ('inline', 'zip', 'git')), + ADD COLUMN bundle_file_id TEXT REFERENCES mesh.file(id), + ADD COLUMN git_url TEXT, + ADD COLUMN git_branch TEXT DEFAULT 'main', + ADD COLUMN git_sha TEXT, + ADD COLUMN manifest JSONB; +``` + +--- + +## Wire protocol additions + +### Client → broker + +```typescript +// --- Service deployment --- + +interface WSMcpDeployMessage { + type: "mcp_deploy"; + server_name: string; + source: + | { type: "zip"; file_id: string } + | { type: "git"; url: string; branch?: string; auth?: string }; + config?: { + env?: Record; // supports $vault: refs + memory_mb?: number; // default 256 + cpus?: number; // default 0.5 + network_allow?: string[]; // default: none + runtime?: "node" | "python" | "bun"; + }; + scope?: + | "peer" // private (default) + | "mesh" // everyone + | { peers: string[] } // named peers + | { group: string } // single group + | { groups: string[] } // multiple groups + | { role: string }; // by role tag + _reqId?: string; +} + +interface WSMcpUndeployMessage { + type: "mcp_undeploy"; + server_name: string; + _reqId?: string; +} + +interface WSMcpUpdateMessage { + type: "mcp_update"; + server_name: string; + _reqId?: string; +} + +interface WSMcpLogsMessage { + type: "mcp_logs"; + server_name: string; + lines?: number; // default 50, max 1000 + _reqId?: string; +} + +interface WSMcpScopeMessage { + type: "mcp_scope"; + server_name: string; + scope?: // set — omit to read current + | "peer" + | "mesh" + | { peers: string[] } + | { group: string } + | { groups: string[] } + | { role: string }; + _reqId?: string; +} + +interface WSMcpSchemaMessage { + type: "mcp_schema"; + server_name: string; + tool_name?: string; // omit for all tools + _reqId?: string; +} + +interface WSMcpCatalogMessage { + type: "mcp_catalog"; + _reqId?: string; +} + +// --- Skill deployment --- + +interface WSSkillDeployMessage { + type: "skill_deploy"; + source: + | { type: "zip"; file_id: string } + | { type: "git"; url: string; branch?: string; auth?: string }; + _reqId?: string; +} + +// --- Vault --- + +interface WSVaultSetMessage { + type: "vault_set"; + key: string; + ciphertext: string; // base64 + nonce: string; // base64 + sealed_key: string; // base64 + entry_type: "env" | "file"; + mount_path?: string; + description?: string; + _reqId?: string; +} + +interface WSVaultListMessage { + type: "vault_list"; + _reqId?: string; +} + +interface WSVaultDeleteMessage { + type: "vault_delete"; + key: string; + _reqId?: string; +} +``` + +### Broker → client + +```typescript +// --- Service responses --- + +interface WSMcpDeployStatusMessage { + type: "mcp_deploy_status"; + server_name: string; + status: "building" | "installing" | "running" | "failed"; + tools?: Array<{ name: string; description: string; inputSchema: object }>; + error?: string; + _reqId?: string; +} + +interface WSMcpLogsResultMessage { + type: "mcp_logs_result"; + server_name: string; + lines: string[]; + _reqId?: string; +} + +interface WSMcpSchemaResultMessage { + type: "mcp_schema_result"; + server_name: string; + tools: Array<{ name: string; description: string; inputSchema: object }>; + _reqId?: string; +} + +interface WSMcpCatalogResultMessage { + type: "mcp_catalog_result"; + services: Array<{ + name: string; + type: "mcp" | "skill"; + description: string; + status: string; + tool_count: number; + deployed_by: string; + scope: { type: string; [key: string]: unknown }; + source_type: string; + runtime?: string; + created_at: string; + }>; + _reqId?: string; +} + +interface WSMcpScopeResultMessage { + type: "mcp_scope_result"; + server_name: string; + scope: { type: string; [key: string]: unknown }; + deployed_by: string; + _reqId?: string; +} + +// --- Skill responses --- + +interface WSSkillDeployAckMessage { + type: "skill_deploy_ack"; + name: string; + files: string[]; + _reqId?: string; +} + +// --- Vault responses --- + +interface WSVaultAckMessage { + type: "vault_ack"; + key: string; + action: "stored" | "deleted" | "not_found"; + _reqId?: string; +} + +interface WSVaultListResultMessage { + type: "vault_list_result"; + entries: Array<{ + key: string; + entry_type: "env" | "file"; + mount_path?: string; + description?: string; + updated_at: string; + }>; + _reqId?: string; +} + +// --- System events (broadcast to mesh) --- + +// Sent as WSPushMessage with subtype: "system" +// event: "mcp_deployed" +// eventData: { name, description, tool_count, deployed_by, scope, tools: [...] } + +// event: "mcp_undeployed" +// eventData: { name, by } + +// event: "mcp_crashed" +// eventData: { name, error, restarts } + +// event: "mcp_updated" +// eventData: { name, prev_sha, new_sha, tools: [...] } +``` + +### Extended `hello_ack` + +```typescript +interface WSHelloAckMessage { + // ... existing fields ... + + /** Scope-filtered service catalog for this peer. */ + services?: Array<{ + name: string; + description: string; + status: string; + tools: Array<{ name: string; description: string; inputSchema: object }>; + deployed_by: string; + }>; +} +``` + +--- + +## MCP tool additions (CLI) + +### Service management tools + +```typescript +mesh_mcp_deploy(server_name, file_id?, git_url?, git_branch?, env?, runtime?, + memory_mb?, network_allow?, scope?) +mesh_mcp_undeploy(server_name) +mesh_mcp_update(server_name) // git-only: pull + rebuild + restart +mesh_mcp_logs(server_name, lines?) +mesh_mcp_scope(server_name, scope?) // set or read visibility scope +mesh_mcp_schema(server_name, tool?) // introspect tool schemas +mesh_mcp_catalog() // list all services with status +mesh_skill_deploy(file_id?, git_url?, git_branch?) +``` + +### Vault tools + +```typescript +vault_set(key, value, type?, mount_path?, description?) +vault_list() +vault_delete(key) +``` + +### Existing tools (unchanged) + +```typescript +share_skill(name, description, instructions, tags) // inline skills +mesh_mcp_register(server_name, description, tools) // live peer proxy +mesh_tool_call(server_name, tool_name, args) // universal fallback +mesh_mcp_list() // shows both proxy + managed +``` + +--- + +## Broker-side service manager + +New file: `apps/broker/src/service-manager.ts` + +### Interface + +```typescript +interface ServiceManager { + deploy(opts: { + meshId: string; + name: string; + source: { type: "zip"; fileId: string } + | { type: "git"; url: string; branch: string; auth?: string }; + config: ServiceConfig; + vaultEntries: Array<{ key: string; ciphertext: Buffer; nonce: Buffer; sealedKey: Buffer; + entryType: "env" | "file"; mountPath?: string }>; + }): Promise<{ tools: ToolDef[]; status: string }>; + + undeploy(meshId: string, name: string): Promise; + + update(meshId: string, name: string): Promise<{ tools: ToolDef[]; newSha?: string }>; + + callTool(meshId: string, serverName: string, toolName: string, + args: Record): Promise<{ result?: unknown; error?: string }>; + + logs(meshId: string, name: string, lines?: number): string[]; + + status(meshId: string, name: string): ServiceStatus; + + restoreAll(): Promise; // on broker boot +} +``` + +### Boot restore + +On broker startup: +1. Query `mesh.service WHERE status IN ('running', 'crashed', 'restarting')` +2. Set all to `status='restarting'` +3. Re-spawn runner container per mesh +4. Load each service's source and spawn child process +5. Set `status='running'` only after successful MCP `initialize` response +6. Services that fail to start → `status='failed'`, system event broadcast + +--- + +## Security model + +| Concern | Mitigation | +|---|---| +| Arbitrary code execution | Docker container, one per mesh | +| Resource exhaustion | `--memory=512m --cpus=1` per container | +| Filesystem escape | No host volume mounts | +| Secret leakage | Vault E2E encrypted, decrypted only inside container | +| Network exfiltration | `--network=mesh-restricted`, per-service allowlist | +| Malicious zip (path traversal) | Validate all paths within target dir, reject `..` | +| Git auth tokens | Stored encrypted in vault, passed via `GIT_ASKPASS` | +| Denial of service | Max 20 services per mesh, max 50MB zip, max 500MB image | +| Scope bypass | Double-check: filter catalog + check on call | +| OAuth token expiry | Store refresh tokens, notify deployer on persistent failure | +| Tool name collision | `svc__` prefix for mid-session dynamic tools | +| Stale MCP entries | PID check + age sweep on launch | +| Tool call timeout | `MCP_TIMEOUT=30000` set by launch (default too short for mesh chain) | +| Large tool output | `MAX_MCP_OUTPUT_TOKENS=50000` set by launch; proxy truncates if needed | +| Proxy crash | Claude Code won't auto-restart; `claudemesh doctor` diagnoses dead proxies | +| Broker restart | Proxies reconnect via BrokerClient backoff; calls return "reconnecting" during window | + +--- + +## CLI commands + +```bash +# Deploy from zip +claudemesh deploy ./my-server.zip --name my-server + +# Deploy from git +claudemesh deploy --git https://github.com/user/repo.git --name my-server + +# Deploy with vault refs +claudemesh vault set gmail-creds ~/.gmail-mcp/credentials.json --type file +claudemesh deploy --git https://github.com/user/gmail-mcp.git --name gmail \ + --env 'GMAIL_CREDENTIALS_PATH=$vault:gmail-creds:file:/secrets/creds.json' \ + --network-allow 'gmail.googleapis.com:443' + +# Set access +claudemesh scope gmail --mesh # everyone +claudemesh scope gmail --group eng # @eng only +claudemesh scope gmail --groups 'eng,ops' # @eng + @ops +claudemesh scope gmail --role lead # leads only +claudemesh scope gmail --peers 'Mou,Alejandro' # specific peers +claudemesh scope gmail --peer # private (deployer only) + +# Manage +claudemesh logs gmail +claudemesh update gmail # git-only: pull + rebuild +claudemesh undeploy gmail +claudemesh catalog # list all services + +# Skills +claudemesh skill deploy ./my-skill.zip +claudemesh skill deploy --git https://github.com/user/skill.git + +# Vault +claudemesh vault set api-key "sk-abc123" +claudemesh vault set oauth-creds ~/path/to/creds.json --type file +claudemesh vault list +claudemesh vault delete api-key +``` + +--- + +## Migration path + +| What | Before | After | +|---|---|---| +| `share_skill()` inline | works | unchanged | +| `mesh_mcp_register()` live proxy | works | unchanged, labeled "proxy" in catalog | +| Zip MCP server | not possible | `share_file` + `mesh_mcp_deploy` | +| Git MCP server | not possible | `mesh_mcp_deploy(git_url=...)` | +| Zip skill bundle | not possible | `mesh_skill_deploy(file_id=...)` | +| Git skill | not possible | `mesh_skill_deploy(git_url=...)` | +| `mesh_tool_call` | forwards to peer | routes to runner OR forwards to peer | +| `mesh_mcp_list` | proxy only | shows proxy + managed, with status | +| Tool discovery | manual `mesh_mcp_list` | native MCP entries at launch + mid-session events | +| Credentials | plaintext env vars | E2E encrypted vault with `$vault:` refs | +| Access control | none (anyone can call) | Scopes: peer/group/role/mesh per service | + +All existing behavior preserved. New capabilities are additive. + +--- + +## Implementation order + +### Phase 1: Foundation +1. DB migration — `mesh.service` table, `mesh.vault_entry` table, extend `mesh.skill` +2. Wire protocol — add all new message types to `types.ts` +3. Vault — broker-side storage + CLI tools (`vault_set`, `vault_list`, `vault_delete`) +4. Service catalog — `mcp_catalog`, `mcp_schema`, scope filtering in `hello_ack` + +### Phase 2: Execution engine +5. Runner supervisor — `service-manager.ts`, child process spawn/kill/restart/health +6. Docker container — base image, build + run lifecycle +7. Deploy flow — zip extraction, git clone, runtime detection, `npm install` / `pip install` +8. Tool call routing — broker routes managed service calls to runner + +### Phase 3: Native integration +9. Launch integration — `claudemesh launch` writes `mesh:*` MCP entries to `~/.claude.json` +10. Stdio proxy — `claudemesh mcp --service ` thin proxy command +11. Mid-session fallback — `svc__*` dynamic tools + `list_changed` on claudemesh MCP +12. Session cleanup — stale entry sweep, PID checks, `flock` on config writes + +### Phase 4: Skill bundles +13. Skill deploy — zip/git extraction, `SKILL.md` + `skill.json` parsing, manifest storage +14. `get_skill` extension — returns structured file contents from bundle + +### Phase 5: Polish +15. `mesh_mcp_update` — git pull + rebuild + restart flow +16. Boot restore — re-spawn services on broker restart +17. CLI commands — `claudemesh deploy`, `claudemesh vault`, `claudemesh scope`, `claudemesh catalog` +18. Docs + example bundles — sample MCP server zip, sample skill bundle + +--- + +## Appendix: Claude Code MCP behavior (verified) + +Key findings from Claude Code MCP architecture research that informed this +spec. These are behaviors of Claude Code itself, not the MCP protocol. + +### Lifecycle +- MCP servers start when a session begins, stop when it ends +- **No auto-restart on crash** — next tool invocation fails. Our proxy must + handle reconnection to the broker independently +- No health checks from Claude Code — failures discovered on tool use +- `MCP_TIMEOUT` env var controls tool call timeout + +### Dynamic tools +- `notifications/tools/list_changed` is supported and triggers immediate + re-fetch of `tools/list` — works mid-conversation over stdio +- **SSE/HTTP transport support for `list_changed` may be unreliable** — known + bug in some versions. This is why we use stdio proxies, not HTTP transport. + +### ToolSearch / deferred tools +- Enabled by default (`ENABLE_TOOL_SEARCH=true`) +- Only tool **names** are loaded at startup — full schemas fetched on demand +- Requires Sonnet 4+ or Opus 4+ (Haiku does not support tool references) +- Adding 100+ MCP tools has near-zero context cost at startup +- Configurable: `ENABLE_TOOL_SEARCH=auto:5` loads upfront if <5% of context + +### Tool output limits +- Warning at 10,000 tokens, hard limit at 25,000 tokens (default) +- Configurable via `MAX_MCP_OUTPUT_TOKENS` env var +- Per-tool override: `_meta["anthropic/maxResultSizeChars"]` (up to 500K chars) + +### Namespacing +- Tools namespaced as `mcp__servername__toolname` +- Two servers with same tool name → no conflict (different namespace) +- Server names normalized: spaces → underscores + +### Registration +- **File-based only** — no runtime API to add MCP servers +- Scopes: `local` (~/.claude.json), `project` (.mcp.json), `user` (~/.claude.json global) +- Precedence: local > project > user +- `claude mcp add --scope user` for global, `--scope project` for team-shared +- **Cannot add new MCP server entries mid-session** — this is why `claudemesh + launch` pre-writes entries before spawning, and mid-session deploys fall + back to dynamic `svc__*` tools on the claudemesh MCP server + +### Environment variables +- Passed via `--env KEY=VALUE` on `claude mcp add` +- `.mcp.json` supports `${VAR}` and `${VAR:-default}` expansion +- Special: `${CLAUDE_PLUGIN_ROOT}`, `${CLAUDE_PLUGIN_DATA}` + +### Implications for this spec +- Native MCP entries MUST be written before `claude` spawns → `claudemesh launch` flow +- Stdio transport is the only reliable path for `list_changed` → thin proxy model +- ToolSearch means 100+ mesh tools have negligible context cost +- No server dependencies → each mesh proxy is independent +- No auto-restart → proxies must reconnect to broker on their own diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 6bae565..40e5b0f 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -454,6 +454,12 @@ export const meshSkill = meshSchema.table( tags: text().array().default([]), authorMemberId: text().references(() => meshMember.id), authorName: text(), + sourceType: text().default("inline"), + bundleFileId: text().references(() => meshFile.id), + gitUrl: text(), + gitBranch: text().default("main"), + gitSha: text(), + manifest: jsonb(), createdAt: timestamp().defaultNow().notNull(), updatedAt: timestamp().defaultNow().notNull(), }, @@ -487,6 +493,63 @@ export const meshWebhook = meshSchema.table( (table) => [uniqueIndex("webhook_mesh_name_idx").on(table.meshId, table.name)], ); +export const meshService = meshSchema.table( + "service", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + name: text().notNull(), + type: text().notNull(), + sourceType: text().notNull(), + sourceFileId: text().references(() => meshFile.id), + sourceGitUrl: text(), + sourceGitBranch: text().default("main"), + sourceGitSha: text(), + prevGitSha: text(), + description: text().notNull(), + instructions: text(), + toolsSchema: jsonb(), + manifest: jsonb(), + runtime: text(), + status: text().default("stopped"), + config: jsonb().default({}), + lastHealth: timestamp(), + restartCount: integer().default(0), + version: integer().default(1), + scope: jsonb().default({ type: "peer" }), + deployedBy: text().references(() => meshMember.id), + deployedByName: text(), + createdAt: timestamp().defaultNow().notNull(), + updatedAt: timestamp().defaultNow().notNull(), + }, + (table) => [uniqueIndex("service_mesh_name_idx").on(table.meshId, table.name)], +); + +export const meshVaultEntry = meshSchema.table( + "vault_entry", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + memberId: text() + .references(() => meshMember.id) + .notNull(), + key: text().notNull(), + ciphertext: text().notNull(), + nonce: text().notNull(), + sealedKey: text().notNull(), + entryType: text().default("env"), + mountPath: text(), + description: text(), + createdAt: timestamp().defaultNow().notNull(), + updatedAt: timestamp().defaultNow().notNull(), + }, + (table) => [uniqueIndex("vault_entry_mesh_member_key_idx").on(table.meshId, table.memberId, table.key)], +); + export const meshWebhookRelations = relations(meshWebhook, ({ one }) => ({ mesh: one(mesh, { fields: [meshWebhook.meshId], @@ -787,9 +850,34 @@ export const meshSkillRelations = relations(meshSkill, ({ one }) => ({ fields: [meshSkill.authorMemberId], references: [meshMember.id], }), + bundleFile: one(meshFile, { + fields: [meshSkill.bundleFileId], + references: [meshFile.id], + }), })); export const selectMeshSkillSchema = createSelectSchema(meshSkill); export const insertMeshSkillSchema = createInsertSchema(meshSkill); export type SelectMeshSkill = typeof meshSkill.$inferSelect; export type InsertMeshSkill = typeof meshSkill.$inferInsert; + +export const meshServiceRelations = relations(meshService, ({ one }) => ({ + mesh: one(mesh, { fields: [meshService.meshId], references: [mesh.id] }), + sourceFile: one(meshFile, { fields: [meshService.sourceFileId], references: [meshFile.id] }), + deployer: one(meshMember, { fields: [meshService.deployedBy], references: [meshMember.id] }), +})); + +export const selectMeshServiceSchema = createSelectSchema(meshService); +export const insertMeshServiceSchema = createInsertSchema(meshService); +export type SelectMeshService = typeof meshService.$inferSelect; +export type InsertMeshService = typeof meshService.$inferInsert; + +export const meshVaultEntryRelations = relations(meshVaultEntry, ({ one }) => ({ + mesh: one(mesh, { fields: [meshVaultEntry.meshId], references: [mesh.id] }), + member: one(meshMember, { fields: [meshVaultEntry.memberId], references: [meshMember.id] }), +})); + +export const selectMeshVaultEntrySchema = createSelectSchema(meshVaultEntry); +export const insertMeshVaultEntrySchema = createInsertSchema(meshVaultEntry); +export type SelectMeshVaultEntry = typeof meshVaultEntry.$inferSelect; +export type InsertMeshVaultEntry = typeof meshVaultEntry.$inferInsert;