From 873f588057e2c8c824d36d554860f349aa46f35f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Wed, 8 Apr 2026 13:06:43 +0100 Subject: [PATCH] feat: runner container + broker deploy pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - apps/runner/: Dockerfile (node22 + python3 + uv + bun) + supervisor.mjs (HTTP API for load/call/unload/health) - docker-compose: runner service with shared services-data volume - Broker mcp_deploy: git clone or zip extract → runner /load → MCP spawn - Broker mcp_call: routes managed services to runner via HTTP, falls back to live-proxy for peer-hosted servers - RUNNER_URL env var for broker → runner communication Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/broker/src/env.ts | 1 + apps/broker/src/index.ts | 115 +++++++++++++- apps/runner/Dockerfile | 40 +++++ apps/runner/supervisor.mjs | 278 ++++++++++++++++++++++++++++++++++ docker-compose.production.yml | 27 ++++ 5 files changed, 454 insertions(+), 7 deletions(-) create mode 100644 apps/runner/Dockerfile create mode 100644 apps/runner/supervisor.mjs diff --git a/apps/broker/src/env.ts b/apps/broker/src/env.ts index 1b98c8f..ed8666e 100644 --- a/apps/broker/src/env.ts +++ b/apps/broker/src/env.ts @@ -28,6 +28,7 @@ const envSchema = z.object({ NEO4J_URL: z.string().default("bolt://neo4j:7687"), NEO4J_USER: z.string().default("neo4j"), NEO4J_PASSWORD: z.string().default("changeme"), + RUNNER_URL: z.string().default("http://runner:7901"), CLAUDEMESH_SERVICES_DIR: z.string().default("/var/claudemesh/services"), BROKER_ENCRYPTION_KEY: z.string().default(""), // 64 hex chars (32 bytes). Auto-generated if empty. MAX_SERVICES_PER_MESH: z.coerce.number().int().positive().default(20), diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 8538ec2..bbf3f33 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -2806,6 +2806,34 @@ function handleConnection(ws: WebSocket): void { case "mcp_call": { const mc = msg as Extract; const callKey = `${conn.meshId}:${mc.serverName}`; + + // Check managed services first (runner-hosted) + const managedSvc = await getService(conn.meshId, mc.serverName); + if (managedSvc && managedSvc.status === "running") { + try { + const runnerRes = await fetch(`${env.RUNNER_URL}/call`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ name: mc.serverName, tool: mc.toolName, args: mc.args ?? {} }), + }); + const result = await runnerRes.json() as { result?: unknown; error?: string }; + sendToPeer(presenceId, { + type: "mcp_call_result", + ...(result.result !== undefined ? { result: result.result } : {}), + ...(result.error ? { error: result.error } : {}), + ...(_reqId ? { _reqId } : {}), + } as any); + } catch (e) { + sendToPeer(presenceId, { + type: "mcp_call_result", + error: `runner call failed: ${e instanceof Error ? e.message : String(e)}`, + ...(_reqId ? { _reqId } : {}), + } as any); + } + break; + } + + // Fall back to live-proxy (peer-hosted) MCP registry const server = mcpRegistry.get(callKey); if (!server) { sendToPeer(presenceId, { @@ -3195,13 +3223,86 @@ function handleConnection(ws: WebSocket): void { scope: md.scope ?? "peer", deployedBy: conn.memberId, deployedByName: conn.displayName, }); sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "building", _reqId: md._reqId } as any); - broadcastToMesh(conn.meshId, { - type: "push", subtype: "system" as const, event: "mcp_deployed", - eventData: { name: md.server_name, description: `MCP server: ${md.server_name}`, tool_count: 0, deployed_by: conn.displayName, scope: md.scope ?? "peer" }, - messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system", - priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(), - }); - log.info("ws mcp_deploy", { presence_id: presenceId, name: md.server_name }); + log.info("ws mcp_deploy", { presence_id: presenceId, name: md.server_name, source: md.source.type }); + + // --- Source extraction + runner spawn (async, non-blocking) --- + (async () => { + try { + const { mkdirSync, writeFileSync } = await import("node:fs"); + const { join } = await import("node:path"); + const sourcePath = join(env.CLAUDEMESH_SERVICES_DIR, conn.meshId, md.server_name, "source"); + mkdirSync(sourcePath, { recursive: true }); + + // Extract source + if (md.source.type === "git") { + const { execSync } = await import("node:child_process"); + const gitUrl = md.source.url; + const branch = md.source.branch ?? "main"; + execSync(`git clone --depth 1 --branch ${branch} ${gitUrl} .`, { cwd: sourcePath, timeout: 60_000 }); + log.info("git clone complete", { name: md.server_name, url: gitUrl }); + } else if (md.source.type === "zip" && md.source.file_id) { + // Download from MinIO and extract + const bucket = meshBucketName(conn.meshId); + const fileRow = await getFile(conn.meshId, md.source.file_id); + if (!fileRow) throw new Error(`file ${md.source.file_id} not found`); + const stream = await minioClient.getObject(bucket, (fileRow as any).minioKey); + const chunks: Buffer[] = []; + for await (const chunk of stream) chunks.push(chunk as Buffer); + const zipBuf = Buffer.concat(chunks); + // Write zip and extract + const zipPath = join(sourcePath, ".._upload.zip"); + writeFileSync(zipPath, zipBuf); + const { execSync } = await import("node:child_process"); + execSync(`unzip -o "${zipPath}" -d .`, { cwd: sourcePath, timeout: 30_000 }); + execSync(`rm -f "${zipPath}"`, { cwd: sourcePath }); + log.info("zip extracted", { name: md.server_name, file_id: md.source.file_id }); + } else if (md.source.type === "npx") { + // npx-based: no source extraction needed, runner spawns via npx + // Write a marker file so runner knows the spawn command + writeFileSync(join(sourcePath, ".npx-package"), md.source.package ?? md.server_name); + } + + // Resolve env vars (decrypted by CLI, sent as plaintext over TLS) + const resolvedEnv = md.config?.env ?? {}; + + // Call runner HTTP API to load the service + const runnerRes = await fetch(`${env.RUNNER_URL}/load`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + name: md.server_name, + sourcePath, + env: resolvedEnv, + runtime: md.config?.runtime, + }), + }); + const runnerResult = await runnerRes.json() as { status?: string; tools?: any[]; error?: string }; + + if (!runnerRes.ok || runnerResult.error) { + await updateServiceStatus(conn.meshId, md.server_name, "failed"); + sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "failed", error: runnerResult.error, _reqId: md._reqId } as any); + log.error("runner load failed", { name: md.server_name, error: runnerResult.error }); + return; + } + + // Update DB with tools and running status + await updateServiceStatus(conn.meshId, md.server_name, "running", { + toolsSchema: runnerResult.tools, + }); + sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "running", tools: runnerResult.tools, _reqId: md._reqId } as any); + broadcastToMesh(conn.meshId, { + type: "push", subtype: "system" as const, event: "mcp_deployed", + eventData: { name: md.server_name, description: `MCP server: ${md.server_name}`, tool_count: runnerResult.tools?.length ?? 0, deployed_by: conn.displayName, scope: md.scope ?? "peer", tools: runnerResult.tools }, + messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system", + priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(), + }); + log.info("service deployed", { name: md.server_name, tools: runnerResult.tools?.length ?? 0 }); + } catch (e) { + await updateServiceStatus(conn.meshId, md.server_name, "failed").catch(() => {}); + sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "failed", error: e instanceof Error ? e.message : String(e), _reqId: md._reqId } as any); + log.error("deploy pipeline failed", { name: md.server_name, error: e instanceof Error ? e.message : String(e) }); + } + })(); } catch (e) { sendError(ws, "deploy_error", e instanceof Error ? e.message : String(e), undefined, md._reqId); } break; } diff --git a/apps/runner/Dockerfile b/apps/runner/Dockerfile new file mode 100644 index 0000000..49d5933 --- /dev/null +++ b/apps/runner/Dockerfile @@ -0,0 +1,40 @@ +# claudemesh runner — executes deployed MCP servers as child processes. +# Multi-runtime: Node 22 + Python 3.12 + uv + Bun +# +# The runner supervisor (Node) listens on HTTP :7901 for commands from +# the broker (load, call, unload, health, list_tools). Each deployed +# MCP server runs as a child process with its own stdio pipe. + +FROM node:22-slim AS base + +# Install Python 3.12 + uv (fast pip replacement) + git +RUN apt-get update && apt-get install -y --no-install-recommends \ + python3 python3-pip python3-venv \ + curl ca-certificates git \ + && curl -LsSf https://astral.sh/uv/install.sh | sh \ + && ln -sf /root/.local/bin/uv /usr/local/bin/uv \ + && ln -sf /root/.local/bin/uvx /usr/local/bin/uvx \ + && rm -rf /var/lib/apt/lists/* + +# Install Bun (for bun-based MCP servers) +RUN curl -fsSL https://bun.sh/install | bash \ + && ln -sf /root/.bun/bin/bun /usr/local/bin/bun \ + && ln -sf /root/.bun/bin/bunx /usr/local/bin/bunx + +WORKDIR /app + +# Copy the runner supervisor +COPY supervisor.mjs /app/supervisor.mjs + +# Services directory (shared volume with broker) +RUN mkdir -p /var/claudemesh/services + +ENV NODE_ENV=production +ENV RUNNER_PORT=7901 + +EXPOSE 7901 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ + CMD node -e "fetch('http://localhost:7901/health').then(r=>{process.exit(r.ok?0:1)}).catch(()=>process.exit(1))" + +CMD ["node", "supervisor.mjs"] diff --git a/apps/runner/supervisor.mjs b/apps/runner/supervisor.mjs new file mode 100644 index 0000000..bab3a34 --- /dev/null +++ b/apps/runner/supervisor.mjs @@ -0,0 +1,278 @@ +/** + * claudemesh runner supervisor — manages MCP server child processes. + * + * HTTP API (called by broker): + * POST /load { name, sourcePath, env, runtime } → spawn MCP, return tools + * POST /call { name, tool, args } → route tool call + * POST /unload { name } → kill process + * GET /health → { ok, services } + * GET /list { name? } → tools for a service + * + * Each MCP server is a child process with its own stdio pipe. + * The supervisor talks MCP JSON-RPC over stdin/stdout to each child. + */ + +import { createServer } from "node:http"; +import { spawn } from "node:child_process"; +import { createInterface } from "node:readline"; +import { existsSync, readFileSync, mkdirSync } from "node:fs"; +import { join } from "node:path"; + +const PORT = parseInt(process.env.RUNNER_PORT || "7901", 10); +const CALL_TIMEOUT_MS = 25_000; +const LOG_BUFFER_SIZE = 500; + +// --- Service registry --- + +const services = new Map(); +let callIdCounter = 0; + +// --- Runtime detection --- + +function detectRuntime(sourcePath) { + if (existsSync(join(sourcePath, "bun.lockb")) || existsSync(join(sourcePath, "bunfig.toml"))) return "bun"; + if (existsSync(join(sourcePath, "package.json"))) return "node"; + if (existsSync(join(sourcePath, "pyproject.toml")) || existsSync(join(sourcePath, "requirements.txt"))) return "python"; + return "node"; +} + +function detectEntry(sourcePath, runtime) { + if (runtime === "python") { + for (const e of ["server.py", "src/server.py", "main.py", "src/main.py"]) { + if (existsSync(join(sourcePath, e))) return { cmd: "python3", args: [e] }; + } + if (existsSync(join(sourcePath, "pyproject.toml"))) return { cmd: "python3", args: ["-m", "server"] }; + return { cmd: "python3", args: ["server.py"] }; + } + const cmd = runtime === "bun" ? "bun" : "node"; + if (existsSync(join(sourcePath, "package.json"))) { + try { + const pkg = JSON.parse(readFileSync(join(sourcePath, "package.json"), "utf-8")); + if (pkg.main) return { cmd, args: [pkg.main] }; + if (pkg.bin) { + const bin = typeof pkg.bin === "string" ? pkg.bin : Object.values(pkg.bin)[0]; + if (bin) return { cmd, args: [bin] }; + } + } catch {} + } + for (const e of ["dist/index.js", "src/index.js", "src/index.ts", "index.js"]) { + if (existsSync(join(sourcePath, e))) return { cmd, args: [e] }; + } + return { cmd, args: ["src/index.js"] }; +} + +// --- Install deps --- + +function installDeps(sourcePath, runtime) { + return new Promise((resolve, reject) => { + let cmd, args; + if (runtime === "python") { + if (existsSync(join(sourcePath, "requirements.txt"))) { + cmd = "pip3"; args = ["install", "--no-cache-dir", "-r", "requirements.txt"]; + } else { cmd = "pip3"; args = ["install", "--no-cache-dir", "."]; } + } else if (runtime === "bun") { + cmd = "bun"; args = ["install"]; + } else { + cmd = "npm"; args = ["install", "--production", "--legacy-peer-deps"]; + } + const child = spawn(cmd, args, { cwd: sourcePath, stdio: ["ignore", "pipe", "pipe"] }); + let stderr = ""; + child.stderr?.on("data", d => { stderr += d.toString(); }); + child.on("exit", code => code === 0 ? resolve() : reject(new Error(`${cmd} install exit ${code}: ${stderr.slice(-300)}`))); + child.on("error", reject); + }); +} + +// --- MCP JSON-RPC --- + +function sendMcpRequest(svc, method, params) { + return new Promise(resolve => { + if (!svc.process?.stdin?.writable) { resolve({ error: "not running" }); return; } + const id = `c_${++callIdCounter}`; + const timer = setTimeout(() => { svc.pending.delete(id); resolve({ error: "timeout" }); }, CALL_TIMEOUT_MS); + svc.pending.set(id, { resolve, timer }); + try { + svc.process.stdin.write(JSON.stringify({ jsonrpc: "2.0", id, method, ...(params ? { params } : {}) }) + "\n"); + } catch (e) { + clearTimeout(timer); svc.pending.delete(id); + resolve({ error: e.message }); + } + }); +} + +async function initMcp(svc) { + const init = await sendMcpRequest(svc, "initialize", { + protocolVersion: "2024-11-05", capabilities: {}, + clientInfo: { name: "claudemesh-runner", version: "0.1.0" }, + }); + if (init.error) throw new Error(`init failed: ${init.error}`); + if (svc.process?.stdin?.writable) { + svc.process.stdin.write(JSON.stringify({ jsonrpc: "2.0", method: "notifications/initialized" }) + "\n"); + } + const tools = await sendMcpRequest(svc, "tools/list", {}); + if (tools.error) throw new Error(`tools/list failed: ${tools.error}`); + return tools.result?.tools ?? []; +} + +// --- Spawn --- + +function spawnService(svc) { + const { cmd, args } = detectEntry(svc.sourcePath, svc.runtime); + const child = spawn(cmd, args, { + cwd: svc.sourcePath, + stdio: ["pipe", "pipe", "pipe"], + env: { ...process.env, ...svc.env, NODE_ENV: "production" }, + }); + svc.process = child; + svc.pid = child.pid; + svc.status = "running"; + svc.healthFailures = 0; + + const rl = createInterface({ input: child.stdout }); + rl.on("line", line => { + try { + const msg = JSON.parse(line); + if (msg.id && svc.pending.has(String(msg.id))) { + const p = svc.pending.get(String(msg.id)); + clearTimeout(p.timer); svc.pending.delete(String(msg.id)); + p.resolve(msg.error ? { error: msg.error.message ?? JSON.stringify(msg.error) } : { result: msg.result }); + } + } catch { svc.logs.push(`[stdout] ${line}`); if (svc.logs.length > LOG_BUFFER_SIZE) svc.logs.shift(); } + }); + + const errRl = createInterface({ input: child.stderr }); + errRl.on("line", line => { svc.logs.push(`[stderr] ${line}`); if (svc.logs.length > LOG_BUFFER_SIZE) svc.logs.shift(); }); + + child.on("exit", (code, signal) => { + console.log(`[runner] ${svc.name} exited code=${code} signal=${signal} restarts=${svc.restarts}`); + for (const [, p] of svc.pending) { clearTimeout(p.timer); p.resolve({ error: "crashed" }); } + svc.pending.clear(); svc.process = null; svc.pid = null; + if (svc.status === "running" && svc.restarts < 5) { + svc.restarts++; + svc.status = "restarting"; + setTimeout(() => spawnService(svc), 1000 * svc.restarts); + } else if (svc.status === "running") { svc.status = "crashed"; } + }); + + child.on("error", err => { console.error(`[runner] ${svc.name} spawn error: ${err.message}`); svc.status = "failed"; }); + console.log(`[runner] spawned ${svc.name} pid=${child.pid} cmd=${cmd} ${args.join(" ")}`); +} + +// --- HTTP API --- + +function readBody(req) { + return new Promise((resolve, reject) => { + const chunks = []; + req.on("data", c => chunks.push(c)); + req.on("end", () => { try { resolve(JSON.parse(Buffer.concat(chunks).toString())); } catch (e) { reject(e); } }); + req.on("error", reject); + }); +} + +function json(res, status, body) { + res.writeHead(status, { "Content-Type": "application/json" }); + res.end(JSON.stringify(body)); +} + +const server = createServer(async (req, res) => { + try { + if (req.method === "GET" && req.url === "/health") { + const svcs = []; + for (const [name, svc] of services) { + svcs.push({ name, status: svc.status, pid: svc.pid, tools: svc.tools.length, restarts: svc.restarts }); + } + return json(res, 200, { ok: true, services: svcs }); + } + + if (req.method === "GET" && req.url?.startsWith("/list")) { + const url = new URL(req.url, "http://localhost"); + const name = url.searchParams.get("name"); + if (name) { + const svc = services.get(name); + if (!svc) return json(res, 404, { error: `service "${name}" not found` }); + return json(res, 200, { tools: svc.tools }); + } + const all = {}; + for (const [n, s] of services) all[n] = s.tools; + return json(res, 200, all); + } + + if (req.method === "GET" && req.url?.startsWith("/logs")) { + const url = new URL(req.url, "http://localhost"); + const name = url.searchParams.get("name"); + const lines = parseInt(url.searchParams.get("lines") || "50", 10); + const svc = services.get(name); + if (!svc) return json(res, 404, { error: "not found" }); + return json(res, 200, { lines: svc.logs.slice(-lines) }); + } + + if (req.method === "POST" && req.url === "/load") { + const body = await readBody(req); + const { name, sourcePath, env: svcEnv, runtime: rt } = body; + if (!name || !sourcePath) return json(res, 400, { error: "name and sourcePath required" }); + + // Kill existing + const existing = services.get(name); + if (existing?.process) { existing.status = "stopped"; existing.process.kill("SIGTERM"); await new Promise(r => setTimeout(r, 1000)); } + + const runtime = rt || detectRuntime(sourcePath); + const svc = { name, sourcePath, runtime, env: svcEnv || {}, process: null, pid: null, tools: [], status: "installing", pending: new Map(), logs: [], restarts: 0, healthFailures: 0 }; + services.set(name, svc); + + // Install deps + try { await installDeps(sourcePath, runtime); } catch (e) { + svc.status = "failed"; svc.logs.push(`install failed: ${e.message}`); + return json(res, 500, { error: e.message }); + } + + // Spawn + MCP handshake + spawnService(svc); + await new Promise(r => setTimeout(r, 500)); + try { + svc.tools = await initMcp(svc); + console.log(`[runner] ${name} ready, ${svc.tools.length} tools`); + return json(res, 200, { status: "running", tools: svc.tools }); + } catch (e) { + svc.status = "failed"; svc.logs.push(`MCP init failed: ${e.message}`); + return json(res, 500, { error: e.message, logs: svc.logs.slice(-10) }); + } + } + + if (req.method === "POST" && req.url === "/call") { + const body = await readBody(req); + const { name, tool, args } = body; + const svc = services.get(name); + if (!svc) return json(res, 404, { error: `service "${name}" not found` }); + if (svc.status !== "running") return json(res, 503, { error: `service is ${svc.status}` }); + const result = await sendMcpRequest(svc, "tools/call", { name: tool, arguments: args || {} }); + return json(res, 200, result); + } + + if (req.method === "POST" && req.url === "/unload") { + const body = await readBody(req); + const { name } = body; + const svc = services.get(name); + if (!svc) return json(res, 404, { error: "not found" }); + svc.status = "stopped"; + if (svc.process) { svc.process.kill("SIGTERM"); await new Promise(r => setTimeout(r, 2000)); if (svc.process) svc.process.kill("SIGKILL"); } + for (const [, p] of svc.pending) { clearTimeout(p.timer); p.resolve({ error: "unloaded" }); } + services.delete(name); + return json(res, 200, { ok: true }); + } + + json(res, 404, { error: "not found" }); + } catch (e) { + console.error("[runner] request error:", e); + json(res, 500, { error: e.message }); + } +}); + +server.listen(PORT, "0.0.0.0", () => { + console.log(`[runner] supervisor listening on :${PORT}`); +}); + +process.on("SIGTERM", () => { + console.log("[runner] shutting down..."); + for (const [, svc] of services) { svc.status = "stopped"; svc.process?.kill("SIGTERM"); } + server.close(() => process.exit(0)); +}); diff --git a/docker-compose.production.yml b/docker-compose.production.yml index 2bb9d89..9328475 100644 --- a/docker-compose.production.yml +++ b/docker-compose.production.yml @@ -83,6 +83,25 @@ services: start_period: 30s retries: 3 + runner: + build: + context: ./apps/runner + restart: always + environment: + RUNNER_PORT: 7901 + volumes: + - services-data:/var/claudemesh/services + expose: + - "7901" + networks: + - claudemesh-internal + healthcheck: + test: ["CMD", "node", "-e", "fetch('http://localhost:7901/health').then(r=>{process.exit(r.ok?0:1)}).catch(()=>process.exit(1))"] + interval: 30s + timeout: 5s + start_period: 10s + retries: 3 + broker: image: ${BROKER_IMAGE:-claudemesh-broker:latest} restart: always @@ -103,6 +122,11 @@ services: NEO4J_URL: bolt://neo4j:7687 NEO4J_USER: neo4j NEO4J_PASSWORD: ${NEO4J_PASSWORD:-changeme} + RUNNER_URL: http://runner:7901 + CLAUDEMESH_SERVICES_DIR: /var/claudemesh/services + BROKER_ENCRYPTION_KEY: ${BROKER_ENCRYPTION_KEY:-} + volumes: + - services-data:/var/claudemesh/services expose: - "7900" networks: @@ -115,6 +139,8 @@ services: condition: service_healthy neo4j: condition: service_healthy + runner: + condition: service_healthy healthcheck: test: ["CMD", "bun", "-e", "fetch('http://localhost:7900/health').then(r=>{process.exit(r.ok?0:1)}).catch(()=>process.exit(1))"] interval: 15s @@ -159,6 +185,7 @@ volumes: minio-data: qdrant-data: neo4j-data: + services-data: networks: # Coolify's shared Traefik network — must already exist on the host