/**
 * claudemesh runner supervisor — manages MCP server child processes.
 *
 * HTTP API (called by broker):
 *   POST /load   { name, sourcePath | gitUrl [, gitBranch] | npxPackage | uvxPackage, env, runtime }
 *                → install, spawn MCP, return tools
 *   POST /call   { name, tool, args } → route tool call
 *   POST /unload { name } → kill process
 *   GET  /health → { ok, services }
 *   GET  /list?name=…  → tools for one service (all services when name is omitted)
 *   GET  /logs?name=…&lines=N → last N buffered log lines of a service
 *
 * Each MCP server is a child process with its own stdio pipe.
 * The supervisor talks MCP JSON-RPC over stdin/stdout to each child.
 */

import { createServer } from "node:http";
import { spawn } from "node:child_process";
import { createInterface } from "node:readline";
import { existsSync, readFileSync, mkdirSync, writeFileSync, readdirSync, rmSync } from "node:fs";
import { join } from "node:path";

const DEFAULT_PORT = 7901;
const parsedPort = Number.parseInt(process.env.RUNNER_PORT || String(DEFAULT_PORT), 10);
// Fall back to the default instead of handing NaN to listen() when RUNNER_PORT is garbage.
const PORT = Number.isInteger(parsedPort) && parsedPort >= 0 && parsedPort <= 65535 ? parsedPort : DEFAULT_PORT;

const CALL_TIMEOUT_MS = 25_000;
const LOG_BUFFER_SIZE = 500;
const DEFAULT_LOG_LINES = 50;
const MAX_RESTARTS = 5;
const SERVICES_ROOT = "/var/claudemesh/services";
const STDERR_TAIL_CHARS = 4096;
// Delay before each MCP handshake attempt.
const DEFAULT_INIT_DELAYS_MS = [1000];
// Python MCPs take longer to start — retry init with backoff.
const UVX_INIT_DELAYS_MS = [1500, 2500, 3500];
// Binaries of common transitive deps that are never the MCP entry point.
const IGNORED_BINS = new Set(["node-which", "which", "semver", "resolve"]);

// --- Service registry ---

/** name → service record (see createService). */
const services = new Map();
let callIdCounter = 0;

// --- Small helpers ---

/** Resolve after `ms` milliseconds. */
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

/** Return `value` if it is a plain (non-array) object, otherwise an empty object. */
const asRecord = value => (value !== null && typeof value === "object" && !Array.isArray(value) ? value : {});

/** Append a line to a service's ring buffer, dropping the oldest line when full. */
function pushLog(svc, line) {
  svc.logs.push(line);
  if (svc.logs.length > LOG_BUFFER_SIZE) svc.logs.shift();
}

/** Resolve every in-flight MCP request of `svc` with `{ error: reason }`. */
function failPending(svc, reason) {
  for (const [, entry] of svc.pending) {
    clearTimeout(entry.timer);
    entry.resolve({ error: reason });
  }
  svc.pending.clear();
}

/**
 * True when `name` can be used as a single directory name under SERVICES_ROOT,
 * i.e. it cannot escape the root via separators or dot segments.
 */
function isSafeServiceDirName(name) {
  return (
    typeof name === "string" &&
    name.length > 0 &&
    name !== "." &&
    name !== ".." &&
    !/[\\\/\0]/.test(name)
  );
}

/**
 * Return the last `rawCount` entries of `logs`.
 * A missing, non-numeric or negative count falls back to DEFAULT_LOG_LINES;
 * a count of 0 yields an empty array.
 */
function tailLines(logs, rawCount) {
  const parsed = Number.parseInt(rawCount ?? "", 10);
  const count = Number.isInteger(parsed) && parsed >= 0 ? parsed : DEFAULT_LOG_LINES;
  // slice(-0) would return the whole array, so 0 needs its own branch.
  return count === 0 ? [] : logs.slice(-count);
}

/**
 * Split an npm spec into package name and version range.
 * "pkg" → { name: "pkg", version: "*" }; "@scope/pkg@1.2" → { name: "@scope/pkg", version: "1.2" }.
 */
function parseNpmSpec(spec) {
  const at = spec.lastIndexOf("@");
  // Index 0 is the scope marker of "@scope/pkg", not a version separator.
  if (at > 0) return { name: spec.slice(0, at), version: spec.slice(at + 1) || "*" };
  return { name: spec, version: "*" };
}

/**
 * Choose the executable for an npx-style package from the entries of node_modules/.bin.
 * Prefers an exact match on the unscoped package name, then a substring match,
 * then the first remaining entry. Returns null when nothing is left.
 */
function pickNpxBin(bins, packageName) {
  const candidates = bins.filter(bin => !IGNORED_BINS.has(bin));
  const shortName = packageName.split("/").pop().replace(/^@/, "");
  return (
    candidates.find(bin => bin === shortName) ??
    candidates.find(bin => bin.includes(shortName)) ??
    candidates[0] ??
    null
  );
}

/**
 * Derive the `python -m` module name from a pip requirement:
 * mcp-server-time → mcp_server_time. Version specifiers and extras are ignored.
 */
function pythonModuleName(requirement) {
  const base = /^[A-Za-z0-9][A-Za-z0-9._-]*/.exec(requirement.trim())?.[0] ?? requirement;
  return base.replace(/-/g, "_");
}

/**
 * Run a command to completion without a shell.
 * stdout is discarded (an unread pipe would stall the child once the pipe buffer fills);
 * the tail of stderr is kept for the error message.
 *
 * @param {string} cmd
 * @param {string[]} args
 * @param {{ cwd?: string, env?: object, timeoutMs?: number, label?: string }} [options]
 * @returns {Promise<void>} rejects on spawn error, non-zero exit or timeout
 */
function runCommand(cmd, args, { cwd, env, timeoutMs, label = cmd } = {}) {
  return new Promise((resolve, reject) => {
    const child = spawn(cmd, args, { cwd, env, stdio: ["ignore", "ignore", "pipe"] });
    let stderr = "";
    let timedOut = false;
    const timer = timeoutMs
      ? setTimeout(() => {
          timedOut = true;
          child.kill("SIGKILL");
        }, timeoutMs)
      : null;
    child.stderr?.on("data", chunk => {
      stderr = (stderr + chunk.toString()).slice(-STDERR_TAIL_CHARS);
    });
    child.on("error", err => {
      clearTimeout(timer);
      reject(err);
    });
    child.on("exit", code => {
      clearTimeout(timer);
      if (code === 0) {
        resolve();
        return;
      }
      const outcome = timedOut ? `timed out after ${timeoutMs}ms` : `exit ${code}`;
      reject(new Error(`${label} ${outcome}: ${stderr.slice(-300)}`));
    });
  });
}

// --- Runtime detection ---

/** Guess the runtime ("bun" | "node" | "python") from marker files in `sourcePath`; defaults to "node". */
function detectRuntime(sourcePath) {
  const has = file => existsSync(join(sourcePath, file));
  if (has("bun.lockb") || has("bunfig.toml")) return "bun";
  if (has("package.json")) return "node";
  if (has("pyproject.toml") || has("requirements.txt")) return "python";
  return "node";
}

/**
 * Work out the command line that starts the MCP server in `sourcePath`.
 * @returns {{ cmd: string, args: string[] }}
 */
function detectEntry(sourcePath, runtime) {
  const has = file => existsSync(join(sourcePath, file));

  if (runtime === "python") {
    for (const entry of ["server.py", "src/server.py", "main.py", "src/main.py"]) {
      if (has(entry)) return { cmd: "python3", args: [entry] };
    }
    if (has("pyproject.toml")) return { cmd: "python3", args: ["-m", "server"] };
    return { cmd: "python3", args: ["server.py"] };
  }

  const cmd = runtime === "bun" ? "bun" : "node";
  if (has("package.json")) {
    try {
      const pkg = JSON.parse(readFileSync(join(sourcePath, "package.json"), "utf-8"));
      if (pkg.main) return { cmd, args: [pkg.main] };
      if (pkg.bin) {
        const bin = typeof pkg.bin === "string" ? pkg.bin : Object.values(pkg.bin)[0];
        if (bin) return { cmd, args: [bin] };
      }
    } catch {
      // Best effort: an unreadable/invalid package.json falls through to the conventional entry files.
    }
  }
  for (const entry of ["dist/index.js", "src/index.js", "src/index.ts", "index.js"]) {
    if (has(entry)) return { cmd, args: [entry] };
  }
  return { cmd, args: ["src/index.js"] };
}

// --- Install deps ---

/**
 * Install the dependencies of the project in `sourcePath` with the runtime's package manager.
 * @returns {Promise<void>} rejects with "<cmd> install exit <code>: <stderr tail>" on failure
 */
function installDeps(sourcePath, runtime) {
  let cmd;
  let args;
  if (runtime === "python") {
    cmd = "pip3";
    args = existsSync(join(sourcePath, "requirements.txt"))
      ? ["install", "--no-cache-dir", "-r", "requirements.txt"]
      : ["install", "--no-cache-dir", "."];
  } else if (runtime === "bun") {
    cmd = "bun";
    args = ["install"];
  } else {
    cmd = "npm";
    args = ["install", "--production", "--legacy-peer-deps"];
  }
  return runCommand(cmd, args, { cwd: sourcePath, label: `${cmd} install` });
}

// --- MCP JSON-RPC ---

/**
 * Send one JSON-RPC request to the service's child and wait for the matching response.
 * Never rejects: resolves with `{ result }` or `{ error }` ("not running", "timeout",
 * the child's error message, or whatever failPending supplies).
 */
function sendMcpRequest(svc, method, params) {
  return new Promise(resolve => {
    if (!svc.process?.stdin?.writable) {
      resolve({ error: "not running" });
      return;
    }
    const id = `c_${++callIdCounter}`;
    const timer = setTimeout(() => {
      svc.pending.delete(id);
      resolve({ error: "timeout" });
    }, CALL_TIMEOUT_MS);
    svc.pending.set(id, { resolve, timer });
    try {
      svc.process.stdin.write(JSON.stringify({ jsonrpc: "2.0", id, method, ...(params ? { params } : {}) }) + "\n");
    } catch (e) {
      clearTimeout(timer);
      svc.pending.delete(id);
      resolve({ error: e.message });
    }
  });
}

/**
 * Perform the MCP handshake (initialize → notifications/initialized → tools/list).
 * @returns {Promise<object[]>} the tool list reported by the server
 * @throws {Error} when initialize or tools/list fails
 */
async function initMcp(svc) {
  const init = await sendMcpRequest(svc, "initialize", {
    protocolVersion: "2024-11-05",
    capabilities: {},
    clientInfo: { name: "claudemesh-runner", version: "0.1.0" },
  });
  if (init.error) throw new Error(`init failed: ${init.error}`);

  if (svc.process?.stdin?.writable) {
    svc.process.stdin.write(JSON.stringify({ jsonrpc: "2.0", method: "notifications/initialized" }) + "\n");
  }

  const tools = await sendMcpRequest(svc, "tools/list", {});
  if (tools.error) throw new Error(`tools/list failed: ${tools.error}`);
  return tools.result?.tools ?? [];
}

/**
 * Run initMcp once per entry of `delaysMs`, sleeping that long before each attempt.
 * On success stores the tools on `svc` and marks it initialized.
 * @throws {Error} the last initMcp error when every attempt fails
 */
async function handshake(svc, delaysMs) {
  let lastError = null;
  for (const waitMs of delaysMs) {
    await delay(waitMs);
    try {
      svc.tools = await initMcp(svc);
      svc.initialized = true;
      return;
    } catch (e) {
      lastError = e;
    }
  }
  throw lastError;
}

// --- Service lifecycle ---

/** Build a fresh service record. `extra` carries the pre-resolved entry point of npx/uvx packages. */
function createService({ name, sourcePath, runtime, env, extra = {} }) {
  return {
    name,
    sourcePath,
    runtime,
    env: asRecord(env),
    process: null,
    pid: null,
    tools: [],
    status: "installing",
    pending: new Map(),
    logs: [],
    restarts: 0,
    healthFailures: 0,
    // Set once the first handshake succeeded; restarts then redo the handshake themselves.
    initialized: false,
    restartTimer: null,
    ...extra,
  };
}

/** Route one stdout line of a child: JSON-RPC responses settle pending calls, non-JSON goes to the log. */
function handleStdoutLine(svc, line) {
  try {
    const msg = JSON.parse(line);
    const key = String(msg.id);
    if (msg.id && svc.pending.has(key)) {
      const entry = svc.pending.get(key);
      clearTimeout(entry.timer);
      svc.pending.delete(key);
      entry.resolve(msg.error ? { error: msg.error.message ?? JSON.stringify(msg.error) } : { result: msg.result });
    }
  } catch {
    pushLog(svc, `[stdout] ${line}`);
  }
}

/**
 * Respawn a crashed service after its back-off delay.
 * Does nothing when the service was unloaded, replaced or marked failed in the meantime.
 * A child that had completed its handshake before crashing is re-initialized, because a
 * fresh MCP server process needs `initialize` before it will serve tool calls.
 */
async function restartService(svc) {
  svc.restartTimer = null;
  if (svc.status !== "restarting") return;
  spawnService(svc);
  // The initial /load handshake is still in flight and will talk to the new child itself.
  if (!svc.initialized) return;
  await delay(DEFAULT_INIT_DELAYS_MS[0]);
  if (svc.status !== "running") return;
  try {
    svc.tools = await initMcp(svc);
    console.log(`[runner] ${svc.name} re-initialized after restart, ${svc.tools.length} tools`);
  } catch (e) {
    pushLog(svc, `[runner] re-init after restart failed: ${e.message}`);
  }
}

/**
 * Spawn the child process of `svc` and wire up its stdio, exit and error handling.
 * Sets `svc.process`, `svc.pid` and `svc.status = "running"`.
 */
function spawnService(svc) {
  // npx/uvx packages have pre-resolved entry points
  let cmd;
  let args;
  if (svc._pythonModule) {
    // Python MCPs: run via venv python -m
    cmd = svc._venvPython;
    args = ["-m", svc._pythonModule];
  } else if (svc._npxBin) {
    cmd = "node";
    args = [svc._npxBin];
  } else {
    ({ cmd, args } = detectEntry(svc.sourcePath, svc.runtime));
  }

  const child = spawn(cmd, args, {
    cwd: svc.sourcePath,
    stdio: ["pipe", "pipe", "pipe"],
    env: { ...process.env, ...svc.env, NODE_ENV: "production" },
  });
  svc.process = child;
  svc.pid = child.pid;
  svc.status = "running";
  svc.healthFailures = 0;

  // Without a listener an EPIPE on a dying child's stdin is an unhandled 'error' event
  // and would take the whole supervisor down.
  child.stdin?.on("error", err => pushLog(svc, `[stdin] ${err.message}`));

  createInterface({ input: child.stdout }).on("line", line => handleStdoutLine(svc, line));
  createInterface({ input: child.stderr }).on("line", line => pushLog(svc, `[stderr] ${line}`));

  child.on("exit", (code, signal) => {
    console.log(`[runner] ${svc.name} exited code=${code} signal=${signal} restarts=${svc.restarts}`);
    // Ignore a stale child so it cannot clobber the state of its successor.
    if (svc.process !== child) return;
    failPending(svc, "crashed");
    svc.process = null;
    svc.pid = null;
    if (svc.status !== "running") return;
    if (svc.restarts < MAX_RESTARTS) {
      svc.restarts++;
      svc.status = "restarting";
      svc.restartTimer = setTimeout(() => {
        restartService(svc).catch(err => {
          console.error(`[runner] ${svc.name} restart failed: ${err.message}`);
          svc.status = "failed";
        });
      }, 1000 * svc.restarts);
    } else {
      svc.status = "crashed";
    }
  });

  child.on("error", err => {
    console.error(`[runner] ${svc.name} spawn error: ${err.message}`);
    svc.status = "failed";
    // No pid means the process never started, so no 'exit' event will settle the callers.
    if (child.pid === undefined && svc.process === child) {
      failPending(svc, `spawn error: ${err.message}`);
    }
  });

  console.log(`[runner] spawned ${svc.name} pid=${child.pid} cmd=${cmd} ${args.join(" ")}`);
}

/**
 * Terminate `child`: SIGTERM, then SIGKILL after `graceMs`.
 * Resolves as soon as the process exits (or one second after SIGKILL at the latest).
 */
function stopProcess(child, graceMs) {
  return new Promise(resolve => {
    if (child.pid === undefined || child.exitCode !== null || child.signalCode !== null) {
      resolve();
      return;
    }
    let forceTimer = null;
    const finish = () => {
      clearTimeout(termTimer);
      clearTimeout(forceTimer);
      child.off("exit", finish);
      resolve();
    };
    const termTimer = setTimeout(() => {
      child.kill("SIGKILL");
      forceTimer = setTimeout(finish, 1000);
    }, graceMs);
    child.once("exit", finish);
    child.kill("SIGTERM");
  });
}

/** Stop a service for good: cancel any scheduled restart, kill the child, fail in-flight calls. */
async function retireService(svc, graceMs) {
  svc.status = "stopped";
  clearTimeout(svc.restartTimer);
  svc.restartTimer = null;
  if (svc.process) await stopProcess(svc.process, graceMs);
  failPending(svc, "unloaded");
}

/** Mark a service failed, record why, and make sure neither a child nor a restart timer lingers. */
function failService(svc, message) {
  svc.status = "failed";
  clearTimeout(svc.restartTimer);
  svc.restartTimer = null;
  pushLog(svc, message);
  svc.process?.kill("SIGTERM");
}

// --- Source preparation ---

/** Shallow-clone `gitUrl` (optionally a specific branch) into a freshly emptied `dir`. */
async function cloneRepository(dir, gitUrl, gitBranch) {
  // Remove the directory itself: a glob like dir/* would leave dotfiles (.git) behind
  // and make the next `git clone … .` fail on a non-empty directory.
  rmSync(dir, { recursive: true, force: true });
  mkdirSync(dir, { recursive: true });
  const args = ["clone", "--depth", "1"];
  if (gitBranch) args.push(`--branch=${gitBranch}`);
  // "--" keeps a URL starting with "-" from being parsed as an option.
  args.push("--", gitUrl, ".");
  await runCommand("git", args, {
    cwd: dir,
    timeoutMs: 120_000,
    label: "git clone",
    env: { ...process.env, GIT_TERMINAL_PROMPT: "0" },
  });
}

/**
 * npx-based: create a minimal package.json in `dir` that depends on the package.
 * @returns {string} the package name without its version suffix
 */
function writeNpxManifest(dir, serviceName, npxPackage) {
  const { name: packageName, version } = parseNpmSpec(npxPackage);
  mkdirSync(dir, { recursive: true });
  const manifest = { name: `mcp-${serviceName}`, private: true, dependencies: { [packageName]: version } };
  writeFileSync(join(dir, "package.json"), JSON.stringify(manifest, null, 2));
  return packageName;
}

/**
 * uvx-based Python MCP: create a venv in `dir` and install the package plus the MCP CLI extras.
 * @returns {Promise<string>} path of the venv's python interpreter
 */
async function installUvxPackage(dir, uvxPackage) {
  mkdirSync(dir, { recursive: true });
  const venvDir = join(dir, ".venv");
  const venvPython = join(dir, ".venv/bin/python");
  await runCommand("uv", ["venv", "--clear", venvDir], { timeoutMs: 30_000, label: "uv venv" });
  await runCommand("uv", ["pip", "install", "--python", venvPython, uvxPackage, "mcp[cli]"], {
    timeoutMs: 120_000,
    label: "uv pip install",
  });
  return venvPython;
}

// --- HTTP API ---

/**
 * Read and JSON-parse a request body.
 * Rejects with an error carrying `statusCode = 400` when the body is not valid JSON.
 */
function readBody(req) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    req.on("data", chunk => chunks.push(chunk));
    req.on("end", () => {
      try {
        resolve(JSON.parse(Buffer.concat(chunks).toString()));
      } catch (e) {
        const err = new Error(`invalid JSON body: ${e.message}`);
        err.statusCode = 400;
        reject(err);
      }
    });
    req.on("error", reject);
  });
}

/** Send `body` as a JSON response with the given status code. */
function json(res, status, body) {
  res.writeHead(status, { "Content-Type": "application/json" });
  res.end(JSON.stringify(body));
}

/** GET /health — status summary of every registered service. */
function handleHealth(res) {
  const summary = [];
  for (const [name, svc] of services) {
    summary.push({ name, status: svc.status, pid: svc.pid, tools: svc.tools.length, restarts: svc.restarts });
  }
  return json(res, 200, { ok: true, services: summary });
}

/** GET /list — tools of one service, or a name → tools map of all services. */
function handleList(res, url) {
  const name = url.searchParams.get("name");
  if (name) {
    const svc = services.get(name);
    if (!svc) return json(res, 404, { error: `service "${name}" not found` });
    return json(res, 200, { tools: svc.tools });
  }
  const all = {};
  for (const [serviceName, svc] of services) all[serviceName] = svc.tools;
  return json(res, 200, all);
}

/** GET /logs — tail of a service's log buffer. */
function handleLogs(res, url) {
  const svc = services.get(url.searchParams.get("name"));
  if (!svc) return json(res, 404, { error: "not found" });
  return json(res, 200, { lines: tailLines(svc.logs, url.searchParams.get("lines")) });
}

/** Finish a POST /load for a uvx package: install into a venv, spawn via `python -m`, handshake with retries. */
async function loadUvxService(res, { name, dir, uvxPackage, env }) {
  let venvPython;
  try {
    venvPython = await installUvxPackage(dir, uvxPackage);
    console.log(`[runner] uvx package installed: ${uvxPackage}`);
  } catch (e) {
    return json(res, 500, { error: `uvx install failed: ${e.message}` });
  }

  // _pythonModule signals spawnService to use `python -m <module>` with the venv python.
  const svc = createService({
    name,
    sourcePath: dir,
    runtime: "python",
    env,
    extra: { _venvPython: venvPython, _pythonModule: pythonModuleName(uvxPackage) },
  });
  services.set(name, svc);
  spawnService(svc);

  try {
    await handshake(svc, UVX_INIT_DELAYS_MS);
  } catch (e) {
    failService(svc, `MCP init failed after ${UVX_INIT_DELAYS_MS.length} attempts: ${e?.message}`);
    return json(res, 500, { error: e?.message, logs: svc.logs.slice(-10) });
  }
  console.log(`[runner] ${name} ready (uvx), ${svc.tools.length} tools`);
  return json(res, 200, { status: "running", tools: svc.tools });
}

/** POST /load — (re)install and start a service from a path, git repo, npm package or uvx package. */
async function handleLoad(req, res) {
  const body = asRecord(await readBody(req));
  const { name, sourcePath, gitUrl, gitBranch, npxPackage, uvxPackage, env: svcEnv, runtime: requestedRuntime } = body;

  if (!name) return json(res, 400, { error: "name required" });
  for (const [field, value] of Object.entries({ name, sourcePath, gitUrl, gitBranch, npxPackage, uvxPackage })) {
    if (value != null && typeof value !== "string") return json(res, 400, { error: `${field} must be a string` });
  }
  const managed = Boolean(gitUrl || npxPackage || uvxPackage);
  // Managed sources live in SERVICES_ROOT/<name>; refuse names that would escape it.
  if (managed && !isSafeServiceDirName(name)) {
    return json(res, 400, { error: "name must be a plain directory name" });
  }
  if (!managed && !sourcePath) {
    return json(res, 400, { error: "one of sourcePath, gitUrl, or npxPackage required" });
  }
  if (!gitUrl && !npxPackage && uvxPackage?.startsWith("-")) {
    return json(res, 400, { error: "uvxPackage must be a package requirement" });
  }

  // Kill existing
  const previous = services.get(name);
  if (previous) await retireService(previous, 1000);

  // Determine source path — git clone, npx, uvx, or pre-existing path
  const managedDir = join(SERVICES_ROOT, name);
  let svcSourcePath = sourcePath;
  let runtime = requestedRuntime;
  let npmPackageName = null;

  if (gitUrl) {
    svcSourcePath = managedDir;
    try {
      await cloneRepository(managedDir, gitUrl, gitBranch);
      console.log(`[runner] git clone complete: ${gitUrl} -> ${svcSourcePath}`);
    } catch (e) {
      return json(res, 500, { error: `git clone failed: ${e.message}` });
    }
  } else if (npxPackage) {
    svcSourcePath = managedDir;
    npmPackageName = writeNpxManifest(managedDir, name, npxPackage);
    runtime = runtime || "node";
  } else if (uvxPackage) {
    return loadUvxService(res, { name, dir: managedDir, uvxPackage, env: svcEnv });
  }

  runtime = runtime || detectRuntime(svcSourcePath);
  const svc = createService({ name, sourcePath: svcSourcePath, runtime, env: svcEnv });
  services.set(name, svc);

  // Install deps
  try {
    await installDeps(svcSourcePath, runtime);
  } catch (e) {
    failService(svc, `install failed: ${e.message}`);
    return json(res, 500, { error: e.message });
  }

  // For npx packages: find the binary in node_modules/.bin
  if (npmPackageName) {
    const binDir = join(svcSourcePath, "node_modules", ".bin");
    const bin = existsSync(binDir) ? pickNpxBin(readdirSync(binDir), npmPackageName) : null;
    if (bin) {
      svc._npxBin = join(binDir, bin);
      console.log(`[runner] npx binary resolved: ${bin}`);
    }
  }

  // Spawn + MCP handshake
  spawnService(svc);
  try {
    await handshake(svc, DEFAULT_INIT_DELAYS_MS);
  } catch (e) {
    failService(svc, `MCP init failed: ${e.message}`);
    return json(res, 500, { error: e.message, logs: svc.logs.slice(-10) });
  }
  console.log(`[runner] ${name} ready, ${svc.tools.length} tools`);
  return json(res, 200, { status: "running", tools: svc.tools });
}

/** POST /call — forward a tools/call request to a running service. */
async function handleCall(req, res) {
  const { name, tool, args } = asRecord(await readBody(req));
  const svc = services.get(name);
  if (!svc) return json(res, 404, { error: `service "${name}" not found` });
  if (svc.status !== "running") return json(res, 503, { error: `service is ${svc.status}` });
  const result = await sendMcpRequest(svc, "tools/call", { name: tool, arguments: args || {} });
  return json(res, 200, result);
}

/** POST /unload — stop a service and drop it from the registry. */
async function handleUnload(req, res) {
  const { name } = asRecord(await readBody(req));
  const svc = services.get(name);
  if (!svc) return json(res, 404, { error: "not found" });
  await retireService(svc, 2000);
  // A concurrent /load may already have registered a successor under the same name.
  if (services.get(name) === svc) services.delete(name);
  return json(res, 200, { ok: true });
}

const server = createServer(async (req, res) => {
  try {
    const url = new URL(req.url ?? "/", "http://localhost");
    switch (`${req.method} ${url.pathname}`) {
      case "GET /health":
        return handleHealth(res);
      case "GET /list":
        return handleList(res, url);
      case "GET /logs":
        return handleLogs(res, url);
      case "POST /load":
        return await handleLoad(req, res);
      case "POST /call":
        return await handleCall(req, res);
      case "POST /unload":
        return await handleUnload(req, res);
      default:
        return json(res, 404, { error: "not found" });
    }
  } catch (e) {
    console.error("[runner] request error:", e);
    if (res.headersSent) {
      res.end();
      return undefined;
    }
    return json(res, e.statusCode ?? 500, { error: e.message });
  }
});

server.on("error", err => {
  console.error(`[runner] server error: ${err.message}`);
  process.exit(1);
});

server.listen(PORT, "0.0.0.0", () => {
  console.log(`[runner] supervisor listening on :${PORT}`);
});

process.on("SIGTERM", () => {
  console.log("[runner] shutting down...");
  for (const [, svc] of services) {
    svc.status = "stopped";
    clearTimeout(svc.restartTimer);
    svc.process?.kill("SIGTERM");
  }
  server.close(() => process.exit(0));
  // Idle keep-alive sockets would otherwise keep close() from ever completing.
  server.closeIdleConnections?.();
});