feat: runner container + broker deploy pipeline
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

- apps/runner/: Dockerfile (node22 + python3 + uv + bun) + supervisor.mjs
  (HTTP API for load/call/unload/health)
- docker-compose: runner service with shared services-data volume
- Broker mcp_deploy: git clone or zip extract → runner /load → MCP spawn
- Broker mcp_call: routes managed services to runner via HTTP, falls back
  to live-proxy for peer-hosted servers
- RUNNER_URL env var for broker → runner communication

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-08 13:06:43 +01:00
parent 070a3b7422
commit 873f588057
5 changed files with 454 additions and 7 deletions

View File

@@ -28,6 +28,7 @@ const envSchema = z.object({
NEO4J_URL: z.string().default("bolt://neo4j:7687"),
NEO4J_USER: z.string().default("neo4j"),
NEO4J_PASSWORD: z.string().default("changeme"),
RUNNER_URL: z.string().default("http://runner:7901"),
CLAUDEMESH_SERVICES_DIR: z.string().default("/var/claudemesh/services"),
BROKER_ENCRYPTION_KEY: z.string().default(""), // 64 hex chars (32 bytes). Auto-generated if empty.
MAX_SERVICES_PER_MESH: z.coerce.number().int().positive().default(20),

View File

@@ -2806,6 +2806,34 @@ function handleConnection(ws: WebSocket): void {
case "mcp_call": {
const mc = msg as Extract<WSClientMessage, { type: "mcp_call" }>;
const callKey = `${conn.meshId}:${mc.serverName}`;
// Check managed services first (runner-hosted)
const managedSvc = await getService(conn.meshId, mc.serverName);
if (managedSvc && managedSvc.status === "running") {
try {
const runnerRes = await fetch(`${env.RUNNER_URL}/call`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ name: mc.serverName, tool: mc.toolName, args: mc.args ?? {} }),
});
const result = await runnerRes.json() as { result?: unknown; error?: string };
sendToPeer(presenceId, {
type: "mcp_call_result",
...(result.result !== undefined ? { result: result.result } : {}),
...(result.error ? { error: result.error } : {}),
...(_reqId ? { _reqId } : {}),
} as any);
} catch (e) {
sendToPeer(presenceId, {
type: "mcp_call_result",
error: `runner call failed: ${e instanceof Error ? e.message : String(e)}`,
...(_reqId ? { _reqId } : {}),
} as any);
}
break;
}
// Fall back to live-proxy (peer-hosted) MCP registry
const server = mcpRegistry.get(callKey);
if (!server) {
sendToPeer(presenceId, {
@@ -3195,13 +3223,86 @@ function handleConnection(ws: WebSocket): void {
scope: md.scope ?? "peer", deployedBy: conn.memberId, deployedByName: conn.displayName,
});
sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "building", _reqId: md._reqId } as any);
broadcastToMesh(conn.meshId, {
type: "push", subtype: "system" as const, event: "mcp_deployed",
eventData: { name: md.server_name, description: `MCP server: ${md.server_name}`, tool_count: 0, deployed_by: conn.displayName, scope: md.scope ?? "peer" },
messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system",
priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(),
});
log.info("ws mcp_deploy", { presence_id: presenceId, name: md.server_name });
log.info("ws mcp_deploy", { presence_id: presenceId, name: md.server_name, source: md.source.type });
// --- Source extraction + runner spawn (async, non-blocking) ---
(async () => {
try {
const { mkdirSync, writeFileSync } = await import("node:fs");
const { join } = await import("node:path");
const sourcePath = join(env.CLAUDEMESH_SERVICES_DIR, conn.meshId, md.server_name, "source");
mkdirSync(sourcePath, { recursive: true });
// Extract source
if (md.source.type === "git") {
const { execSync } = await import("node:child_process");
const gitUrl = md.source.url;
const branch = md.source.branch ?? "main";
execSync(`git clone --depth 1 --branch ${branch} ${gitUrl} .`, { cwd: sourcePath, timeout: 60_000 });
log.info("git clone complete", { name: md.server_name, url: gitUrl });
} else if (md.source.type === "zip" && md.source.file_id) {
// Download from MinIO and extract
const bucket = meshBucketName(conn.meshId);
const fileRow = await getFile(conn.meshId, md.source.file_id);
if (!fileRow) throw new Error(`file ${md.source.file_id} not found`);
const stream = await minioClient.getObject(bucket, (fileRow as any).minioKey);
const chunks: Buffer[] = [];
for await (const chunk of stream) chunks.push(chunk as Buffer);
const zipBuf = Buffer.concat(chunks);
// Write zip and extract
const zipPath = join(sourcePath, ".._upload.zip");
writeFileSync(zipPath, zipBuf);
const { execSync } = await import("node:child_process");
execSync(`unzip -o "${zipPath}" -d .`, { cwd: sourcePath, timeout: 30_000 });
execSync(`rm -f "${zipPath}"`, { cwd: sourcePath });
log.info("zip extracted", { name: md.server_name, file_id: md.source.file_id });
} else if (md.source.type === "npx") {
// npx-based: no source extraction needed, runner spawns via npx
// Write a marker file so runner knows the spawn command
writeFileSync(join(sourcePath, ".npx-package"), md.source.package ?? md.server_name);
}
// Resolve env vars (decrypted by CLI, sent as plaintext over TLS)
const resolvedEnv = md.config?.env ?? {};
// Call runner HTTP API to load the service
const runnerRes = await fetch(`${env.RUNNER_URL}/load`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
name: md.server_name,
sourcePath,
env: resolvedEnv,
runtime: md.config?.runtime,
}),
});
const runnerResult = await runnerRes.json() as { status?: string; tools?: any[]; error?: string };
if (!runnerRes.ok || runnerResult.error) {
await updateServiceStatus(conn.meshId, md.server_name, "failed");
sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "failed", error: runnerResult.error, _reqId: md._reqId } as any);
log.error("runner load failed", { name: md.server_name, error: runnerResult.error });
return;
}
// Update DB with tools and running status
await updateServiceStatus(conn.meshId, md.server_name, "running", {
toolsSchema: runnerResult.tools,
});
sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "running", tools: runnerResult.tools, _reqId: md._reqId } as any);
broadcastToMesh(conn.meshId, {
type: "push", subtype: "system" as const, event: "mcp_deployed",
eventData: { name: md.server_name, description: `MCP server: ${md.server_name}`, tool_count: runnerResult.tools?.length ?? 0, deployed_by: conn.displayName, scope: md.scope ?? "peer", tools: runnerResult.tools },
messageId: crypto.randomUUID(), meshId: conn.meshId, senderPubkey: "system",
priority: "low", nonce: "", ciphertext: "", createdAt: new Date().toISOString(),
});
log.info("service deployed", { name: md.server_name, tools: runnerResult.tools?.length ?? 0 });
} catch (e) {
await updateServiceStatus(conn.meshId, md.server_name, "failed").catch(() => {});
sendToPeer(presenceId, { type: "mcp_deploy_status", server_name: md.server_name, status: "failed", error: e instanceof Error ? e.message : String(e), _reqId: md._reqId } as any);
log.error("deploy pipeline failed", { name: md.server_name, error: e instanceof Error ? e.message : String(e) });
}
})();
} catch (e) { sendError(ws, "deploy_error", e instanceof Error ? e.message : String(e), undefined, md._reqId); }
break;
}

40
apps/runner/Dockerfile Normal file
View File

@@ -0,0 +1,40 @@
# claudemesh runner — executes deployed MCP servers as child processes.
# Multi-runtime: Node 22 + Python 3.12 + uv + Bun
#
# The runner supervisor (Node) listens on HTTP :7901 for commands from
# the broker (load, call, unload, health, list_tools). Each deployed
# MCP server runs as a child process with its own stdio pipe.
FROM node:22-slim AS base
# Install Python 3.12 + uv (fast pip replacement) + git
RUN apt-get update && apt-get install -y --no-install-recommends \
python3 python3-pip python3-venv \
curl ca-certificates git \
&& curl -LsSf https://astral.sh/uv/install.sh | sh \
&& ln -sf /root/.local/bin/uv /usr/local/bin/uv \
&& ln -sf /root/.local/bin/uvx /usr/local/bin/uvx \
&& rm -rf /var/lib/apt/lists/*
# Install Bun (for bun-based MCP servers)
RUN curl -fsSL https://bun.sh/install | bash \
&& ln -sf /root/.bun/bin/bun /usr/local/bin/bun \
&& ln -sf /root/.bun/bin/bunx /usr/local/bin/bunx
WORKDIR /app
# Copy the runner supervisor
COPY supervisor.mjs /app/supervisor.mjs
# Services directory (shared volume with broker)
RUN mkdir -p /var/claudemesh/services
ENV NODE_ENV=production
ENV RUNNER_PORT=7901
EXPOSE 7901
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
CMD node -e "fetch('http://localhost:7901/health').then(r=>{process.exit(r.ok?0:1)}).catch(()=>process.exit(1))"
CMD ["node", "supervisor.mjs"]

278
apps/runner/supervisor.mjs Normal file
View File

@@ -0,0 +1,278 @@
/**
* claudemesh runner supervisor — manages MCP server child processes.
*
* HTTP API (called by broker):
* POST /load { name, sourcePath, env, runtime } → spawn MCP, return tools
* POST /call { name, tool, args } → route tool call
* POST /unload { name } → kill process
* GET /health → { ok, services }
* GET /list { name? } → tools for a service
*
* Each MCP server is a child process with its own stdio pipe.
* The supervisor talks MCP JSON-RPC over stdin/stdout to each child.
*/
import { createServer } from "node:http";
import { spawn } from "node:child_process";
import { createInterface } from "node:readline";
import { existsSync, readFileSync, mkdirSync } from "node:fs";
import { join } from "node:path";
const PORT = parseInt(process.env.RUNNER_PORT || "7901", 10);
const CALL_TIMEOUT_MS = 25_000;
const LOG_BUFFER_SIZE = 500;
// --- Service registry ---
const services = new Map();
let callIdCounter = 0;
// --- Runtime detection ---
function detectRuntime(sourcePath) {
if (existsSync(join(sourcePath, "bun.lockb")) || existsSync(join(sourcePath, "bunfig.toml"))) return "bun";
if (existsSync(join(sourcePath, "package.json"))) return "node";
if (existsSync(join(sourcePath, "pyproject.toml")) || existsSync(join(sourcePath, "requirements.txt"))) return "python";
return "node";
}
function detectEntry(sourcePath, runtime) {
if (runtime === "python") {
for (const e of ["server.py", "src/server.py", "main.py", "src/main.py"]) {
if (existsSync(join(sourcePath, e))) return { cmd: "python3", args: [e] };
}
if (existsSync(join(sourcePath, "pyproject.toml"))) return { cmd: "python3", args: ["-m", "server"] };
return { cmd: "python3", args: ["server.py"] };
}
const cmd = runtime === "bun" ? "bun" : "node";
if (existsSync(join(sourcePath, "package.json"))) {
try {
const pkg = JSON.parse(readFileSync(join(sourcePath, "package.json"), "utf-8"));
if (pkg.main) return { cmd, args: [pkg.main] };
if (pkg.bin) {
const bin = typeof pkg.bin === "string" ? pkg.bin : Object.values(pkg.bin)[0];
if (bin) return { cmd, args: [bin] };
}
} catch {}
}
for (const e of ["dist/index.js", "src/index.js", "src/index.ts", "index.js"]) {
if (existsSync(join(sourcePath, e))) return { cmd, args: [e] };
}
return { cmd, args: ["src/index.js"] };
}
// --- Install deps ---
function installDeps(sourcePath, runtime) {
return new Promise((resolve, reject) => {
let cmd, args;
if (runtime === "python") {
if (existsSync(join(sourcePath, "requirements.txt"))) {
cmd = "pip3"; args = ["install", "--no-cache-dir", "-r", "requirements.txt"];
} else { cmd = "pip3"; args = ["install", "--no-cache-dir", "."]; }
} else if (runtime === "bun") {
cmd = "bun"; args = ["install"];
} else {
cmd = "npm"; args = ["install", "--production", "--legacy-peer-deps"];
}
const child = spawn(cmd, args, { cwd: sourcePath, stdio: ["ignore", "pipe", "pipe"] });
let stderr = "";
child.stderr?.on("data", d => { stderr += d.toString(); });
child.on("exit", code => code === 0 ? resolve() : reject(new Error(`${cmd} install exit ${code}: ${stderr.slice(-300)}`)));
child.on("error", reject);
});
}
// --- MCP JSON-RPC ---
function sendMcpRequest(svc, method, params) {
return new Promise(resolve => {
if (!svc.process?.stdin?.writable) { resolve({ error: "not running" }); return; }
const id = `c_${++callIdCounter}`;
const timer = setTimeout(() => { svc.pending.delete(id); resolve({ error: "timeout" }); }, CALL_TIMEOUT_MS);
svc.pending.set(id, { resolve, timer });
try {
svc.process.stdin.write(JSON.stringify({ jsonrpc: "2.0", id, method, ...(params ? { params } : {}) }) + "\n");
} catch (e) {
clearTimeout(timer); svc.pending.delete(id);
resolve({ error: e.message });
}
});
}
async function initMcp(svc) {
const init = await sendMcpRequest(svc, "initialize", {
protocolVersion: "2024-11-05", capabilities: {},
clientInfo: { name: "claudemesh-runner", version: "0.1.0" },
});
if (init.error) throw new Error(`init failed: ${init.error}`);
if (svc.process?.stdin?.writable) {
svc.process.stdin.write(JSON.stringify({ jsonrpc: "2.0", method: "notifications/initialized" }) + "\n");
}
const tools = await sendMcpRequest(svc, "tools/list", {});
if (tools.error) throw new Error(`tools/list failed: ${tools.error}`);
return tools.result?.tools ?? [];
}
// --- Spawn ---
function spawnService(svc) {
const { cmd, args } = detectEntry(svc.sourcePath, svc.runtime);
const child = spawn(cmd, args, {
cwd: svc.sourcePath,
stdio: ["pipe", "pipe", "pipe"],
env: { ...process.env, ...svc.env, NODE_ENV: "production" },
});
svc.process = child;
svc.pid = child.pid;
svc.status = "running";
svc.healthFailures = 0;
const rl = createInterface({ input: child.stdout });
rl.on("line", line => {
try {
const msg = JSON.parse(line);
if (msg.id && svc.pending.has(String(msg.id))) {
const p = svc.pending.get(String(msg.id));
clearTimeout(p.timer); svc.pending.delete(String(msg.id));
p.resolve(msg.error ? { error: msg.error.message ?? JSON.stringify(msg.error) } : { result: msg.result });
}
} catch { svc.logs.push(`[stdout] ${line}`); if (svc.logs.length > LOG_BUFFER_SIZE) svc.logs.shift(); }
});
const errRl = createInterface({ input: child.stderr });
errRl.on("line", line => { svc.logs.push(`[stderr] ${line}`); if (svc.logs.length > LOG_BUFFER_SIZE) svc.logs.shift(); });
child.on("exit", (code, signal) => {
console.log(`[runner] ${svc.name} exited code=${code} signal=${signal} restarts=${svc.restarts}`);
for (const [, p] of svc.pending) { clearTimeout(p.timer); p.resolve({ error: "crashed" }); }
svc.pending.clear(); svc.process = null; svc.pid = null;
if (svc.status === "running" && svc.restarts < 5) {
svc.restarts++;
svc.status = "restarting";
setTimeout(() => spawnService(svc), 1000 * svc.restarts);
} else if (svc.status === "running") { svc.status = "crashed"; }
});
child.on("error", err => { console.error(`[runner] ${svc.name} spawn error: ${err.message}`); svc.status = "failed"; });
console.log(`[runner] spawned ${svc.name} pid=${child.pid} cmd=${cmd} ${args.join(" ")}`);
}
// --- HTTP API ---
function readBody(req) {
return new Promise((resolve, reject) => {
const chunks = [];
req.on("data", c => chunks.push(c));
req.on("end", () => { try { resolve(JSON.parse(Buffer.concat(chunks).toString())); } catch (e) { reject(e); } });
req.on("error", reject);
});
}
function json(res, status, body) {
res.writeHead(status, { "Content-Type": "application/json" });
res.end(JSON.stringify(body));
}
const server = createServer(async (req, res) => {
try {
if (req.method === "GET" && req.url === "/health") {
const svcs = [];
for (const [name, svc] of services) {
svcs.push({ name, status: svc.status, pid: svc.pid, tools: svc.tools.length, restarts: svc.restarts });
}
return json(res, 200, { ok: true, services: svcs });
}
if (req.method === "GET" && req.url?.startsWith("/list")) {
const url = new URL(req.url, "http://localhost");
const name = url.searchParams.get("name");
if (name) {
const svc = services.get(name);
if (!svc) return json(res, 404, { error: `service "${name}" not found` });
return json(res, 200, { tools: svc.tools });
}
const all = {};
for (const [n, s] of services) all[n] = s.tools;
return json(res, 200, all);
}
if (req.method === "GET" && req.url?.startsWith("/logs")) {
const url = new URL(req.url, "http://localhost");
const name = url.searchParams.get("name");
const lines = parseInt(url.searchParams.get("lines") || "50", 10);
const svc = services.get(name);
if (!svc) return json(res, 404, { error: "not found" });
return json(res, 200, { lines: svc.logs.slice(-lines) });
}
if (req.method === "POST" && req.url === "/load") {
const body = await readBody(req);
const { name, sourcePath, env: svcEnv, runtime: rt } = body;
if (!name || !sourcePath) return json(res, 400, { error: "name and sourcePath required" });
// Kill existing
const existing = services.get(name);
if (existing?.process) { existing.status = "stopped"; existing.process.kill("SIGTERM"); await new Promise(r => setTimeout(r, 1000)); }
const runtime = rt || detectRuntime(sourcePath);
const svc = { name, sourcePath, runtime, env: svcEnv || {}, process: null, pid: null, tools: [], status: "installing", pending: new Map(), logs: [], restarts: 0, healthFailures: 0 };
services.set(name, svc);
// Install deps
try { await installDeps(sourcePath, runtime); } catch (e) {
svc.status = "failed"; svc.logs.push(`install failed: ${e.message}`);
return json(res, 500, { error: e.message });
}
// Spawn + MCP handshake
spawnService(svc);
await new Promise(r => setTimeout(r, 500));
try {
svc.tools = await initMcp(svc);
console.log(`[runner] ${name} ready, ${svc.tools.length} tools`);
return json(res, 200, { status: "running", tools: svc.tools });
} catch (e) {
svc.status = "failed"; svc.logs.push(`MCP init failed: ${e.message}`);
return json(res, 500, { error: e.message, logs: svc.logs.slice(-10) });
}
}
if (req.method === "POST" && req.url === "/call") {
const body = await readBody(req);
const { name, tool, args } = body;
const svc = services.get(name);
if (!svc) return json(res, 404, { error: `service "${name}" not found` });
if (svc.status !== "running") return json(res, 503, { error: `service is ${svc.status}` });
const result = await sendMcpRequest(svc, "tools/call", { name: tool, arguments: args || {} });
return json(res, 200, result);
}
if (req.method === "POST" && req.url === "/unload") {
const body = await readBody(req);
const { name } = body;
const svc = services.get(name);
if (!svc) return json(res, 404, { error: "not found" });
svc.status = "stopped";
if (svc.process) { svc.process.kill("SIGTERM"); await new Promise(r => setTimeout(r, 2000)); if (svc.process) svc.process.kill("SIGKILL"); }
for (const [, p] of svc.pending) { clearTimeout(p.timer); p.resolve({ error: "unloaded" }); }
services.delete(name);
return json(res, 200, { ok: true });
}
json(res, 404, { error: "not found" });
} catch (e) {
console.error("[runner] request error:", e);
json(res, 500, { error: e.message });
}
});
server.listen(PORT, "0.0.0.0", () => {
console.log(`[runner] supervisor listening on :${PORT}`);
});
process.on("SIGTERM", () => {
console.log("[runner] shutting down...");
for (const [, svc] of services) { svc.status = "stopped"; svc.process?.kill("SIGTERM"); }
server.close(() => process.exit(0));
});

View File

@@ -83,6 +83,25 @@ services:
start_period: 30s
retries: 3
runner:
build:
context: ./apps/runner
restart: always
environment:
RUNNER_PORT: 7901
volumes:
- services-data:/var/claudemesh/services
expose:
- "7901"
networks:
- claudemesh-internal
healthcheck:
test: ["CMD", "node", "-e", "fetch('http://localhost:7901/health').then(r=>{process.exit(r.ok?0:1)}).catch(()=>process.exit(1))"]
interval: 30s
timeout: 5s
start_period: 10s
retries: 3
broker:
image: ${BROKER_IMAGE:-claudemesh-broker:latest}
restart: always
@@ -103,6 +122,11 @@ services:
NEO4J_URL: bolt://neo4j:7687
NEO4J_USER: neo4j
NEO4J_PASSWORD: ${NEO4J_PASSWORD:-changeme}
RUNNER_URL: http://runner:7901
CLAUDEMESH_SERVICES_DIR: /var/claudemesh/services
BROKER_ENCRYPTION_KEY: ${BROKER_ENCRYPTION_KEY:-}
volumes:
- services-data:/var/claudemesh/services
expose:
- "7900"
networks:
@@ -115,6 +139,8 @@ services:
condition: service_healthy
neo4j:
condition: service_healthy
runner:
condition: service_healthy
healthcheck:
test: ["CMD", "bun", "-e", "fetch('http://localhost:7900/health').then(r=>{process.exit(r.ok?0:1)}).catch(()=>process.exit(1))"]
interval: 15s
@@ -159,6 +185,7 @@ volumes:
minio-data:
qdrant-data:
neo4j-data:
services-data:
networks:
# Coolify's shared Traefik network — must already exist on the host