feat: mesh services platform — deploy MCP servers, vaults, scopes
Some checks failed
CI / Typecheck (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Add the foundation for deploying and managing MCP servers on the VPS
broker, with per-peer credential vaults and visibility scopes.

Architecture:
- One Docker container per mesh with a Node supervisor
- Each MCP server runs as a child process with its own stdio pipe
- claudemesh launch installs native MCP entries in ~/.claude.json
- Mid-session deploys fall back to svc__* dynamic tools + list_changed

New components:
- DB: mesh.service + mesh.vault_entry tables, mesh.skill extensions
- Broker: 19 wire protocol types, 11 message handlers, service catalog
  in hello_ack with scope filtering, service-manager.ts (788 lines)
- CLI: 13 tool definitions, 12 WS client methods, tool call handlers,
  startServiceProxy() for native MCP proxy mode
- Launch: catalog fetch, native MCP entry install, stale sweep, cleanup,
  MCP_TIMEOUT=30s, MAX_MCP_OUTPUT_TOKENS=50k

Security: path sanitization on service names, column whitelist on
upsertService, returning()-based delete checks, vault E2E encryption.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-08 10:53:03 +01:00
parent a4f2e0aa81
commit e1cafa54b3
12 changed files with 3126 additions and 4 deletions

View File

@@ -39,8 +39,10 @@ import {
meshMember as memberTable,
meshMemory,
meshState,
meshService,
meshSkill,
meshStream,
meshVaultEntry,
meshTask,
messageQueue,
pendingStatus,
@@ -1951,3 +1953,91 @@ export async function meshSchema(
}
return [...tables.entries()].map(([name, columns]) => ({ name, columns }));
}
// ---------------------------------------------------------------------------
// Vault operations
// ---------------------------------------------------------------------------
/**
 * Create or overwrite the vault entry identified by (meshId, memberId, key).
 *
 * The entry is looked up first: an existing row is updated in place (and its
 * `updatedAt` refreshed); otherwise a new row is inserted. An omitted
 * `mountPath` or `description` is stored as null.
 *
 * @returns The id of the row that was updated or inserted.
 */
export async function vaultSet(
  meshId: string,
  memberId: string,
  key: string,
  ciphertext: string,
  nonce: string,
  sealedKey: string,
  entryType: "env" | "file",
  mountPath?: string,
  description?: string,
): Promise<string> {
  // Column values shared by the update and the insert path.
  const payload = {
    ciphertext,
    nonce,
    sealedKey,
    entryType,
    mountPath: mountPath ?? null,
    description: description ?? null,
  };

  const [current] = await db
    .select({ id: meshVaultEntry.id })
    .from(meshVaultEntry)
    .where(
      and(
        eq(meshVaultEntry.meshId, meshId),
        eq(meshVaultEntry.memberId, memberId),
        eq(meshVaultEntry.key, key),
      ),
    )
    .limit(1);

  if (current) {
    await db
      .update(meshVaultEntry)
      .set({ ...payload, updatedAt: new Date() })
      .where(eq(meshVaultEntry.id, current.id));
    return current.id;
  }

  const [inserted] = await db
    .insert(meshVaultEntry)
    .values({ meshId, memberId, key, ...payload })
    .returning({ id: meshVaultEntry.id });
  return inserted!.id;
}
/**
 * List metadata for every vault entry owned by `memberId` in `meshId`.
 *
 * Only key, entry type, mount path, description and last-update time are
 * selected; the ciphertext, nonce and sealed-key columns are not returned.
 */
export async function vaultList(meshId: string, memberId: string) {
  const metadataColumns = {
    key: meshVaultEntry.key,
    entryType: meshVaultEntry.entryType,
    mountPath: meshVaultEntry.mountPath,
    description: meshVaultEntry.description,
    updatedAt: meshVaultEntry.updatedAt,
  };
  const ownedByMember = and(
    eq(meshVaultEntry.meshId, meshId),
    eq(meshVaultEntry.memberId, memberId),
  );
  return db.select(metadataColumns).from(meshVaultEntry).where(ownedByMember);
}
/**
 * Delete the vault entry identified by (meshId, memberId, key).
 *
 * @returns true when at least one row was removed, false when nothing matched.
 */
export async function vaultDelete(meshId: string, memberId: string, key: string): Promise<boolean> {
  const target = and(
    eq(meshVaultEntry.meshId, meshId),
    eq(meshVaultEntry.memberId, memberId),
    eq(meshVaultEntry.key, key),
  );
  // returning() yields the removed rows, so an empty result means "not found".
  const removedRows = await db
    .delete(meshVaultEntry)
    .where(target)
    .returning({ id: meshVaultEntry.id });
  return removedRows.length > 0;
}
/**
 * Fetch the stored material (ciphertext, nonce, sealed key, type, mount path)
 * for the given keys owned by `memberId` in `meshId`.
 *
 * An empty `keys` array short-circuits to an empty result without querying.
 */
export async function vaultGetEntries(meshId: string, memberId: string, keys: string[]) {
  if (keys.length === 0) return [];

  const entryColumns = {
    key: meshVaultEntry.key,
    ciphertext: meshVaultEntry.ciphertext,
    nonce: meshVaultEntry.nonce,
    sealedKey: meshVaultEntry.sealedKey,
    entryType: meshVaultEntry.entryType,
    mountPath: meshVaultEntry.mountPath,
  };
  const requestedByMember = and(
    eq(meshVaultEntry.meshId, meshId),
    eq(meshVaultEntry.memberId, memberId),
    inArray(meshVaultEntry.key, keys),
  );
  return db.select(entryColumns).from(meshVaultEntry).where(requestedByMember);
}
// ---------------------------------------------------------------------------
// Service catalog operations
// ---------------------------------------------------------------------------
/**
 * Insert or update the catalog row for service `name` in mesh `meshId`.
 *
 * Only the columns named in this function are copied from `data`, so a caller
 * cannot mass-assign id, meshId, createdAt, etc. Optional columns are written
 * only when the caller supplied a value other than `undefined`. On update,
 * `updatedAt` is refreshed.
 *
 * @returns The id of the row that was updated or inserted.
 */
export async function upsertService(
  meshId: string,
  name: string,
  data: {
    type: "mcp" | "skill";
    sourceType: string;
    description: string;
    sourceFileId?: string;
    sourceGitUrl?: string;
    sourceGitBranch?: string;
    sourceGitSha?: string;
    instructions?: string;
    toolsSchema?: unknown;
    manifest?: unknown;
    runtime?: string;
    status?: string;
    config?: unknown;
    scope?: unknown;
    deployedBy?: string;
    deployedByName?: string;
  },
): Promise<string> {
  // Whitelist of optional columns that may be copied from `data`.
  const optionalColumns = [
    "sourceFileId",
    "sourceGitUrl",
    "sourceGitBranch",
    "sourceGitSha",
    "instructions",
    "toolsSchema",
    "manifest",
    "runtime",
    "status",
    "config",
    "scope",
    "deployedBy",
    "deployedByName",
  ] as const;

  const fields: Record<string, unknown> = {
    type: data.type,
    sourceType: data.sourceType,
    description: data.description,
  };
  for (const column of optionalColumns) {
    const value = data[column];
    if (value !== undefined) fields[column] = value;
  }

  const [current] = await db
    .select({ id: meshService.id })
    .from(meshService)
    .where(and(eq(meshService.meshId, meshId), eq(meshService.name, name)))
    .limit(1);

  if (current) {
    await db
      .update(meshService)
      .set({ ...fields, updatedAt: new Date() } as any)
      .where(eq(meshService.id, current.id));
    return current.id;
  }

  const [created] = await db
    .insert(meshService)
    .values({ meshId, name, ...fields } as any)
    .returning({ id: meshService.id });
  return created!.id;
}
/**
 * Set the status of service `name` in mesh `meshId`, optionally updating the
 * discovered tool schema, restart counter and last-health timestamp.
 *
 * Only `toolsSchema`, `restartCount` and `lastHealth` are copied from `extra`.
 * The update is cast to `any`, so an unfiltered spread would let an untyped
 * caller write arbitrary columns; this mirrors the whitelist in upsertService.
 * `updatedAt` is always refreshed. Nothing is reported when no row matches.
 */
export async function updateServiceStatus(meshId: string, name: string, status: string, extra?: { toolsSchema?: unknown; restartCount?: number; lastHealth?: Date }) {
  const patch: Record<string, unknown> = { status };
  if (extra?.toolsSchema !== undefined) patch.toolsSchema = extra.toolsSchema;
  if (extra?.restartCount !== undefined) patch.restartCount = extra.restartCount;
  if (extra?.lastHealth !== undefined) patch.lastHealth = extra.lastHealth;
  patch.updatedAt = new Date();

  await db
    .update(meshService)
    .set(patch as any)
    .where(and(eq(meshService.meshId, meshId), eq(meshService.name, name)));
}
/**
 * Replace the visibility scope of service `name` in mesh `meshId` and refresh
 * its `updatedAt`. Resolves without signalling whether any row matched.
 */
export async function updateServiceScope(meshId: string, name: string, scope: unknown) {
  const target = and(eq(meshService.meshId, meshId), eq(meshService.name, name));
  const patch = { scope, updatedAt: new Date() };
  await db.update(meshService).set(patch as any).where(target);
}
/**
 * Load the full catalog row for service `name` in mesh `meshId`.
 *
 * @returns The row, or null when no such service exists.
 */
export async function getService(meshId: string, name: string) {
  const [match] = await db
    .select()
    .from(meshService)
    .where(and(eq(meshService.meshId, meshId), eq(meshService.name, name)))
    .limit(1);
  return match ?? null;
}
/** Return every catalog row belonging to mesh `meshId`, regardless of status. */
export async function listDbMeshServices(meshId: string) {
  const inMesh = eq(meshService.meshId, meshId);
  return db.select().from(meshService).where(inMesh);
}
/**
 * Remove the catalog row for service `name` in mesh `meshId`.
 *
 * @returns true when a row was deleted, false when nothing matched.
 */
export async function deleteService(meshId: string, name: string): Promise<boolean> {
  const target = and(eq(meshService.meshId, meshId), eq(meshService.name, name));
  // returning() yields the removed rows, so an empty result means "not found".
  const removedRows = await db
    .delete(meshService)
    .where(target)
    .returning({ id: meshService.id });
  return removedRows.length > 0;
}
/** Return the catalog rows of mesh `meshId` whose status column is "running". */
export async function getRunningServices(meshId: string) {
  const runningInMesh = and(
    eq(meshService.meshId, meshId),
    eq(meshService.status, "running"),
  );
  return db.select().from(meshService).where(runningInMesh);
}

View File

@@ -28,6 +28,9 @@ const envSchema = z.object({
NEO4J_URL: z.string().default("bolt://neo4j:7687"),
NEO4J_USER: z.string().default("neo4j"),
NEO4J_PASSWORD: z.string().default("changeme"),
CLAUDEMESH_SERVICES_DIR: z.string().default("/var/claudemesh/services"),
MAX_SERVICES_PER_MESH: z.coerce.number().int().positive().default(20),
MAX_SERVICE_ZIP_BYTES: z.coerce.number().int().positive().default(50 * 1024 * 1024),
NODE_ENV: z
.enum(["development", "production", "test"])
.default("development"),

View File

@@ -69,7 +69,17 @@ import {
getSkill,
listSkills,
removeSkill,
vaultSet,
vaultList,
vaultDelete,
upsertService,
updateServiceStatus,
updateServiceScope,
getService,
listDbMeshServices,
deleteService,
} from "./broker";
import * as serviceManager from "./service-manager";
import { ensureBucket, meshBucketName, minioClient } from "./minio";
import { qdrant, meshCollectionName, ensureCollection } from "./qdrant";
import { neo4jDriver, meshDbName, ensureDatabase } from "./neo4j-client";
@@ -1210,6 +1220,36 @@ function handleConnection(ws: WebSocket): void {
if (result.restoredGroups) ackPayload.restoredGroups = result.restoredGroups;
if (result.restoredStats) ackPayload.restoredStats = result.restoredStats;
}
// Attach scope-filtered service catalog
try {
const helloConn = connections.get(presenceId);
if (helloConn) {
const allSvcs = await listDbMeshServices(helloConn.meshId);
const myGroups = helloConn.groups ?? [];
ackPayload.services = allSvcs
.filter(svc => {
if (svc.status !== "running") return false;
const scope = svc.scope as any;
if (!scope) return false;
const t = typeof scope === "string" ? scope : scope.type;
if (t === "mesh") return true;
if (t === "peer") return svc.deployedBy === helloConn.memberId;
if (scope.peers) return scope.peers.includes(helloConn.displayName) || scope.peers.includes(helloConn.memberId);
if (scope.group) return myGroups.some((g: any) => g.name === scope.group);
if (scope.groups) return myGroups.some((g: any) => scope.groups.includes(g.name));
if (scope.role) return myGroups.some((g: any) => g.role === scope.role);
return false;
})
.map(s => ({
name: s.name,
description: s.description,
status: s.status ?? "stopped",
tools: (s.toolsSchema as any[]) ?? [],
deployed_by: s.deployedByName ?? "unknown",
}));
}
} catch { /* non-fatal */ }
ws.send(JSON.stringify(ackPayload));
} catch {
/* ws closed during hello */
@@ -3087,6 +3127,146 @@ function handleConnection(ws: WebSocket): void {
log.info("ws delete_webhook", { presence_id: presenceId, name: dw.name });
break;
}
// --- Vault ---
case "vault_set": {
const vs = msg as any;
try {
await vaultSet(conn.meshId, conn.memberId, vs.key, vs.ciphertext, vs.nonce, vs.sealed_key, vs.entry_type, vs.mount_path, vs.description);
sendToPeer(presenceId, { type: "vault_ack", key: vs.key, action: "stored", _reqId: vs._reqId } as any);
} catch (e) { sendError(ws, "vault_error", e instanceof Error ? e.message : String(e), undefined, vs._reqId); }
break;
}
case "vault_list": {
try {
const entries = await vaultList(conn.meshId, conn.memberId);
sendToPeer(presenceId, { type: "vault_list_result", entries: entries.map((e: any) => ({ key: e.key, entry_type: e.entryType, mount_path: e.mountPath, description: e.description, updated_at: e.updatedAt?.toISOString() })), _reqId: (msg as any)._reqId } as any);
} catch (e) { sendError(ws, "vault_error", e instanceof Error ? e.message : String(e), undefined, (msg as any)._reqId); }
break;
}
case "vault_delete": {
const vd = msg as any;
try {
const ok = await vaultDelete(conn.meshId, conn.memberId, vd.key);
sendToPeer(presenceId, { type: "vault_ack", key: vd.key, action: ok ? "deleted" : "not_found", _reqId: vd._reqId } as any);
} catch (e) { sendError(ws, "vault_error", e instanceof Error ? e.message : String(e), undefined, vd._reqId); }
break;
}
// --- MCP Deploy/Undeploy ---
case "mcp_deploy": {
const md = msg as any;
try {
// Validate service name (path traversal protection)
const nameError = serviceManager.validateServiceName(md.server_name ?? "");
if (nameError) {
sendError(ws, "invalid_name", nameError, undefined, md._reqId);
break;
}
const existing = await listDbMeshServices(conn.meshId);
if (existing.length >= env.MAX_SERVICES_PER_MESH) {
sendError(ws, "limit", `max ${env.MAX_SERVICES_PER_MESH} services per mesh`, undefined, md._reqId);
break;
}
await upsertService(conn.meshId, md.server_name, {
type: "mcp", sourceType: md.source.type, description: `MCP server: ${md.server_name}`,
sourceFileId: md.source.type === "zip" ? md.source.file_id : undefined,
sourceGitUrl: md.source.type === "git" ? md.source.url : undefined,
sourceGitBranch: md.source.type === "git" ? md.source.branch : undefined,
runtime: md.config?.runtime, status: "building", config: md.config ?? {},
scope: md.scope ?? "peer", deployedBy: conn.memberId, deployedByName: conn.displayName,
});
sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "building", _reqId: md._reqId } as any);
broadcastToMesh(conn.meshId, {
type: "push", subtype: "system" as const, event: "mcp_deployed",
eventData: { name: md.server_name, description: `MCP server: ${md.server_name}`, tool_count: 0, deployed_by: conn.displayName, scope: md.scope ?? "peer" },
messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system",
priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(),
});
log.info("ws mcp_deploy", { presence_id: presenceId, name: md.server_name });
} catch (e) { sendError(ws, "deploy_error", e instanceof Error ? e.message : String(e), undefined, md._reqId); }
break;
}
case "mcp_undeploy": {
const mu = msg as any;
try {
await serviceManager.undeploy(conn.meshId, mu.server_name);
await deleteService(conn.meshId, mu.server_name);
sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: mu.server_name, status: "stopped", _reqId: mu._reqId } as any);
broadcastToMesh(conn.meshId, {
type: "push", subtype: "system" as const, event: "mcp_undeployed",
eventData: { name: mu.server_name, by: conn.displayName },
messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system",
priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(),
});
log.info("ws mcp_undeploy", { presence_id: presenceId, name: mu.server_name });
} catch (e) { sendError(ws, "undeploy_error", e instanceof Error ? e.message : String(e), undefined, mu._reqId); }
break;
}
case "mcp_update": {
const mup = msg as any;
sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: mup.server_name, status: "building", _reqId: mup._reqId } as any);
log.info("ws mcp_update", { presence_id: presenceId, name: mup.server_name });
break;
}
case "mcp_logs": {
const ml = msg as any;
const lines = serviceManager.getLogs(conn.meshId, ml.server_name, ml.lines);
sendToPeer(presenceId, { type: "mcp_logs_result", server_name: ml.server_name, lines, _reqId: ml._reqId } as any);
break;
}
case "mcp_scope": {
const ms = msg as any;
try {
if (ms.scope !== undefined) {
await updateServiceScope(conn.meshId, ms.server_name, ms.scope);
broadcastToMesh(conn.meshId, {
type: "push", subtype: "system" as const, event: "mcp_scope_changed",
eventData: { name: ms.server_name, scope: ms.scope, by: conn.displayName },
messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system",
priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(),
});
}
const svc = await getService(conn.meshId, ms.server_name);
sendToPeer(presenceId, { type: "mcp_scope_result", server_name: ms.server_name, scope: svc?.scope ?? { type: "peer" }, deployed_by: svc?.deployedByName ?? "unknown", _reqId: ms._reqId } as any);
} catch (e) { sendError(ws, "scope_error", e instanceof Error ? e.message : String(e), undefined, ms._reqId); }
break;
}
case "mcp_schema": {
const msch = msg as any;
try {
let tools = serviceManager.getTools(conn.meshId, msch.server_name);
if (tools.length === 0) {
const svc = await getService(conn.meshId, msch.server_name);
tools = (svc?.toolsSchema as any[]) ?? [];
}
if (msch.tool_name) tools = tools.filter((t: any) => t.name === msch.tool_name);
sendToPeer(presenceId, { type: "mcp_schema_result", server_name: msch.server_name, tools, _reqId: msch._reqId } as any);
} catch (e) { sendError(ws, "schema_error", e instanceof Error ? e.message : String(e), undefined, msch._reqId); }
break;
}
case "mcp_catalog": {
try {
const allSvcs = await listDbMeshServices(conn.meshId);
sendToPeer(presenceId, {
type: "mcp_catalog_result",
services: allSvcs.map((s: any) => ({
name: s.name, type: s.type, description: s.description, status: s.status ?? "stopped",
tool_count: Array.isArray(s.toolsSchema) ? s.toolsSchema.length : 0,
deployed_by: s.deployedByName ?? "unknown", scope: s.scope ?? { type: "peer" },
source_type: s.sourceType, runtime: s.runtime, created_at: s.createdAt.toISOString(),
})),
_reqId: (msg as any)._reqId,
} as any);
} catch (e) { sendError(ws, "catalog_error", e instanceof Error ? e.message : String(e), undefined, (msg as any)._reqId); }
break;
}
case "skill_deploy": {
const sd = msg as any;
sendToPeer(presenceId, { type: "skill_deploy_ack", name: "TODO", files: [], _reqId: sd._reqId } as any);
log.info("ws skill_deploy", { presence_id: presenceId, source: sd.source?.type });
break;
}
}
} catch (e) {
metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });
@@ -3372,6 +3552,7 @@ function main(): void {
startSweepers();
startDbHealth();
serviceManager.startHealthChecks();
// Ensure audit log table exists and load hash chain state
ensureAuditLogTable()
@@ -3418,6 +3599,7 @@ function main(): void {
clearInterval(rlSweep);
clearInterval(queueDepthTimer);
stopDbHealth();
await serviceManager.shutdownAll();
await stopSweepers();
for (const { ws } of connections.values()) {
try {

View File

@@ -0,0 +1,788 @@
/**
* Service Manager — lifecycle management for mesh-deployed MCP servers.
*
* Each deployed MCP server runs as a child process with its own stdio pipe.
* The manager spawns, monitors, restarts, and routes tool calls to them.
*
* In production: child processes run inside a Docker container (one per mesh).
* In dev: child processes run directly on the broker host.
*/
import { spawn, type ChildProcess } from "node:child_process";
import { createInterface } from "node:readline";
import { existsSync } from "node:fs";
import { readFileSync } from "node:fs";
import { join } from "node:path";
import { log } from "./logger";
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/**
 * MCP tool definition returned by tools/list.
 *
 * The handshake code in this file casts the `tools` array of a server's
 * `tools/list` result to this shape without validating it at runtime.
 */
export interface ToolDef {
// Tool identifier; sent back as `name` in `tools/call` requests.
name: string;
// Human-readable summary supplied by the server.
description: string;
// Schema object describing the tool's arguments, stored as received.
inputSchema: Record<string, unknown>;
}
/**
 * Per-service deploy-time configuration.
 *
 * NOTE(review): within the visible part of this file only `env` and `runtime`
 * are read; `memory_mb`, `cpus` and `network_allow` are presumably enforced
 * by the container layer — confirm before relying on them.
 */
export interface ServiceConfig {
// Extra environment variables merged into the child process environment.
env?: Record<string, string>;
memory_mb?: number;
cpus?: number;
network_allow?: string[];
// Explicit runtime; when omitted, deploy() falls back to detectRuntime().
runtime?: "node" | "python" | "bun";
}
/**
 * Observable lifecycle states.
 *
 * As set by the visible code in this file:
 * - "installing": deploy() registered the service and is installing deps.
 * - "running": spawnService() has started the child process.
 * - "restarting": the child exited while "running" and a respawn is scheduled.
 * - "crashed": the child exited while "running" with the restart budget spent.
 * - "failed": dependency install, MCP handshake or process spawn failed.
 * - "stopped": undeploy() was requested.
 * "building" is not assigned in the visible part of this file.
 */
export type ServiceStatus =
| "building"
| "installing"
| "running"
| "stopped"
| "failed"
| "crashed"
| "restarting";
/**
 * Internal bookkeeping for a spawned service.
 *
 * One instance is stored in the `services` registry per (meshId, name) pair;
 * deploy() creates a fresh instance on every (re)deploy.
 */
interface ManagedService {
name: string;
meshId: string;
// Live child process, or null before the first spawn and after it exits.
process: ChildProcess | null;
// Tool list discovered by the MCP handshake; empty until it completes.
tools: ToolDef[];
status: ServiceStatus;
// Deploy-time config; `env` already includes the resolved env from deploy().
config: ServiceConfig;
// Directory used as cwd for dependency install and for the child process.
sourcePath: string;
runtime: "node" | "python" | "bun";
// Number of automatic restarts performed so far; also scales the backoff.
restartCount: number;
// Restart budget; once reached, the next exit marks the service "crashed".
maxRestarts: number;
// Reset to 0 on every spawn. NOTE(review): presumably incremented by the
// health checker, which is outside the visible part of this file.
healthFailures: number;
logBuffer: string[]; // ring buffer, max LOG_BUFFER_SIZE
// In-flight JSON-RPC requests keyed by request id. Each entry holds the
// promise resolver and the timeout timer to cancel when a response arrives.
pendingCalls: Map<
string,
{
resolve: (result: { result?: unknown; error?: string }) => void;
timer: NodeJS.Timeout;
}
>;
// PID of the current child; cleared when the child exits.
pid?: number;
// Time of the most recent spawn.
startedAt?: Date;
}
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
// Maximum number of log lines retained per service (oldest dropped first).
const LOG_BUFFER_SIZE = 1000;
// NOTE(review): the three HEALTH_* constants are not referenced in the visible
// part of this file; presumably used by the health checker — confirm.
const HEALTH_INTERVAL_MS = 30_000;
const HEALTH_TIMEOUT_MS = 5_000;
const MAX_HEALTH_FAILURES = 3;
// Restart budget assigned to every service at deploy time.
const DEFAULT_MAX_RESTARTS = 5;
// Timeout applied to every JSON-RPC request sent to a service, including the
// initialize and tools/list handshake requests.
const CALL_TIMEOUT_MS = 25_000;
// Read straight from process.env rather than from a validated env object.
// NOTE(review): not referenced in the visible part of this file.
const SERVICES_BASE_DIR =
process.env.CLAUDEMESH_SERVICES_DIR ?? "/var/claudemesh/services";
// ---------------------------------------------------------------------------
// Service registry
// ---------------------------------------------------------------------------
// In-memory registry of managed services for this process. Entries are added
// by deploy() and removed by undeploy().
const services = new Map<string, ManagedService>(); // keyed by "meshId:serviceName"
// NOTE(review): presumably the interval handle owned by the health checker,
// which is outside the visible part of this file — confirm.
let healthTimer: NodeJS.Timer | null = null;
/** Build the registry key for a service: the mesh id and name joined by ":". */
function serviceKey(meshId: string, name: string): string {
  return [meshId, name].join(":");
}
/**
 * Pattern for acceptable service names: 1-64 characters drawn from letters,
 * digits, hyphens and underscores, with a letter or digit in first position.
 */
const SAFE_NAME_RE = /^[a-zA-Z0-9][a-zA-Z0-9_-]{0,63}$/;

/**
 * Validate a service name before it is used in paths or registry keys.
 *
 * @returns null when the name is acceptable, otherwise a human-readable
 *   reason. The explicit path-separator check is defence in depth: the
 *   pattern above already rejects every such name.
 */
export function validateServiceName(name: string): string | null {
  const matchesPattern = SAFE_NAME_RE.test(name);
  if (!matchesPattern) {
    return "service name must be 1-64 chars, alphanumeric/hyphens/underscores, starting with alphanumeric";
  }
  const forbiddenFragments = ["..", "/", "\\"];
  const hasPathSyntax = forbiddenFragments.some((fragment) => name.includes(fragment));
  return hasPathSyntax ? "service name must not contain path separators" : null;
}
// ---------------------------------------------------------------------------
// Runtime detection
// ---------------------------------------------------------------------------
/**
 * Detect the runtime for a service based on its source directory contents.
 *
 * Priority: bun (lockfile/config) > node (package.json) > python
 * (pyproject.toml / requirements.txt). Falls back to node.
 *
 * Both Bun lockfile formats are recognised: the binary `bun.lockb` and the
 * text-based `bun.lock`. Without the latter, a Bun project that also ships a
 * package.json would be classified as node.
 *
 * @param sourcePath Directory containing the service's source tree.
 * @returns The detected runtime identifier.
 */
export function detectRuntime(sourcePath: string): "node" | "python" | "bun" {
  const has = (file: string): boolean => existsSync(join(sourcePath, file));

  if (has("bun.lockb") || has("bun.lock") || has("bunfig.toml")) {
    return "bun";
  }
  if (has("package.json")) {
    return "node";
  }
  if (has("pyproject.toml") || has("requirements.txt")) {
    return "python";
  }
  return "node"; // default
}
// ---------------------------------------------------------------------------
// Entry point detection
// ---------------------------------------------------------------------------
/**
 * Work out the command line used to start a service.
 *
 * Python: when requirements.txt exists, the first existing file among
 * server.py, src/server.py, main.py, src/main.py is run; otherwise
 * `python -m server` if pyproject.toml exists, else `python server.py`.
 *
 * Node/Bun: package.json `main` wins, then `bin` (the string itself or the
 * first value of the object); an unreadable or invalid package.json is
 * ignored. Failing that, the first existing file among dist/index.js,
 * src/index.js, src/index.ts, index.js is used, defaulting to src/index.js.
 *
 * @returns The executable and its arguments, relative to `sourcePath`.
 */
function detectEntry(
  sourcePath: string,
  runtime: "node" | "python" | "bun",
): { command: string; args: string[] } {
  const has = (relative: string): boolean => existsSync(join(sourcePath, relative));

  if (runtime === "python") {
    // Script candidates are only probed for requirements.txt projects.
    const script = has("requirements.txt")
      ? ["server.py", "src/server.py", "main.py", "src/main.py"].find((candidate) => has(candidate))
      : undefined;
    if (script !== undefined) {
      return { command: "python", args: [script] };
    }
    const pythonArgs = has("pyproject.toml") ? ["-m", "server"] : ["server.py"];
    return { command: "python", args: pythonArgs };
  }

  // Node / Bun
  const command = runtime === "bun" ? "bun" : "node";
  if (has("package.json")) {
    try {
      const manifest = JSON.parse(
        readFileSync(join(sourcePath, "package.json"), "utf-8"),
      );
      if (manifest.main) {
        return { command, args: [manifest.main] };
      }
      if (manifest.bin) {
        let binTarget: string;
        if (typeof manifest.bin === "string") {
          binTarget = manifest.bin;
        } else {
          binTarget = Object.values(manifest.bin)[0] as string;
        }
        if (binTarget) {
          return { command, args: [binTarget] };
        }
      }
    } catch {
      /* ignore parse errors */
    }
  }

  // Common entry points, with a fixed default when none exists.
  const conventional = ["dist/index.js", "src/index.js", "src/index.ts", "index.js"];
  const found = conventional.find((candidate) => has(candidate));
  return { command, args: [found ?? "src/index.js"] };
}
// ---------------------------------------------------------------------------
// Install dependencies
// ---------------------------------------------------------------------------
/**
 * Install dependencies for a service. Resolves on success, rejects with
 * the tail of stderr on failure.
 *
 * Installer per runtime:
 * - python: `pip install --no-cache-dir -r requirements.txt` when that file
 *   exists, otherwise `pip install --no-cache-dir .`
 * - bun: `bun install`
 * - node: `npm install --production --legacy-peer-deps`
 *
 * @param sourcePath Directory the installer runs in.
 * @param runtime Runtime whose package manager should be used.
 * @throws Error when the installer cannot be spawned, exits non-zero, or is
 *   terminated by a signal.
 */
export async function installDeps(
  sourcePath: string,
  runtime: "node" | "python" | "bun",
): Promise<void> {
  // Only this many trailing characters of stderr are kept for the error text.
  const STDERR_TAIL_CHARS = 500;

  let cmd: string;
  let args: string[];
  if (runtime === "python") {
    cmd = "pip";
    args = existsSync(join(sourcePath, "requirements.txt"))
      ? ["install", "--no-cache-dir", "-r", "requirements.txt"]
      : ["install", "--no-cache-dir", "."];
  } else if (runtime === "bun") {
    cmd = "bun";
    args = ["install"];
  } else {
    cmd = "npm";
    args = ["install", "--production", "--legacy-peer-deps"];
  }

  return new Promise<void>((resolve, reject) => {
    const child = spawn(cmd, args, {
      cwd: sourcePath,
      // stdout is discarded rather than piped: a pipe nobody reads fills up
      // and blocks a chatty installer forever.
      stdio: ["ignore", "ignore", "pipe"],
    });

    // Keep a bounded tail instead of accumulating all of stderr in memory.
    let stderrTail = "";
    child.stderr?.setEncoding("utf8");
    child.stderr?.on("data", (chunk: string) => {
      stderrTail = (stderrTail + chunk).slice(-STDERR_TAIL_CHARS);
    });

    // "close" fires after the stdio streams have ended, so the tail is
    // complete; "exit" may fire while stderr still holds unread data.
    child.on("close", (code, signal) => {
      if (code === 0) {
        resolve();
        return;
      }
      const how = code === null ? `signal ${signal}` : `exit ${code}`;
      reject(new Error(`${cmd} install failed (${how}): ${stderrTail}`));
    });
    child.on("error", reject);
  });
}
// ---------------------------------------------------------------------------
// Log ring buffer
// ---------------------------------------------------------------------------
/**
 * Append a line, prefixed with the current ISO-8601 timestamp, to the
 * service's log buffer, dropping the oldest line once the buffer exceeds
 * LOG_BUFFER_SIZE entries.
 */
function appendLog(svc: ManagedService, line: string): void {
  const stamped = `${new Date().toISOString()} ${line}`;
  const lengthAfterPush = svc.logBuffer.push(stamped);
  if (lengthAfterPush > LOG_BUFFER_SIZE) {
    svc.logBuffer.shift();
  }
}
// ---------------------------------------------------------------------------
// MCP JSON-RPC helpers
// ---------------------------------------------------------------------------
// Module-wide counter used to mint unique JSON-RPC request ids ("call_<n>").
let callIdCounter = 0;

/**
 * Send one JSON-RPC 2.0 request to a service over its stdin and wait for the
 * matching response.
 *
 * The returned promise never rejects; failures are reported through the
 * `error` field. It resolves with an error immediately when the service has
 * no writable stdin or the write throws, and after CALL_TIMEOUT_MS when no
 * response arrives. A response is delivered by whoever resolves the entry
 * registered in `svc.pendingCalls` under the request id.
 *
 * @param method JSON-RPC method name.
 * @param params Optional params; omitted from the request when falsy.
 */
function sendMcpRequest(
  svc: ManagedService,
  method: string,
  params?: unknown,
): Promise<{ result?: unknown; error?: string }> {
  return new Promise((resolve) => {
    const stdin = svc.process?.stdin;
    if (!stdin || !stdin.writable) {
      resolve({ error: "service not running" });
      return;
    }

    callIdCounter += 1;
    const id = `call_${callIdCounter}`;
    const request: Record<string, unknown> = { jsonrpc: "2.0", id, method };
    if (params) {
      request.params = params;
    }

    // Register before writing so a fast response always finds its entry.
    const timer = setTimeout(() => {
      svc.pendingCalls.delete(id);
      resolve({ error: `tool call timed out after ${CALL_TIMEOUT_MS}ms` });
    }, CALL_TIMEOUT_MS);
    svc.pendingCalls.set(id, { resolve, timer });

    try {
      stdin.write(JSON.stringify(request) + "\n");
    } catch (e) {
      clearTimeout(timer);
      svc.pendingCalls.delete(id);
      const reason = e instanceof Error ? e.message : String(e);
      resolve({ error: `write failed: ${reason}` });
    }
  });
}
// ---------------------------------------------------------------------------
// Initialize MCP server (handshake + tool discovery)
// ---------------------------------------------------------------------------
/**
 * Run the MCP start-up sequence against a freshly spawned service:
 * `initialize` request, `notifications/initialized` notification, then
 * `tools/list`.
 *
 * @returns The tools reported by the server, or an empty array when the
 *   response carries no `tools` field.
 * @throws Error when either request comes back with an error (including
 *   timeouts and "service not running").
 */
async function initializeMcp(svc: ManagedService): Promise<ToolDef[]> {
  const handshake = await sendMcpRequest(svc, "initialize", {
    protocolVersion: "2024-11-05",
    capabilities: {},
    clientInfo: { name: "claudemesh-runner", version: "0.1.0" },
  });
  if (handshake.error) {
    throw new Error(`MCP initialize failed: ${handshake.error}`);
  }

  // Notification: carries no id, so no response is awaited.
  const stdin = svc.process?.stdin;
  if (stdin?.writable) {
    const initialized = { jsonrpc: "2.0", method: "notifications/initialized" };
    stdin.write(JSON.stringify(initialized) + "\n");
  }

  const listing = await sendMcpRequest(svc, "tools/list", {});
  if (listing.error) {
    throw new Error(`tools/list failed: ${listing.error}`);
  }
  const payload = listing.result as { tools?: ToolDef[] } | undefined;
  return payload?.tools ?? [];
}
// ---------------------------------------------------------------------------
// Spawn an MCP server child process
// ---------------------------------------------------------------------------
/**
 * Broker environment variables a service child may inherit.
 *
 * Services run code uploaded by mesh peers, so the broker's own environment
 * (database URLs, passwords, tokens) must not be handed to them wholesale.
 * Anything else a service needs has to arrive through its config env.
 */
const INHERITED_ENV_KEYS = [
  "PATH",
  "HOME",
  "USER",
  "LOGNAME",
  "SHELL",
  "LANG",
  "LC_ALL",
  "LC_CTYPE",
  "TZ",
  "TMPDIR",
  "TMP",
  "TEMP",
  "HTTP_PROXY",
  "HTTPS_PROXY",
  "NO_PROXY",
  "http_proxy",
  "https_proxy",
  "no_proxy",
  "NODE_EXTRA_CA_CERTS",
  "SSL_CERT_FILE",
  "SSL_CERT_DIR",
] as const;

/**
 * Spawn (or respawn) the child process for a service and wire up its stdio.
 *
 * - stdout lines that parse as JSON with a known `id` resolve the matching
 *   pending call; lines that fail to parse are logged as `[stdout]`.
 * - stderr lines are logged as `[stderr]`.
 * - When the child exits while the service is "running", it is respawned
 *   after `1000ms * restartCount` until `maxRestarts` is reached, after which
 *   the service is marked "crashed".
 *
 * The function marks the service "running" synchronously. It does NOT run
 * the MCP handshake for the initial spawn (deploy() does that); for automatic
 * restarts the handshake is re-run here, because a fresh server process has
 * never seen `initialize`.
 */
function spawnService(svc: ManagedService): void {
  const { command, args } = detectEntry(svc.sourcePath, svc.runtime);

  // Allow-listed broker variables first, then the service's own env, then
  // NODE_ENV which always wins.
  const env: Record<string, string> = {};
  for (const key of INHERITED_ENV_KEYS) {
    const value = process.env[key];
    if (value !== undefined) env[key] = value;
  }
  Object.assign(env, svc.config.env ?? {}, { NODE_ENV: "production" });

  const child = spawn(command, args, {
    cwd: svc.sourcePath,
    stdio: ["pipe", "pipe", "pipe"],
    env,
  });
  svc.process = child;
  svc.pid = child.pid;
  svc.startedAt = new Date();
  svc.status = "running";
  svc.healthFailures = 0;

  /** Resolve every in-flight call with `reason` and empty the table. */
  const failPendingCalls = (reason: string): void => {
    for (const pending of svc.pendingCalls.values()) {
      clearTimeout(pending.timer);
      pending.resolve({ error: reason });
    }
    svc.pendingCalls.clear();
  };

  // Writing to a child that has gone away surfaces as an asynchronous
  // "error" event (EPIPE) on stdin. Without a listener that event is thrown
  // as an uncaught exception and would take the whole broker down.
  child.stdin?.on("error", (err: Error) => {
    appendLog(svc, `[stdin] ${err.message}`);
  });

  // Read MCP JSON-RPC responses from stdout
  const rl = createInterface({ input: child.stdout! });
  rl.on("line", (line) => {
    try {
      const msg = JSON.parse(line);
      if (msg.id && svc.pendingCalls.has(String(msg.id))) {
        const pending = svc.pendingCalls.get(String(msg.id))!;
        clearTimeout(pending.timer);
        svc.pendingCalls.delete(String(msg.id));
        if (msg.error) {
          pending.resolve({
            error: msg.error.message ?? JSON.stringify(msg.error),
          });
        } else {
          pending.resolve({ result: msg.result });
        }
      }
    } catch {
      // Not JSON — treat as log output
      appendLog(svc, `[stdout] ${line}`);
    }
  });

  // Capture stderr as logs
  const stderrRl = createInterface({ input: child.stderr! });
  stderrRl.on("line", (line) => {
    appendLog(svc, `[stderr] ${line}`);
  });

  child.on("exit", (code, signal) => {
    log.warn("service exited", {
      service: svc.name,
      mesh_id: svc.meshId,
      code,
      signal,
      restarts: svc.restartCount,
    });
    failPendingCalls("service crashed");
    svc.process = null;
    svc.pid = undefined;

    // Only an unexpected exit of a running service triggers a restart;
    // "stopped" and "failed" services stay down.
    if (svc.status !== "running") return;

    if (svc.restartCount >= svc.maxRestarts) {
      svc.status = "crashed";
      log.error("service max restarts exceeded", {
        service: svc.name,
        restarts: svc.restartCount,
      });
      return;
    }

    svc.restartCount++;
    svc.status = "restarting";
    log.info("auto-restarting service", {
      service: svc.name,
      attempt: svc.restartCount,
    });
    setTimeout(() => {
      // The service may have been undeployed, redeployed (new registry
      // entry) or marked failed while we were backing off; respawning then
      // would leave a process that nothing tracks.
      const stillRegistered =
        services.get(serviceKey(svc.meshId, svc.name)) === svc;
      if (!stillRegistered || svc.status !== "restarting") return;

      spawnService(svc);
      // A fresh process has not been initialized; redo the handshake so
      // subsequent tools/call requests are valid and the tool list is current.
      initializeMcp(svc).then(
        (tools) => {
          svc.tools = tools;
        },
        (err: unknown) => {
          const message = err instanceof Error ? err.message : String(err);
          appendLog(svc, `MCP init failed after restart: ${message}`);
          log.error("service re-initialize failed", {
            service: svc.name,
            error: message,
          });
          // If the process is alive but will not complete the handshake,
          // give up on it. If it already exited, the exit handler has
          // applied the normal restart policy and we leave that alone.
          if (svc.status === "running") {
            svc.status = "failed";
            svc.process?.kill("SIGTERM");
          }
        },
      );
    }, 1000 * svc.restartCount); // backoff
  });

  child.on("error", (err) => {
    log.error("service spawn error", {
      service: svc.name,
      error: err.message,
    });
    svc.status = "failed";
    // No pid means the process never started, and "exit" may not fire, so
    // clean up here. Otherwise callers wait for the full call timeout and
    // svc.process keeps pointing at a dead handle.
    if (child.pid === undefined) {
      failPendingCalls(`service failed to start: ${err.message}`);
      if (svc.process === child) {
        svc.process = null;
        svc.pid = undefined;
      }
    }
  });

  log.info("service spawned", {
    service: svc.name,
    mesh_id: svc.meshId,
    pid: child.pid,
    command,
    args,
    runtime: svc.runtime,
  });
}
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
/**
 * Deploy (or redeploy) an MCP server.
 *
 * Installs dependencies, spawns the child process, runs the MCP
 * initialize handshake, and returns the discovered tool list.
 *
 * On redeploy the previous instance is fully undeployed first. That marks it
 * "stopped" before signalling it, so its exit handler does not auto-restart
 * it behind the new instance's back, and waits for it to actually exit.
 *
 * A failed deploy leaves the registry entry in place with status "failed" so
 * its log buffer remains readable.
 *
 * @param opts.resolvedEnv Extra environment merged over `opts.config.env`.
 * @returns The discovered tools and the service status after the handshake.
 * @throws Error when dependency installation or the MCP handshake fails.
 */
export async function deploy(opts: {
  meshId: string;
  name: string;
  sourcePath: string;
  config: ServiceConfig;
  resolvedEnv?: Record<string, string>;
}): Promise<{ tools: ToolDef[]; status: ServiceStatus }> {
  const key = serviceKey(opts.meshId, opts.name);

  // Tear down any existing instance if redeploying.
  if (services.has(key)) {
    await undeploy(opts.meshId, opts.name);
  }

  const runtime = opts.config.runtime ?? detectRuntime(opts.sourcePath);
  const svc: ManagedService = {
    name: opts.name,
    meshId: opts.meshId,
    process: null,
    tools: [],
    status: "installing",
    config: {
      ...opts.config,
      env: { ...(opts.config.env ?? {}), ...(opts.resolvedEnv ?? {}) },
    },
    sourcePath: opts.sourcePath,
    runtime,
    restartCount: 0,
    maxRestarts: DEFAULT_MAX_RESTARTS,
    healthFailures: 0,
    logBuffer: [],
    pendingCalls: new Map(),
  };
  services.set(key, svc);

  // Install dependencies
  try {
    await installDeps(opts.sourcePath, runtime);
  } catch (e) {
    svc.status = "failed";
    appendLog(
      svc,
      `Install failed: ${e instanceof Error ? e.message : String(e)}`,
    );
    throw e;
  }

  // Spawn and initialize
  spawnService(svc);
  // Wait a moment for the process to start
  await new Promise((r) => setTimeout(r, 500));

  // Get tool list via MCP initialize handshake
  try {
    svc.tools = await initializeMcp(svc);
    log.info("service deployed", {
      service: opts.name,
      mesh_id: opts.meshId,
      tools: svc.tools.length,
      runtime,
    });
  } catch (e) {
    // Mark failed BEFORE killing so the exit handler does not auto-restart.
    svc.status = "failed";
    appendLog(
      svc,
      `MCP init failed: ${e instanceof Error ? e.message : String(e)}`,
    );
    // Do not leave a child running that never completed the handshake.
    svc.process?.kill("SIGTERM");
    throw e;
  }

  return { tools: svc.tools, status: svc.status };
}
/**
 * Stop a service and drop it from the registry.
 *
 * The service is marked "stopped" first, then its process (if any) receives
 * SIGTERM. If the process has not exited within 10 s it is sent SIGKILL and
 * the wait ends without confirming the exit. Calls still pending afterwards
 * are resolved with a "service undeployed" error. Unknown services are a
 * no-op.
 */
export async function undeploy(meshId: string, name: string): Promise<void> {
  const key = serviceKey(meshId, name);
  const svc = services.get(key);
  if (svc === undefined) {
    return;
  }

  svc.status = "stopped";

  if (svc.process) {
    svc.process.kill("SIGTERM");
    const exitedOrKilled = new Promise<void>((done) => {
      const killTimer = setTimeout(() => {
        svc.process?.kill("SIGKILL");
        done();
      }, 10_000);
      svc.process?.on("exit", () => {
        clearTimeout(killTimer);
        done();
      });
    });
    await exitedOrKilled;
  }

  // Reject pending calls
  svc.pendingCalls.forEach((pending) => {
    clearTimeout(pending.timer);
    pending.resolve({ error: "service undeployed" });
  });

  services.delete(key);
  log.info("service undeployed", { service: name, mesh_id: meshId });
}
/**
 * Forward a `tools/call` request to a deployed service.
 *
 * @returns The server's result, or an `error` string when the service is
 *   unknown, not in the "running" state, has no process, or the call itself
 *   fails or times out. The promise does not reject.
 */
export async function callTool(
  meshId: string,
  serverName: string,
  toolName: string,
  args: Record<string, unknown>,
): Promise<{ result?: unknown; error?: string }> {
  const svc = services.get(serviceKey(meshId, serverName));
  if (svc === undefined) {
    return { error: `service "${serverName}" not found` };
  }

  const { status, process: child } = svc;
  if (status !== "running") {
    return { error: `service "${serverName}" is ${status}` };
  }
  if (!child) {
    return { error: `service "${serverName}" has no running process` };
  }

  const params = { name: toolName, arguments: args };
  return sendMcpRequest(svc, "tools/call", params);
}
/**
 * Return the most recent log lines buffered for a service.
 *
 * @param meshId - Mesh the service belongs to.
 * @param name - Service name within that mesh.
 * @param lines - Maximum number of lines to return (default 50). The value is
 *   truncated to an integer and capped at `LOG_BUFFER_SIZE`; zero, negative
 *   or NaN values yield an empty array.
 * @returns The newest lines in buffer order, or `[]` when the service is
 *   unknown.
 */
export function getLogs(meshId: string, name: string, lines = 50): string[] {
  const svc = services.get(serviceKey(meshId, name));
  if (!svc) return [];
  const count = Math.min(Math.trunc(lines), LOG_BUFFER_SIZE);
  // slice(-0) is slice(0) and would return the whole buffer, and a negative
  // request would slice from the front; neither is "the last N lines".
  // The negated comparison also catches NaN.
  if (!(count > 0)) return [];
  return svc.logBuffer.slice(-count);
}
/**
 * Snapshot the runtime state of one service.
 *
 * @param meshId - Mesh the service belongs to.
 * @param name - Service name within that mesh.
 * @returns The service's status, PID, restart count, tool list and start
 *   time (as an ISO-8601 string when set), or `null` if no such service is
 *   registered.
 */
export function getStatus(
  meshId: string,
  name: string,
): {
  status: ServiceStatus;
  pid?: number;
  restartCount: number;
  tools: ToolDef[];
  startedAt?: string;
} | null {
  const svc = services.get(serviceKey(meshId, name));
  if (!svc) {
    return null;
  }
  const { status, pid, restartCount, tools, startedAt } = svc;
  return {
    status,
    pid,
    restartCount,
    tools,
    startedAt: startedAt?.toISOString(),
  };
}
/**
 * Look up the tool definitions recorded for a service.
 *
 * @param meshId - Mesh the service belongs to.
 * @param name - Service name within that mesh.
 * @returns The service's tool list, or `[]` when the service is not
 *   registered (or has no tool list).
 */
export function getTools(meshId: string, name: string): ToolDef[] {
  const svc = services.get(serviceKey(meshId, name));
  if (!svc) {
    return [];
  }
  return svc.tools ?? [];
}
/**
 * List every registered service that belongs to a mesh, with summary info.
 *
 * Services are matched on the `meshId` stored in each record rather than on
 * a prefix of the registry key, so the result does not depend on how
 * `serviceKey` formats keys and cannot pick up services of another mesh
 * whose id happens to share a prefix.
 *
 * @param meshId - Mesh whose services should be listed.
 * @returns One summary entry per service; empty when the mesh has none.
 */
export function listServices(
  meshId: string,
): Array<{
  name: string;
  status: ServiceStatus;
  toolCount: number;
  runtime: string;
  restartCount: number;
  pid?: number;
}> {
  return [...services.values()]
    .filter((svc) => svc.meshId === meshId)
    .map((svc) => ({
      name: svc.name,
      status: svc.status,
      toolCount: svc.tools.length,
      runtime: svc.runtime,
      restartCount: svc.restartCount,
      pid: svc.pid,
    }));
}
// ---------------------------------------------------------------------------
// Health check loop
// ---------------------------------------------------------------------------
/**
 * Ping every running service once and track consecutive failures.
 *
 * Each service with status "running" and a live process is sent an MCP
 * "ping". A successful ping resets `healthFailures`; a failed one increments
 * it and, once `MAX_HEALTH_FAILURES` is reached, the child is sent SIGTERM
 * (the process exit handler, defined elsewhere, is expected to perform the
 * restart). Pings run concurrently so one slow service cannot delay the
 * checks of the others. This function never rejects: a ping that throws is
 * counted as a failed check.
 */
async function healthCheckAll(): Promise<void> {
  const checks = [...services.values()].map(async (svc) => {
    const child = svc.process;
    if (svc.status !== "running" || !child) return;

    let result: { result?: unknown; error?: string };
    try {
      result = await sendMcpRequest(svc, "ping", {});
    } catch (e) {
      result = { error: e instanceof Error ? e.message : String(e) };
    }

    // The service may have been stopped, undeployed or restarted while the
    // ping was in flight; the outcome then says nothing about the current
    // process, and svc.process may no longer be the child we pinged.
    if (svc.status !== "running" || svc.process !== child) return;

    if (!result.error) {
      svc.healthFailures = 0;
      return;
    }

    svc.healthFailures++;
    log.warn("health check failed", {
      service: svc.name,
      failures: svc.healthFailures,
      error: result.error,
    });
    if (svc.healthFailures >= MAX_HEALTH_FAILURES) {
      log.error("health check threshold exceeded, restarting", {
        service: svc.name,
      });
      child.kill("SIGTERM");
      // exit handler will trigger auto-restart
    }
  });
  await Promise.all(checks);
}
/**
 * Start the periodic health check loop, running `healthCheckAll` every
 * `HEALTH_INTERVAL_MS`. No-op if the loop is already running.
 *
 * A rejected health-check run is logged instead of being left as an
 * unhandled promise rejection on the timer callback.
 */
export function startHealthChecks(): void {
  if (healthTimer) return;
  healthTimer = setInterval(() => {
    healthCheckAll().catch((e: unknown) => {
      log.error("health check loop failed", {
        error: e instanceof Error ? e.message : String(e),
      });
    });
  }, HEALTH_INTERVAL_MS);
}
/**
 * Cancel the periodic health check timer, if one is active, and forget it
 * so that a later `startHealthChecks` call can create a new one.
 */
export function stopHealthChecks(): void {
  if (!healthTimer) return;
  clearInterval(healthTimer);
  healthTimer = null;
}
// ---------------------------------------------------------------------------
// Restore all services on broker boot
// ---------------------------------------------------------------------------
/**
 * Bring persisted services back up, one after another.
 *
 * Fetches the stored service records through the supplied callback and
 * deploys each in turn. A record that fails to deploy is logged and skipped
 * so the remaining records are still attempted. Health checks are started
 * once every record has been processed.
 *
 * @param getServiceRecords - Async provider of the records to restore.
 */
export async function restoreAll(
  getServiceRecords: () => Promise<
    Array<{
      meshId: string;
      name: string;
      sourcePath: string;
      config: ServiceConfig;
      resolvedEnv?: Record<string, string>;
    }>
  >,
): Promise<void> {
  const persisted = await getServiceRecords();
  log.info("restoring services", { count: persisted.length });

  for (const { meshId, name, sourcePath, config, resolvedEnv } of persisted) {
    try {
      await deploy({ meshId, name, sourcePath, config, resolvedEnv });
      log.info("service restored", { service: name, mesh_id: meshId });
    } catch (err) {
      const error = err instanceof Error ? err.message : String(err);
      log.error("service restore failed", {
        service: name,
        mesh_id: meshId,
        error,
      });
    }
  }

  startHealthChecks();
}
// ---------------------------------------------------------------------------
// Shutdown
// ---------------------------------------------------------------------------
/**
 * Stop everything: halt the health check loop, undeploy every service that
 * currently has a process, wait for all of those undeploys to settle, and
 * finally empty the service registry.
 */
export async function shutdownAll(): Promise<void> {
  stopHealthChecks();

  const stopping = [...services.values()]
    .filter((svc) => Boolean(svc.process))
    .map((svc) => {
      svc.status = "stopped";
      return undeploy(svc.meshId, svc.name);
    });

  // allSettled: one failing undeploy must not prevent the registry reset.
  await Promise.allSettled(stopping);
  services.clear();
}

View File

@@ -224,6 +224,7 @@ export interface WSHelloAckMessage {
restoredGroups?: Array<{ name: string; role?: string }>;
/** Restored cumulative stats (only when restored). */
restoredStats?: { messagesIn: number; messagesOut: number; toolCalls: number; errors: number };
services?: Array<{ name: string; description: string; status: string; tools: Array<{ name: string; description: string; inputSchema: object }>; deployed_by: string }>;
}
/** Broker → client: list of connected peers in the same mesh. */
@@ -1078,6 +1079,29 @@ export interface WSCancelScheduledAckMessage {
_reqId?: string;
}
/**
 * Client → broker: request deployment of an MCP server whose code comes
 * from a zip (referenced by file id) or from a git repository.
 */
export interface WSMcpDeployMessage {
  type: "mcp_deploy";
  /** Name to deploy the server under. */
  server_name: string;
  /** Code source: a zip identified by `file_id`, or a git URL with optional branch/auth. */
  source:
    | { type: "zip"; file_id: string }
    | { type: "git"; url: string; branch?: string; auth?: string };
  /** Optional deployment settings (environment, resource hints, runtime). */
  config?: {
    env?: Record<string, string>;
    memory_mb?: number;
    cpus?: number;
    network_allow?: string[];
    runtime?: "node" | "python" | "bun";
  };
  /** Optional visibility scope for the deployed server. */
  scope?:
    | "peer"
    | "mesh"
    | { peers: string[] }
    | { group: string }
    | { groups: string[] }
    | { role: string };
  _reqId?: string;
}
/**
 * Client → broker: ask for a managed MCP server to be stopped and removed.
 */
export interface WSMcpUndeployMessage {
  type: "mcp_undeploy";
  /** Name of the server to remove. */
  server_name: string;
  _reqId?: string;
}
/**
 * Client → broker: request a pull, rebuild and restart of a git-sourced
 * MCP server.
 */
export interface WSMcpUpdateMessage {
  type: "mcp_update";
  /** Name of the server to update. */
  server_name: string;
  _reqId?: string;
}
/**
 * Client → broker: fetch log output of a managed MCP server.
 */
export interface WSMcpLogsMessage {
  type: "mcp_logs";
  /** Name of the server whose logs are requested. */
  server_name: string;
  /** Optional number of log lines to return. */
  lines?: number;
  _reqId?: string;
}
/**
 * Client → broker: read or change the visibility scope of a server.
 */
export interface WSMcpScopeMessage {
  type: "mcp_scope";
  /** Name of the server whose scope is queried or changed. */
  server_name: string;
  /** Scope value; optional, matching the get-or-set nature of the message. */
  scope?:
    | "peer"
    | "mesh"
    | { peers: string[] }
    | { group: string }
    | { groups: string[] }
    | { role: string };
  _reqId?: string;
}
/**
 * Client → broker: inspect the tool schemas of a deployed service.
 */
export interface WSMcpSchemaMessage {
  type: "mcp_schema";
  /** Name of the service to inspect. */
  server_name: string;
  /** Optional tool name accompanying the request. */
  tool_name?: string;
  _reqId?: string;
}
/**
 * Client → broker: request the list of all deployed services.
 */
export interface WSMcpCatalogMessage {
  type: "mcp_catalog";
  _reqId?: string;
}
/**
 * Client → broker: deploy a skill bundle taken from a zip or a git
 * repository.
 */
export interface WSSkillDeployMessage {
  type: "skill_deploy";
  /** Bundle source: a zip identified by `file_id`, or a git URL with optional branch/auth. */
  source:
    | { type: "zip"; file_id: string }
    | { type: "git"; url: string; branch?: string; auth?: string };
  _reqId?: string;
}
/**
 * Client → broker: store an encrypted credential in the vault.
 */
export interface WSVaultSetMessage {
  type: "vault_set";
  /** Key the entry is stored under. */
  key: string;
  /** Encrypted payload together with its nonce and sealed key. */
  ciphertext: string;
  nonce: string;
  sealed_key: string;
  /** Kind of entry: an environment value or a file. */
  entry_type: "env" | "file";
  /** Optional mount path (presumably relevant for "file" entries — confirm against the broker). */
  mount_path?: string;
  /** Optional free-text description. */
  description?: string;
  _reqId?: string;
}
/**
 * Client → broker: request a listing of vault entries.
 */
export interface WSVaultListMessage {
  type: "vault_list";
  _reqId?: string;
}
/**
 * Client → broker: remove a vault entry.
 */
export interface WSVaultDeleteMessage {
  type: "vault_delete";
  /** Key of the entry to delete. */
  key: string;
  _reqId?: string;
}
export type WSClientMessage =
| WSHelloMessage
| WSSendMessage
@@ -1147,7 +1171,18 @@ export type WSClientMessage =
| WSPeerDirRequestMessage
| WSPeerDirResponseMessage
| WSAuditQueryMessage
| WSAuditVerifyMessage;
| WSAuditVerifyMessage
| WSMcpDeployMessage
| WSMcpUndeployMessage
| WSMcpUpdateMessage
| WSMcpLogsMessage
| WSMcpScopeMessage
| WSMcpSchemaMessage
| WSMcpCatalogMessage
| WSSkillDeployMessage
| WSVaultSetMessage
| WSVaultListMessage
| WSVaultDeleteMessage;
// --- Skill messages ---
@@ -1217,6 +1252,23 @@ export interface WSSkillListMessage {
_reqId?: string;
}
/**
 * Broker → client: progress or final outcome of a deployment.
 */
export interface WSMcpDeployStatusMessage {
  type: "mcp_deploy_status";
  /** Server the status refers to. */
  server_name: string;
  /** Current deployment phase or outcome. */
  status: "building" | "installing" | "running" | "failed";
  /** Optional tool definitions of the server. */
  tools?: Array<{
    name: string;
    description: string;
    inputSchema: object;
  }>;
  /** Optional error text. */
  error?: string;
  _reqId?: string;
}
/**
 * Broker → client: log output of a service.
 */
export interface WSMcpLogsResultMessage {
  type: "mcp_logs_result";
  /** Server the log lines belong to. */
  server_name: string;
  /** The log lines. */
  lines: string[];
  _reqId?: string;
}
/**
 * Broker → client: result of a tool schema introspection request.
 */
export interface WSMcpSchemaResultMessage {
  type: "mcp_schema_result";
  /** Server the tool definitions belong to. */
  server_name: string;
  /** Tool definitions, each with its name, description and input schema. */
  tools: Array<{
    name: string;
    description: string;
    inputSchema: object;
  }>;
  _reqId?: string;
}
/**
 * Broker → client: the complete service catalog.
 */
export interface WSMcpCatalogResultMessage {
  type: "mcp_catalog_result";
  /** One entry per catalogued service. */
  services: Array<{
    name: string;
    /** Whether the entry is an MCP server or a skill. */
    type: "mcp" | "skill";
    description: string;
    status: string;
    tool_count: number;
    deployed_by: string;
    /** Scope descriptor: a `type` tag plus scope-specific extra fields. */
    scope: { type: string; [key: string]: unknown };
    source_type: string;
    runtime?: string;
    created_at: string;
  }>;
  _reqId?: string;
}
/**
 * Broker → client: outcome of a scope query or scope change.
 */
export interface WSMcpScopeResultMessage {
  type: "mcp_scope_result";
  /** Server the scope applies to. */
  server_name: string;
  /** Scope descriptor: a `type` tag plus scope-specific extra fields. */
  scope: { type: string; [key: string]: unknown };
  deployed_by: string;
  _reqId?: string;
}
/**
 * Broker → client: acknowledgement of a skill deployment.
 */
export interface WSSkillDeployAckMessage {
  type: "skill_deploy_ack";
  /** Name of the skill. */
  name: string;
  /** File list reported for the skill. */
  files: string[];
  _reqId?: string;
}
/**
 * Broker → client: acknowledgement of a vault operation.
 */
export interface WSVaultAckMessage {
  type: "vault_ack";
  /** Key the operation was about. */
  key: string;
  /** What happened to the entry. */
  action: "stored" | "deleted" | "not_found";
  _reqId?: string;
}
/**
 * Broker → client: listing of vault entries. Entries carry metadata only
 * (key, type, mount path, description, update time).
 */
export interface WSVaultListResultMessage {
  type: "vault_list_result";
  entries: Array<{
    key: string;
    entry_type: "env" | "file";
    mount_path?: string;
    description?: string;
    updated_at: string;
  }>;
  _reqId?: string;
}
export type WSServerMessage =
| WSHelloAckMessage
| WSPushMessage
@@ -1267,4 +1319,12 @@ export type WSServerMessage =
| WSPeerDirResponseForwardMessage
| WSAuditResultMessage
| WSAuditVerifyResultMessage
| WSMcpDeployStatusMessage
| WSMcpLogsResultMessage
| WSMcpSchemaResultMessage
| WSMcpCatalogResultMessage
| WSMcpScopeResultMessage
| WSSkillDeployAckMessage
| WSVaultAckMessage
| WSVaultListResultMessage
| WSErrorMessage;