diff --git a/apps/cli/CHANGELOG.md b/apps/cli/CHANGELOG.md index 2903ad7..3f17787 100644 --- a/apps/cli/CHANGELOG.md +++ b/apps/cli/CHANGELOG.md @@ -1,5 +1,65 @@ # Changelog +## 1.24.0 (2026-05-03) — daemon required + thin MCP shim + +The architectural convergence v0.9.0 was building toward. + +### Daemon promoted from optional to required (for in-Claude-Code use) + +The CLI itself (`claudemesh send`, `peer list`, `inbox`, `vault`, `watch`, +`webhook`, etc.) keeps working without a daemon. But the MCP server — +which provides Claude Code's mid-turn channel push, slash commands, and +resource browser — now requires the daemon. There is no fallback. + +- `claudemesh install` auto-installs and starts the daemon service + (launchd / systemd-user) for the user's primary mesh. Pass + `--no-service` to opt out. +- `claudemesh launch` ensures the daemon is running before spawning + Claude Code; spawns it foreground if absent. +- The MCP shim probes `~/.claudemesh/daemon/daemon.sock` at boot. If + missing after a 2s grace window, it bails with actionable instructions + ("run `claudemesh daemon up --mesh `"). + +### MCP server: 979 → ~300 LoC of push-pipe code + +`apps/cli/src/mcp/server.ts` is now a thin daemon-SSE translator. It +no longer holds a broker WebSocket, decrypts messages, manages mesh +state, or runs reconnection logic. All of that is the daemon's job. + +- Subscribes to daemon `/v1/events` SSE; translates each `message` + event into a `notifications/claude/channel` emit. +- Sources mesh-published skills via daemon `/v1/skills` IPC for + ListPrompts / GetPrompt / ListResources / ReadResource. +- ListTools returns `[]` (the CLI is the API, taught via the bundled + skill). +- The mesh-service proxy mode (`claudemesh-cli --service `, + the sub-MCP-server for proxying a deployed mesh-MCP service) is + unchanged — separate code path, different lifecycle. + +Bundle size: MCP entry dropped from 154KB → 104KB (gzipped 34KB → 19KB). + +### Daemon SSE event payload extended + +`message` events on `/v1/events` now include plaintext-decrypted body, +sender member pubkey, priority, and subtype — everything the MCP shim +needs to render a complete channel notification without going back to +the broker. + +### Daemon IPC: GET /v1/skills (list) and GET /v1/skills/:name (get) + +The daemon exposes mesh-published skills over IPC so the MCP shim can +surface them as MCP prompts/resources without holding its own broker +WS. Same wire format as before from Claude Code's perspective. + +### Why this is the right architecture + +MCP and the daemon are no longer independent broker clients with +duplicated WS, decrypt, and dedupe logic. The daemon owns the broker +relationship; MCP is a Claude-Code-specific UX adapter that reads from +the daemon. Industry-normal shape (Tailscale, Slack, Ollama, Docker) +where the long-lived runtime is required and the per-app integrations +attach to it. + ## 1.23.0 (2026-05-03) — close the CLI surface, prune dead MCP stubs Three previously-MCP-only write verbs land on the CLI, closing every diff --git a/apps/cli/package.json b/apps/cli/package.json index c7ab414..3885559 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -1,6 +1,6 @@ { "name": "claudemesh-cli", - "version": "1.23.0", + "version": "1.24.0", "description": "Peer mesh for Claude Code sessions — CLI + MCP server.", "keywords": [ "claude-code", diff --git a/apps/cli/src/commands/install.ts b/apps/cli/src/commands/install.ts index fd239b1..f342bbf 100644 --- a/apps/cli/src/commands/install.ts +++ b/apps/cli/src/commands/install.ts @@ -437,6 +437,7 @@ function installStatusLine(): { installed: boolean } { export function runInstall(args: string[] = []): void { const skipHooks = args.includes("--no-hooks"); const skipSkill = args.includes("--no-skill"); + const skipService = args.includes("--no-service"); const wantStatusLine = args.includes("--status-line"); render.section("claudemesh install"); @@ -544,11 +545,33 @@ export function runInstall(args: string[] = []): void { } let hasMeshes = false; + let primaryMesh: string | undefined; try { const meshConfig = readConfig(); hasMeshes = meshConfig.meshes.length > 0; + primaryMesh = meshConfig.meshes[0]?.slug; } catch {} + // Daemon service install — required for MCP integration as of 1.24.0. + // The daemon owns the broker WS and feeds the MCP push-pipe via SSE; + // skipping it leaves channel push, slash commands, and resources broken. + if (!skipService && hasMeshes && primaryMesh) { + try { + installDaemonService(entry, primaryMesh); + } catch (e) { + render.warn( + `daemon service install failed: ${e instanceof Error ? e.message : String(e)}`, + "Run `claudemesh daemon install-service --mesh ` to retry.", + ); + } + } else if (skipService) { + render.info(dim("· Daemon service skipped (--no-service)")); + render.info(dim(" MCP integration will fail at boot until you start the daemon manually:")); + render.info(dim(" claudemesh daemon up --mesh ")); + } else if (!hasMeshes) { + render.info(dim("· Daemon service deferred — join a mesh first, then run install again.")); + } + render.blank(); render.warn(`${bold("RESTART CLAUDE CODE")} ${yellow("for MCP tools to appear.")}`); @@ -569,6 +592,67 @@ export function runInstall(args: string[] = []): void { render.info(dim(` claudemesh completions zsh # shell completions`)); } +/** + * Install + start the per-user daemon service for the primary mesh. + * + * Refuses on CI hosts (the service-install module guards this); falls + * back to a friendly message and lets the install otherwise succeed. + * The MCP push-pipe will fail loudly if the daemon isn't reachable, so + * the user knows there's a problem before it shows up as "no messages + * arriving." + */ +function installDaemonService(binaryEntry: string, meshSlug: string): void { + const { + installService, + detectPlatform, + } = require("~/daemon/service-install.js") as typeof import("../daemon/service-install.js"); + + const platform = detectPlatform(); + if (!platform) { + render.info(dim(`· Daemon service skipped — unsupported platform: ${process.platform}`)); + return; + } + + // Resolve the binary the service unit should launch. When invoked from a + // bundled binary, argv[1] is correct. When invoked under tsx / dev, fall + // back to whatever `claudemesh` resolves to on PATH so the unit launches + // a shipped binary, not a dev script. + let binary = process.argv[1] ?? binaryEntry; + if (!binary || /\.ts$/.test(binary) || /node_modules|src\/entrypoints/.test(binary)) { + try { + const { execSync } = require("node:child_process") as typeof import("node:child_process"); + binary = execSync("which claudemesh", { encoding: "utf8" }).trim(); + } catch { + render.warn( + "couldn't resolve a 'claudemesh' binary on PATH; daemon service skipped", + "Install via npm/homebrew, then run `claudemesh daemon install-service --mesh " + meshSlug + "`", + ); + return; + } + } + + const r = installService({ binaryPath: binary, meshSlug }); + render.ok(`daemon service installed (${r.platform})`); + render.kv([ + ["unit", dim(r.unitPath)], + ["mesh", dim(meshSlug)], + ]); + + // Boot the unit immediately so MCP has a daemon to attach to on next + // Claude Code launch. Best-effort: if launchctl/systemctl errors out we + // log and continue — the user can run the boot command manually. + try { + const { execSync } = require("node:child_process") as typeof import("node:child_process"); + execSync(r.bootCommand, { stdio: "ignore" }); + render.ok("daemon started"); + } catch (e) { + render.warn( + `daemon service installed but failed to start: ${e instanceof Error ? e.message : String(e)}`, + `Run manually: ${r.bootCommand}`, + ); + } +} + export function runUninstall(): void { render.section("claudemesh uninstall"); diff --git a/apps/cli/src/commands/launch.ts b/apps/cli/src/commands/launch.ts index 216f8f6..97c19fa 100644 --- a/apps/cli/src/commands/launch.ts +++ b/apps/cli/src/commands/launch.ts @@ -44,6 +44,55 @@ export interface LaunchFlags { // --- Interactive mesh picker --- +/** + * Ensure the per-user daemon is running before we hand off to Claude Code. + * + * As of 1.24.0 the daemon owns the broker WS and feeds the MCP push-pipe + * over IPC SSE. If the socket is absent when Claude boots its MCP shim, + * the shim bails (no fallback). So we probe for the socket here and, if + * missing, spawn `claudemesh daemon up --mesh ` in the background, + * waiting briefly for the socket to appear. + * + * Best-effort: if the daemon spawn fails, we surface the error and let + * the launch proceed — Claude Code will print the same "daemon not + * running" message and the user can fix it manually. + */ +async function ensureDaemonRunning(meshSlug: string, quiet: boolean): Promise { + const { DAEMON_PATHS } = await import("~/daemon/paths.js"); + if (existsSync(DAEMON_PATHS.SOCK_FILE)) return; + + if (!quiet) render.info("starting claudemesh daemon…"); + const { spawn } = await import("node:child_process"); + const argv0 = process.argv[1] ?? "claudemesh"; + let binary = argv0; + if (/\.ts$/.test(binary) || /node_modules|src\/entrypoints/.test(binary)) { + try { + const { execSync } = await import("node:child_process"); + binary = execSync("which claudemesh", { encoding: "utf8" }).trim(); + } catch { binary = "claudemesh"; } + } + const child = spawn(binary, ["daemon", "up", "--mesh", meshSlug], { + detached: true, + stdio: "ignore", + }); + child.unref(); + + // Wait for the socket to appear. 10 s budget — covers cold node start + + // broker hello round-trip on slow links. + const start = Date.now(); + while (Date.now() - start < 10_000) { + if (existsSync(DAEMON_PATHS.SOCK_FILE)) { + if (!quiet) render.ok("daemon ready"); + return; + } + await new Promise((r) => setTimeout(r, 200)); + } + render.warn( + "daemon failed to start within 10s", + "Run `claudemesh daemon up --mesh " + meshSlug + "` manually, then re-launch.", + ); +} + async function pickMesh(meshes: JoinedMesh[]): Promise { if (meshes.length === 1) return meshes[0]!; @@ -550,6 +599,12 @@ export async function runLaunch(flags: LaunchFlags, rawArgs: string[]): Promise< } } catch { /* best effort */ } + // Ensure the daemon is running before we spawn Claude. The MCP shim + // (loaded by --dangerously-load-development-channels server:claudemesh) + // requires the daemon's UDS to be reachable at boot — if it isn't, + // channel push, slash commands, and resources fail. + await ensureDaemonRunning(mesh.slug, args.quiet); + // Clean up stale mesh MCP entries from crashed sessions try { const claudeConfigPath = join(homedir(), ".claude.json"); diff --git a/apps/cli/src/daemon/broker.ts b/apps/cli/src/daemon/broker.ts index 8a0c194..51bb11e 100644 --- a/apps/cli/src/daemon/broker.ts +++ b/apps/cli/src/daemon/broker.ts @@ -56,6 +56,19 @@ interface PendingPeerList { timer: NodeJS.Timeout; } +export interface SkillSummary { + name: string; + description: string; + tags: string[]; + author: string; + createdAt: string; +} + +export interface SkillFull extends SkillSummary { + instructions: string; + manifest?: unknown; +} + const HELLO_ACK_TIMEOUT_MS = 5_000; const SEND_ACK_TIMEOUT_MS = 15_000; const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000]; @@ -76,6 +89,8 @@ export class DaemonBrokerClient { private helloTimer: NodeJS.Timeout | null = null; private pendingAcks = new Map(); private peerListResolvers = new Map(); + private skillListResolvers = new Map void; timer: NodeJS.Timeout }>(); + private skillDataResolvers = new Map void; timer: NodeJS.Timeout }>(); private sessionPubkey: string | null = null; private sessionSecretKey: string | null = null; private opens: Array<() => void> = []; @@ -189,6 +204,28 @@ export class DaemonBrokerClient { return; } + if (msg.type === "skill_list") { + const reqId = String(msg._reqId ?? ""); + const pending = this.skillListResolvers.get(reqId); + if (pending) { + this.skillListResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve(Array.isArray(msg.skills) ? (msg.skills as SkillSummary[]) : []); + } + return; + } + + if (msg.type === "skill_data") { + const reqId = String(msg._reqId ?? ""); + const pending = this.skillDataResolvers.get(reqId); + if (pending) { + this.skillDataResolvers.delete(reqId); + clearTimeout(pending.timer); + pending.resolve((msg.skill as SkillFull) ?? null); + } + return; + } + if (msg.type === "push" || msg.type === "inbound") { this.opts.onPush?.(msg); return; @@ -264,6 +301,34 @@ export class DaemonBrokerClient { }); } + /** List mesh-published skills. Empty array on disconnect / timeout. */ + async listSkills(query?: string, timeoutMs = 5_000): Promise { + if (this._status !== "open" || !this.ws) return []; + return new Promise((resolve) => { + const reqId = `sl-${++this.reqCounter}`; + const timer = setTimeout(() => { + if (this.skillListResolvers.delete(reqId)) resolve([]); + }, timeoutMs); + this.skillListResolvers.set(reqId, { resolve, timer }); + try { this.ws!.send(JSON.stringify({ type: "list_skills", query, _reqId: reqId })); } + catch { this.skillListResolvers.delete(reqId); clearTimeout(timer); resolve([]); } + }); + } + + /** Fetch one skill's full body. Null on not-found / disconnect / timeout. */ + async getSkill(name: string, timeoutMs = 5_000): Promise { + if (this._status !== "open" || !this.ws) return null; + return new Promise((resolve) => { + const reqId = `sg-${++this.reqCounter}`; + const timer = setTimeout(() => { + if (this.skillDataResolvers.delete(reqId)) resolve(null); + }, timeoutMs); + this.skillDataResolvers.set(reqId, { resolve, timer }); + try { this.ws!.send(JSON.stringify({ type: "get_skill", name, _reqId: reqId })); } + catch { this.skillDataResolvers.delete(reqId); clearTimeout(timer); resolve(null); } + }); + } + /** Set the daemon's profile (avatar/title/bio/capabilities). Fire-and-forget. */ setProfile(profile: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }): void { if (this._status !== "open" || !this.ws) return; diff --git a/apps/cli/src/daemon/inbound.ts b/apps/cli/src/daemon/inbound.ts index 01f5813..86fa54a 100644 --- a/apps/cli/src/daemon/inbound.ts +++ b/apps/cli/src/daemon/inbound.ts @@ -44,11 +44,14 @@ export async function handleBrokerPush(msg: Record, ctx: Inboun const brokerMessageId = stringOrNull(msg.messageId); const senderPubkey = stringOrNull(msg.senderPubkey) ?? ""; const senderName = stringOrNull(msg.senderName) ?? senderPubkey.slice(0, 8); + const senderMemberPk = stringOrNull(msg.senderMemberPubkey); const topic = stringOrNull(msg.topic); const replyToId = stringOrNull(msg.replyToId); const ciphertext = stringOrNull(msg.ciphertext) ?? ""; const nonce = stringOrNull(msg.nonce) ?? ""; const createdAt = stringOrNull(msg.createdAt); + const priority = stringOrNull(msg.priority) ?? "next"; + const subtype = stringOrNull(msg.subtype); // Forward-compat: Sprint 7 brokers will send client_message_id alongside. const clientMessageId = stringOrNull(msg.client_message_id) ?? brokerMessageId ?? randomUUID(); const body = await decryptOrFallback({ @@ -78,9 +81,12 @@ export async function handleBrokerPush(msg: Record, ctx: Inboun client_message_id: clientMessageId, broker_message_id: brokerMessageId, sender_pubkey: senderPubkey, + sender_member_pubkey: senderMemberPk, sender_name: senderName, topic, reply_to_id: replyToId, + priority, + ...(subtype ? { subtype } : {}), body, created_at: createdAt, }); diff --git a/apps/cli/src/daemon/ipc/server.ts b/apps/cli/src/daemon/ipc/server.ts index 32a5969..29792b8 100644 --- a/apps/cli/src/daemon/ipc/server.ts +++ b/apps/cli/src/daemon/ipc/server.ts @@ -173,7 +173,7 @@ function makeHandler(opts: { respond(res, 200, { daemon_version: VERSION, ipc_api: "v1", - ipc_features: ["version", "health", "send", "inbox", "events", "peers", "profile"], + ipc_features: ["version", "health", "send", "inbox", "events", "peers", "profile", "skills"], schema_version: 1, }); return; @@ -204,6 +204,32 @@ function makeHandler(opts: { return; } + if (req.method === "GET" && url.pathname === "/v1/skills") { + if (!opts.broker) { respond(res, 503, { error: "broker not initialised" }); return; } + const query = url.searchParams.get("query") ?? undefined; + try { + const skills = await opts.broker.listSkills(query); + respond(res, 200, { skills }); + } catch (e) { + respond(res, 502, { error: "broker_unreachable", detail: String(e) }); + } + return; + } + + if (req.method === "GET" && url.pathname.startsWith("/v1/skills/")) { + if (!opts.broker) { respond(res, 503, { error: "broker not initialised" }); return; } + const name = decodeURIComponent(url.pathname.slice("/v1/skills/".length)); + if (!name) { respond(res, 400, { error: "missing skill name" }); return; } + try { + const skill = await opts.broker.getSkill(name); + if (!skill) { respond(res, 404, { error: "skill_not_found", name }); return; } + respond(res, 200, { skill }); + } catch (e) { + respond(res, 502, { error: "broker_unreachable", detail: String(e) }); + } + return; + } + if (req.method === "POST" && url.pathname === "/v1/profile") { if (!opts.broker) { respond(res, 503, { error: "broker not initialised" }); return; } try { diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index 08b9105..b66460c 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -1,8 +1,23 @@ /** * MCP server (stdio transport) for claudemesh-cli. * - * Starts BrokerClient connections for every mesh in config on boot, - * then routes the 5 MCP tools through them. + * As of 1.24.0 / daemon v1.0, the MCP server is a thin daemon-SSE + * translator. It does NOT hold a broker WebSocket, decrypt messages, or + * track mesh state — those are the daemon's job. MCP just: + * + * 1. probes ~/.claudemesh/daemon/daemon.sock at boot; + * 2. fails loudly if the daemon isn't running (no fallback); + * 3. subscribes to /v1/events SSE and translates each event into a + * Claude Code `notifications/claude/channel` notification; + * 4. surfaces mesh-published skills as MCP prompts and resources by + * querying /v1/skills over IPC. + * + * The mesh-service proxy mode (claudemesh-cli --service ) lives at + * the bottom of this file and is unrelated — it acts as a sub-MCP-server + * for one deployed mesh-MCP service. Untouched by this rewrite. + * + * Spec: .artifacts/specs/2026-05-03-daemon-spec-v0.9.0.md plus the + * 1.24.0 daemon-required addendum. */ import { Server } from "@modelcontextprotocol/sdk/server/index.js"; @@ -15,790 +30,319 @@ import { ListResourcesRequestSchema, ReadResourceRequestSchema, } from "@modelcontextprotocol/sdk/types.js"; -// CallToolRequestSchema is still imported for the mesh-service proxy mode -// further down; the main MCP server has no tools as of 1.5.0 (tool-less -// push-pipe — spec 2026-05-02 commitment #6). -import { TOOLS } from "./tools/definitions.js"; +import { existsSync } from "node:fs"; +import { request as httpRequest, type IncomingMessage } from "node:http"; + +import { DAEMON_PATHS } from "~/daemon/paths.js"; +import { VERSION } from "~/constants/urls.js"; import { readConfig } from "~/services/config/facade.js"; -import { BrokerClient, startClients, stopAll, findClient, allClients } from "~/services/broker/facade.js"; -import { startBridgeServer, type BridgeServer } from "~/services/bridge/server.js"; -import type { InboundPush } from "~/services/broker/facade.js"; -import type { - Priority, - PeerStatus, - SendMessageArgs, - SetStatusArgs, - SetSummaryArgs, - ListPeersArgs, -} from "./types.js"; +import { BrokerClient } from "~/services/broker/facade.js"; -/** Compute a human-readable relative time string from an ISO timestamp. */ -function relativeTime(isoStr: string): string { - const then = new Date(isoStr).getTime(); - if (isNaN(then)) return "unknown"; - const diffMs = Date.now() - then; - if (diffMs < 0) return "just now"; - const seconds = Math.floor(diffMs / 1000); - if (seconds < 60) return `${seconds}s ago`; - const minutes = Math.floor(seconds / 60); - if (minutes < 60) return `${minutes}m ago`; - const hours = Math.floor(minutes / 60); - if (hours < 24) return `${hours}h ago`; - const days = Math.floor(hours / 24); - return `${days} day${days !== 1 ? "s" : ""} ago`; +// ── daemon probe ─────────────────────────────────────────────────────── + +const DAEMON_BOOT_RETRIES = 4; +const DAEMON_BOOT_RETRY_MS = 500; + +async function daemonReady(): Promise { + for (let i = 0; i < DAEMON_BOOT_RETRIES; i++) { + if (existsSync(DAEMON_PATHS.SOCK_FILE)) return true; + await new Promise((r) => setTimeout(r, DAEMON_BOOT_RETRY_MS)); + } + return false; } -function text(msg: string, isError = false) { - return { - content: [{ type: "text" as const, text: msg }], - ...(isError ? { isError: true } : {}), - }; +function bailNoDaemon(): never { + process.stderr.write( + "[claudemesh] daemon is not running.\n" + + " Start it: claudemesh daemon up --mesh \n" + + " Or install as service: claudemesh daemon install-service --mesh \n" + + " Diagnose: claudemesh doctor\n" + + "\n" + + " As of 1.24.0 the daemon is required for in-Claude-Code use of\n" + + " claudemesh. The CLI itself (claudemesh send/peer/inbox/...) still\n" + + " works without a daemon.\n", + ); + process.exit(1); } -/** - * Given a `to` string, pick which mesh to send from. Strategies: - * - If `to` looks like a pubkey hex (64 chars), use as-is. - * - If `to` starts with `#`, treat as channel. - * - If `to` is `*`, treat as broadcast. - * - Otherwise resolve as a display name via list_peers. - * - * Explicit mesh prefix `:` narrows to one mesh. - */ -async function resolveClient(to: string): Promise<{ - client: BrokerClient | null; - targetSpec: string; - error?: string; -}> { - const clients = allClients(); - if (clients.length === 0) { - return { client: null, targetSpec: to, error: "no meshes joined" }; - } - // Explicit mesh prefix: "mesh-slug:targetspec" - let targetClients = clients; - let target = to; - const colonIdx = to.indexOf(":"); - if (colonIdx > 0 && colonIdx < to.length - 1) { - const slug = to.slice(0, colonIdx); - const rest = to.slice(colonIdx + 1); - const match = findClient(slug); - if (match) { - targetClients = [match]; - target = rest; - } - } - // Channel, @group, or broadcast — pass through directly. - if (target.startsWith("#") || target.startsWith("@") || target === "*") { - if (targetClients.length === 1) { - return { client: targetClients[0]!, targetSpec: target }; - } - return { - client: null, - targetSpec: target, - error: `multiple meshes joined; prefix target with ":" (joined: ${clients.map((c) => c.meshSlug).join(", ")})`, - }; - } +// ── daemon IPC client (UDS) ──────────────────────────────────────────── - // Hex pubkey or hex prefix — resolve by prefix match across joined meshes. - // Accepts anything from 8 hex chars up to the full 64-char key. A full key - // also has to match an online peer to be worth routing; we verify by prefix - // against each mesh's current peer list. - if (/^[0-9a-f]{8,64}$/.test(target)) { - const hits: Array<{ mesh: BrokerClient; pubkey: string; displayName: string }> = []; - for (const c of targetClients) { - const peers = await c.listPeers(); - for (const p of peers) { - if (p.pubkey.startsWith(target)) { - hits.push({ mesh: c, pubkey: p.pubkey, displayName: p.displayName }); - } - } - } - if (hits.length === 1) { - return { client: hits[0]!.mesh, targetSpec: hits[0]!.pubkey }; - } - if (hits.length > 1) { - const lines = hits - .map((h) => ` - ${h.displayName} @ ${h.mesh.meshSlug} · pubkey ${h.pubkey.slice(0, 20)}…`) - .join("\n"); - return { - client: null, - targetSpec: target, - error: `ambiguous pubkey prefix "${target}" matches ${hits.length} peers:\n${lines}\nUse a longer prefix.`, - }; - } - // Full 64-char with no live match: still allow send — broker will queue it - // for when that peer comes online. Honors the existing queue-for-offline - // behaviour without breaking prefix semantics. - if (target.length === 64) { - if (targetClients.length === 1) { - return { client: targetClients[0]!, targetSpec: target }; - } - return { - client: null, - targetSpec: target, - error: `multiple meshes joined; prefix target with ":" (joined: ${clients.map((c) => c.meshSlug).join(", ")})`, - }; - } - // Short prefix, no match, and not interpretable as a name — surface it. - return { - client: null, - targetSpec: target, - error: `no online peer's pubkey starts with "${target}".`, - }; - } +interface DaemonGetResult { status: number; body: any } - // Name-based resolution. Exclude the caller's OWN session pubkey so - // "send to " routes to the OTHER same-named sessions - // (e.g. the same user's laptop on a different repo) instead of bouncing - // on the broker's self-send check. - const nameLower = target.toLowerCase(); - const candidates: Array<{ mesh: string; peers: Array<{ displayName: string; pubkey: string; cwd?: string }> }> = []; - const exactMatches: Array<{ mesh: BrokerClient; pubkey: string; displayName: string; cwd?: string }> = []; - const partialMatches: Array<{ mesh: BrokerClient; pubkey: string; displayName: string; cwd?: string }> = []; - - for (const c of targetClients) { - const ownSession = c.getSessionPubkey(); - const peers = await c.listPeers(); - candidates.push({ mesh: c.meshSlug, peers }); - for (const p of peers) { - if (ownSession && p.pubkey === ownSession) continue; // skip caller's own session - const nameLow = p.displayName.toLowerCase(); - if (nameLow === nameLower) { - exactMatches.push({ mesh: c, pubkey: p.pubkey, displayName: p.displayName, cwd: p.cwd }); - } else if (nameLow.includes(nameLower)) { - partialMatches.push({ mesh: c, pubkey: p.pubkey, displayName: p.displayName, cwd: p.cwd }); - } - } - } - - if (exactMatches.length === 1) { - return { client: exactMatches[0]!.mesh, targetSpec: exactMatches[0]!.pubkey }; - } - if (exactMatches.length > 1) { - const lines = exactMatches - .map((m) => ` - ${m.displayName} · pubkey ${m.pubkey.slice(0, 16)}…${m.cwd ? ` · cwd ${m.cwd}` : ""}`) - .join("\n"); - return { - client: null, - targetSpec: target, - error: - `"${target}" is ambiguous — ${exactMatches.length} peers share that display name:\n${lines}\n` + - `Disambiguate by pubkey prefix (e.g. send to "${exactMatches[0]!.pubkey.slice(0, 12)}…").`, - }; - } - - if (partialMatches.length === 1) { - process.stderr.write( - `[claudemesh] resolved "${target}" → "${partialMatches[0]!.displayName}" (partial match)\n`, +function daemonGet(path: string): Promise { + return new Promise((resolve, reject) => { + const req = httpRequest( + { socketPath: DAEMON_PATHS.SOCK_FILE, path, method: "GET", timeout: 5_000 }, + (res: IncomingMessage) => { + const chunks: Buffer[] = []; + res.on("data", (c) => chunks.push(c as Buffer)); + res.on("end", () => { + const text = Buffer.concat(chunks).toString("utf8"); + let body: any = null; + try { body = JSON.parse(text); } catch { body = text; } + resolve({ status: res.statusCode ?? 0, body }); + }); + }, ); - return { client: partialMatches[0]!.mesh, targetSpec: partialMatches[0]!.pubkey }; - } - if (partialMatches.length > 1) { - const lines = partialMatches - .map((m) => ` - ${m.displayName} · pubkey ${m.pubkey.slice(0, 16)}…`) - .join("\n"); - return { - client: null, - targetSpec: target, - error: `"${target}" partially matches ${partialMatches.length} peers:\n${lines}\nBe more specific, or use a pubkey prefix.`, - }; - } + req.on("error", reject); + req.on("timeout", () => req.destroy(new Error("daemon_ipc_timeout"))); + req.end(); + }); +} - // No match — refuse to send rather than silently queue a message for nobody. - const known = candidates.flatMap((c) => c.peers.map((p) => `${c.mesh}/${p.displayName}`)); +// ── daemon SSE subscription ──────────────────────────────────────────── + +interface DaemonEvent { kind: string; ts: string; data: Record } + +function subscribeEvents(onEvent: (e: DaemonEvent) => void): { close: () => void } { + let active = true; + let req: ReturnType | null = null; + + const connect = (): void => { + if (!active) return; + req = httpRequest({ + socketPath: DAEMON_PATHS.SOCK_FILE, + path: "/v1/events", + method: "GET", + headers: { Accept: "text/event-stream" }, + }); + let buffer = ""; + req.on("response", (res: IncomingMessage) => { + res.setEncoding("utf8"); + res.on("data", (chunk: string) => { + buffer += chunk; + let idx; + while ((idx = buffer.indexOf("\n\n")) >= 0) { + const block = buffer.slice(0, idx); + buffer = buffer.slice(idx + 2); + if (!block.trim()) continue; + let kind = "message"; + let dataLine = ""; + for (const line of block.split("\n")) { + if (line.startsWith(":")) continue; + if (line.startsWith("event:")) kind = line.slice(6).trim(); + else if (line.startsWith("data:")) dataLine = line.slice(5).trim(); + } + if (!dataLine) continue; + try { + const parsed = JSON.parse(dataLine); + onEvent({ kind, ts: String(parsed.ts ?? ""), data: parsed }); + } catch { /* malformed event; skip */ } + } + }); + res.on("end", () => { + if (active) { + process.stderr.write("[claudemesh-mcp] sse stream ended; reconnecting in 1s\n"); + setTimeout(connect, 1_000); + } + }); + res.on("error", (err) => process.stderr.write(`[claudemesh-mcp] sse error: ${err.message}\n`)); + }); + req.on("error", (err) => { + process.stderr.write(`[claudemesh-mcp] sse connect error: ${err.message}\n`); + if (active) setTimeout(connect, 2_000); + }); + req.end(); + }; + + connect(); return { - client: null, - targetSpec: target, - error: - `peer "${target}" not found. ` + - (known.length - ? `Known peers: ${known.slice(0, 10).join(", ")}${known.length > 10 ? ", …" : ""}` - : "No connected peers on your mesh(es). Use pubkey hex, @group, or * for broadcast."), + close: () => { active = false; try { req?.destroy(); } catch { /* ignore */ } }, }; } -// Peer name cache to avoid calling listPeers on every incoming push -const peerNameCache = new Map(); -let peerNameCacheAge = 0; -const CACHE_TTL_MS = 30_000; - -async function resolvePeerName(client: BrokerClient, pubkey: string): Promise { - const now = Date.now(); - if (now - peerNameCacheAge > CACHE_TTL_MS) { - peerNameCache.clear(); - try { - const peers = await client.listPeers(); - for (const p of peers) peerNameCache.set(p.pubkey, p.displayName); - } catch { /* best effort */ } - peerNameCacheAge = now; - } - return peerNameCache.get(pubkey) ?? `peer-${pubkey.slice(0, 8)}`; -} - -function decryptFailedWarning(senderPubkey: string): string { - const who = senderPubkey ? senderPubkey.slice(0, 12) + "…" : "unknown sender"; - return `⚠ message from ${who} failed to decrypt (tampered or wrong keypair)`; -} - -function formatPush(p: InboundPush, meshSlug: string): string { - const body = p.plaintext ?? decryptFailedWarning(p.senderPubkey); - const tag = p.subtype === "reminder" ? " [REMINDER]" : ""; - return `[${meshSlug}]${tag} from ${p.senderPubkey.slice(0, 12)}… (${p.priority}, ${p.createdAt}):\n${body}`; -} +// ── main MCP server (push-pipe + skills) ────────────────────────────── export async function startMcpServer(): Promise { - // Check for --service mode (native mesh MCP proxy) + // Mesh-service proxy mode: separate code path for proxying a deployed + // mesh MCP service into Claude Code. Unrelated to the daemon push-pipe. const serviceIdx = process.argv.indexOf("--service"); if (serviceIdx !== -1 && process.argv[serviceIdx + 1]) { return startServiceProxy(process.argv[serviceIdx + 1]!); } - // --mesh : bind this MCP server to a single mesh from the user's - // joined-meshes config. Used for the per-mesh push-pipe pattern in - // ~/.claude.json — one MCP entry per mesh, each holds one WS, push - // notifications fan out across all meshes simultaneously. - // Default behavior (no flag): connect to every mesh in config. - const meshIdx = process.argv.indexOf("--mesh"); - const onlyMesh = meshIdx !== -1 ? process.argv[meshIdx + 1] : null; - - const config = readConfig(); - - if (onlyMesh) { - const available = config.meshes.map((m) => m.slug); - const filtered = config.meshes.filter((m) => m.slug === onlyMesh); - if (filtered.length === 0) { - process.stderr.write( - `[claudemesh] --mesh "${onlyMesh}" not found in config. ` + - `Joined meshes: ${available.join(", ") || "(none)"}\n`, - ); - process.exit(1); - } - config.meshes = filtered; - } - - const myName = config.displayName ?? "unnamed"; - const myRole = config.role ?? process.env.CLAUDEMESH_ROLE ?? null; - const myGroups = (config.groups ?? []).map(g => `@${g.name}${g.role ? ':' + g.role : ''}`).join(', ') || "none"; - const messageMode = config.messageMode ?? "push"; + const ok = await daemonReady(); + if (!ok) bailNoDaemon(); const server = new Server( - { name: "claudemesh", version: "0.3.0" }, - { - capabilities: { - experimental: { "claude/channel": {} }, - tools: {}, - prompts: {}, - resources: {}, - }, - instructions: `## Identity -You are "${myName}"${myRole ? ` (${myRole})` : ""} — a peer in the claudemesh network. Your groups: ${myGroups}. You are one of several Claude Code sessions connected to the same mesh. No orchestrator exists — peers are equals. Your identity comes from your name and group roles, not from a central authority. - -## Responding to messages -When you receive a message, RESPOND IMMEDIATELY. Pause your current task, reply via send_message (or \`claudemesh topic post --reply-to \` for topic threads), then resume. Stay in character per your system prompt. Do not ignore low-priority messages — acknowledge them briefly even if you defer action. - -The channel attributes carry everything you need to reply — no extra lookups: -- \`from_name\` — sender display name. Use as the \`to\` arg when replying to a DM. -- \`from_pubkey\` / \`from_member_id\` — stable ids. Use \`from_member_id\` if the sender's display name might change. -- \`mesh_slug\` — pass via \`--mesh\` if your default mesh differs. -- \`priority\` — \`now\` / \`next\` / \`low\`. -- \`message_id\` — id of THIS message. To thread a reply onto it in a topic, run \`claudemesh topic post "" --reply-to \`. -- \`topic\` — set when the message arrived through a topic (vs DM). Reply in the same topic. -- \`reply_to_id\` — set when the incoming message is itself a reply. Render thread context if you re-narrate. - -If the channel meta contains \`subtype: reminder\`, this is a scheduled reminder you set for yourself — act on it immediately (no reply needed). - -## Tools -| Tool | Description | -|------|-------------| -| send_message(to, message, priority?) | Send to peer name, @group, or * broadcast. \`to\` accepts display name, pubkey hex, @groupname, or *. | -| list_peers(mesh_slug?) | List connected peers with status, summary, groups, and roles. | -| check_messages() | Drain buffered inbound messages (auto-pushed in most cases, use as fallback). | -| set_summary(summary) | Set 1-2 sentence description of your current work, visible to all peers. | -| set_status(status) | Override status: idle, working, or dnd. | -| set_visible(visible) | Toggle visibility. Hidden peers skip list_peers and broadcasts; direct messages still arrive. | -| set_profile(avatar?, title?, bio?, capabilities?) | Set public profile: emoji avatar, short title, bio, capabilities list. | -| join_group(name, role?) | Join a @group with optional role (lead, member, observer, or any string). | -| leave_group(name) | Leave a @group. | -| set_state(key, value) | Write shared state; pushes change to all peers. | -| get_state(key) | Read a shared state value. | -| list_state() | List all state keys with values, authors, and timestamps. | -| remember(content, tags?) | Store persistent knowledge with optional tags. | -| recall(query) | Full-text search over mesh memory. | -| forget(id) | Soft-delete a memory entry. | -| claudemesh file share [--to peer] [--tags a,b] | Share a file with the mesh, or DM it to a specific peer. Same-host fast path: when --to matches a peer on this machine, sends an absolute filepath instead of uploading (no MinIO round-trip). | -| claudemesh file get [--out path] | Download a shared file by id. | -| claudemesh file list [query] | Find files shared in the mesh. | -| claudemesh file status | Check who has accessed a file. | -| claudemesh file delete | Remove a shared file from the mesh. | -| vector_store(collection, text, metadata?) | Store embedding in per-mesh Qdrant collection. | -| vector_search(collection, query, limit?) | Semantic search over stored embeddings. | -| vector_delete(collection, id) | Remove an embedding. | -| list_collections() | List vector collections in this mesh. | -| graph_query(cypher) | Read-only Cypher query on per-mesh Neo4j. | -| graph_execute(cypher) | Write Cypher query (CREATE, MERGE, DELETE). | -| mesh_query(sql) | Run a SELECT query on the per-mesh shared database. | -| mesh_execute(sql) | Run DDL/DML on the per-mesh database (CREATE TABLE, INSERT, UPDATE, DELETE). | -| mesh_schema() | List tables and columns in the per-mesh shared database. | -| create_stream(name) | Create a real-time data stream in the mesh. | -| publish(stream, data) | Push data to a stream. Subscribers receive it in real-time. | -| subscribe(stream) | Subscribe to a stream. Data pushes arrive as channel notifications. | -| list_streams() | List active streams in the mesh. | -| share_context(summary, files_read?, key_findings?, tags?) | Share session understanding with peers. | -| get_context(query) | Find context from peers who explored an area. | -| list_contexts() | See what all peers currently know. | -| create_task(title, assignee?, priority?, tags?) | Create a work item. | -| claim_task(id) | Claim an unclaimed task. | -| complete_task(id, result?) | Mark task done with optional result. | -| list_tasks(status?, assignee?) | List tasks filtered by status/assignee. | -| schedule_reminder(message, in_seconds?, deliver_at?, to?) | Schedule a reminder to yourself (no \`to\`) or a delayed message to a peer/group. Delivered as a push with \`subtype: reminder\` in the channel meta. | -| list_scheduled() | List pending scheduled reminders and messages. | -| cancel_scheduled(id) | Cancel a pending scheduled item. | -| read_peer_file(peer, path) | Read a file from another peer's project (max 1MB). | -| list_peer_files(peer, path?, pattern?) | List files in a peer's shared directory. | -| mesh_mcp_register(server_name, description, tools) | Register an MCP server with the mesh. Other peers can call its tools. | -| mesh_mcp_list() | List MCP servers available in the mesh with their tools. | -| mesh_tool_call(server_name, tool_name, args?) | Call a tool on a mesh-registered MCP server (30s timeout). | -| mesh_mcp_remove(server_name) | Unregister an MCP server you registered. | - -If multiple meshes are joined, prefix \`to\` with \`:\` to disambiguate (e.g. \`dev-team:Alice\`). - -Multi-target: send_message accepts an array of targets for the 'to' field. - send_message(to: ["Alice", "@backend"], message: "sprint starts") -Targets are deduplicated — each peer receives the message once. - -Targeted views: when different audiences need different details about the same event, -send tailored messages instead of one generic broadcast: - send_message(to: "@frontend", message: "Auth v2: useAuth hook changed, see src/auth/") - send_message(to: "@backend", message: "Auth v2: new /api/auth/v2 endpoints, v1 deprecated") - send_message(to: "@pm", message: "Auth v2 done. 3 points, no blockers.") - -## Groups -Groups are routing labels. Send to @groupname to multicast to all members. Roles are metadata that peers interpret: a "lead" gathers input before synthesizing a response, a "member" contributes when asked, an "observer" watches silently. Join and leave groups dynamically with join_group/leave_group. Check list_peers to see who belongs to which groups and their roles. - -## State -Shared key-value store scoped to the mesh. Use get_state/set_state for live coordination facts (deploy frozen? current sprint? PR queue). set_state pushes the change to all connected peers. Read state before asking peers questions — the answer may already be there. State is operational, not archival. - -## Memory -Persistent knowledge that survives across sessions. Use remember(content, tags?) to store lessons, decisions, and incidents. Use recall(query) to search before asking peers. New peers should recall at session start to load institutional knowledge. - -## File access — decision guide -Three ways to access files. Pick the right one: - -1. **Local peer (same machine, [local] tag):** Read files directly via filesystem using their \`cwd\` path from list_peers. No limit, instant. This is the default for local peers. -2. **Remote peer (different machine, [remote] tag):** Use \`read_peer_file(peer, path)\` — relays through the mesh. **1 MB limit**, base64 encoded. Use \`list_peer_files\` to browse first. -3. **Persistent sharing (any peer):** Use \`share_file(path)\` — uploads to mesh storage (MinIO). **No size limit**. All peers can download anytime via \`get_file\`. Use for files that need to persist or be shared with multiple peers. - -**Rule of thumb:** local peer → filesystem. Remote peer, small file → read_peer_file. Large file or needs to persist → share_file. - -## Vectors -Store and search semantic embeddings. Use vector_store to index content, vector_search to find similar content. - -## Graph -Build and query entity relationship graphs. Use graph_execute for writes (CREATE, MERGE), graph_query for reads (MATCH). - -## Mesh Database -Per-mesh PostgreSQL database. Use mesh_execute for DDL/DML (CREATE TABLE, INSERT), mesh_query for SELECT, mesh_schema to inspect tables. Schema auto-created on first use. - -## Streams -Real-time data channels. create_stream to start one, publish to push data, subscribe to receive pushes. Use for build logs, deploy status, live metrics. - -## Context -Share your session understanding with peers. Use share_context after exploring a codebase area. Check get_context before re-reading files another peer already analyzed. - -## Tasks -Create and claim work items. create_task to propose work, claim_task to take ownership, complete_task when done. Prevents duplicate effort. - -## Priority -- "now": interrupt immediately, even if recipient is in DND (use for urgent: broken deploy, blocking issue) -- "next" (default): deliver when recipient goes idle (normal coordination) -- "low": pull-only via check_messages (FYI, non-blocking context) - -## Coordination -Call list_peers at session start to understand who is online, their roles, and what they are working on. If you are a group lead, gather input from members before responding to external requests — do not answer alone. If you are a member, contribute to your lead when asked. Use @group messages for team-wide questions, direct messages for 1:1 coordination. Set a meaningful summary so peers know your current focus. - -## Message Mode -Your message mode is "${messageMode}". -- push: messages arrive in real-time as channel notifications. Respond immediately. -- inbox: messages are held. You'll see "[inbox] New message from X" notifications. Call check_messages to read them. -- off: no message notifications. Use check_messages manually to poll.`, - }, + { name: "claudemesh", version: VERSION }, + { capabilities: { tools: {}, prompts: {}, resources: {} } }, ); - server.setRequestHandler(ListToolsRequestSchema, async () => ({ - tools: TOOLS, - })); + // Tools: empty. The CLI is the API; the model invokes it via Bash. + server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools: [] })); - // --- MCP Prompts: expose mesh skills as slash commands --- + // Prompts: mesh-published skills surfaced as `/skill-name` slash commands. server.setRequestHandler(ListPromptsRequestSchema, async () => { - const client = allClients()[0]; - if (!client) return { prompts: [] }; - const skills = await client.listSkills(); - return { - prompts: skills.map((s) => ({ - name: s.name, - description: s.description, - arguments: [], - })), - }; + try { + const { status, body } = await daemonGet("/v1/skills"); + if (status !== 200) return { prompts: [] }; + const skills = (body?.skills as Array<{ name: string; description: string }> | undefined) ?? []; + return { prompts: skills.map((s) => ({ name: s.name, description: s.description, arguments: [] })) }; + } catch { return { prompts: [] }; } }); server.setRequestHandler(GetPromptRequestSchema, async (req) => { - const { name, arguments: promptArgs } = req.params; - const client = allClients()[0]; - if (!client) throw new Error("Not connected to any mesh"); - const skill = await client.getSkill(name); - if (!skill) throw new Error(`Skill "${name}" not found in the mesh`); - - // Build the prompt content — include frontmatter if manifest has metadata + const name = req.params.name; + const { status, body } = await daemonGet(`/v1/skills/${encodeURIComponent(name)}`); + if (status === 404) throw new Error(`Skill "${name}" not found in the mesh`); + if (status !== 200) throw new Error(`daemon returned ${status} fetching skill`); + const skill = body.skill as { name: string; description: string; instructions: string; manifest?: any }; let content = skill.instructions; - const manifest = (skill as any).manifest; - if (manifest && typeof manifest === "object") { + const m = skill.manifest; + if (m && typeof m === "object") { const fm: string[] = ["---"]; - if (manifest.description) fm.push(`description: "${manifest.description}"`); - if (manifest.when_to_use) fm.push(`when_to_use: "${manifest.when_to_use}"`); - if (manifest.allowed_tools?.length) fm.push(`allowed-tools:\n${manifest.allowed_tools.map((t: string) => ` - ${t}`).join("\n")}`); - if (manifest.model) fm.push(`model: ${manifest.model}`); - if (manifest.context) fm.push(`context: ${manifest.context}`); - if (manifest.agent) fm.push(`agent: ${manifest.agent}`); - if (manifest.user_invocable === false) fm.push(`user-invocable: false`); - if (manifest.argument_hint) fm.push(`argument-hint: "${manifest.argument_hint}"`); + if (m.description) fm.push(`description: "${m.description}"`); + if (m.when_to_use) fm.push(`when_to_use: "${m.when_to_use}"`); + if (Array.isArray(m.allowed_tools) && m.allowed_tools.length) { + fm.push(`allowed-tools:\n${m.allowed_tools.map((t: string) => ` - ${t}`).join("\n")}`); + } + if (m.model) fm.push(`model: ${m.model}`); + if (m.context) fm.push(`context: ${m.context}`); + if (m.agent) fm.push(`agent: ${m.agent}`); + if (m.user_invocable === false) fm.push(`user-invocable: false`); + if (m.argument_hint) fm.push(`argument-hint: "${m.argument_hint}"`); fm.push("---\n"); if (fm.length > 3) content = fm.join("\n") + content; - - // Enforce context:fork via Agent tool instruction — Claude Code's MCP prompts - // path doesn't support the context field natively, so we instruct the model. - if (manifest.context === "fork") { - const agentType = manifest.agent || "general-purpose"; - const modelHint = manifest.model ? `, model: "${manifest.model}"` : ""; - const toolsHint = manifest.allowed_tools?.length - ? `\nOnly use these tools: ${manifest.allowed_tools.join(", ")}.` + if (m.context === "fork") { + const agentType = m.agent || "general-purpose"; + const modelHint = m.model ? `, model: "${m.model}"` : ""; + const toolsHint = m.allowed_tools?.length + ? `\nOnly use these tools: ${m.allowed_tools.join(", ")}.` : ""; content = `IMPORTANT: Execute this skill in an isolated sub-agent. Use the Agent tool with subagent_type="${agentType}"${modelHint}. Pass the full instructions below as the agent prompt.${toolsHint}\n\n` + content; } } - return { description: skill.description, - messages: [ - { - role: "user" as const, - content: { type: "text" as const, text: content }, - }, - ], + messages: [{ role: "user" as const, content: { type: "text" as const, text: content } }], }; }); - // --- MCP Resources: expose mesh skills as skill:// resources --- + // Resources: mesh skills as `skill://claudemesh/` URIs. server.setRequestHandler(ListResourcesRequestSchema, async () => { - const client = allClients()[0]; - if (!client) return { resources: [] }; - const skills = await client.listSkills(); - return { - resources: skills.map((s) => ({ - uri: `skill://claudemesh/${encodeURIComponent(s.name)}`, - name: s.name, - description: s.description, - mimeType: "text/markdown", - })), - }; + try { + const { body } = await daemonGet("/v1/skills"); + const skills = (body?.skills as Array<{ name: string; description: string }> | undefined) ?? []; + return { + resources: skills.map((s) => ({ + uri: `skill://claudemesh/${encodeURIComponent(s.name)}`, + name: s.name, + description: s.description, + mimeType: "text/markdown", + })), + }; + } catch { return { resources: [] }; } }); server.setRequestHandler(ReadResourceRequestSchema, async (req) => { - const { uri } = req.params; - // Parse skill://claudemesh/{name} - const match = uri.match(/^skill:\/\/claudemesh\/(.+)$/); - if (!match) throw new Error(`Unknown resource URI: ${uri}`); - const name = decodeURIComponent(match[1]!); - const client = allClients()[0]; - if (!client) throw new Error("Not connected to any mesh"); - const skill = await client.getSkill(name); - if (!skill) throw new Error(`Skill "${name}" not found`); - - // Build full markdown with frontmatter for Claude Code's parseSkillFrontmatterFields - const manifest = (skill as any).manifest; - const fmLines: string[] = ["---"]; - fmLines.push(`name: ${skill.name}`); - fmLines.push(`description: "${skill.description}"`); - if (skill.tags.length) fmLines.push(`tags: [${skill.tags.join(", ")}]`); - if (manifest && typeof manifest === "object") { - if (manifest.when_to_use) fmLines.push(`when_to_use: "${manifest.when_to_use}"`); - if (manifest.allowed_tools?.length) fmLines.push(`allowed-tools:\n${manifest.allowed_tools.map((t: string) => ` - ${t}`).join("\n")}`); - if (manifest.model) fmLines.push(`model: ${manifest.model}`); - if (manifest.context) fmLines.push(`context: ${manifest.context}`); - if (manifest.agent) fmLines.push(`agent: ${manifest.agent}`); - if (manifest.user_invocable === false) fmLines.push(`user-invocable: false`); - if (manifest.argument_hint) fmLines.push(`argument-hint: "${manifest.argument_hint}"`); - } - fmLines.push("---\n"); - - const fullContent = fmLines.join("\n") + skill.instructions; - - return { - contents: [ - { - uri, - mimeType: "text/markdown", - text: fullContent, - }, - ], + const uri = req.params.uri; + const m = uri.match(/^skill:\/\/claudemesh\/(.+)$/); + if (!m) throw new Error(`Unknown resource URI: ${uri}`); + const name = decodeURIComponent(m[1]!); + const { status, body } = await daemonGet(`/v1/skills/${encodeURIComponent(name)}`); + if (status === 404) throw new Error(`Skill "${name}" not found`); + if (status !== 200) throw new Error(`daemon returned ${status} fetching skill`); + const skill = body.skill as { + name: string; description: string; instructions: string; + tags?: string[]; manifest?: any; }; - }); - - - // Start MCP transport IMMEDIATELY so Claude Code discovers tools/prompts/resources - // without waiting for WS connections. Tool handlers gracefully return errors when - // not connected. WS connects in background; push wiring happens once ready. - const transport = new StdioServerTransport(); - await server.connect(transport); - - // Bridge servers — one Unix socket per connected mesh so CLI invocations - // can reuse this push-pipe's warm WS instead of opening their own - // (~5ms warm vs ~300-700ms cold). See spec 2026-05-02 commitment #3. - const bridges: BridgeServer[] = []; - - // Connect to broker WS in background — don't block MCP startup. - startClients(config).then(() => { - wirePushHandlers().catch(() => {}); - // Start one bridge socket per connected mesh. Done after WS connect so - // the BrokerClient is in a usable state when CLI requests arrive. - for (const client of allClients()) { - const bridge = startBridgeServer(client); - if (bridge) bridges.push(bridge); + const fm: string[] = ["---"]; + fm.push(`name: ${skill.name}`); + fm.push(`description: "${skill.description}"`); + if (skill.tags?.length) fm.push(`tags: [${skill.tags.join(", ")}]`); + const mf = skill.manifest; + if (mf && typeof mf === "object") { + if (mf.when_to_use) fm.push(`when_to_use: "${mf.when_to_use}"`); + if (Array.isArray(mf.allowed_tools) && mf.allowed_tools.length) { + fm.push(`allowed-tools:\n${mf.allowed_tools.map((t: string) => ` - ${t}`).join("\n")}`); + } + if (mf.model) fm.push(`model: ${mf.model}`); + if (mf.context) fm.push(`context: ${mf.context}`); } - }).catch(() => { - // Connect failed — clients are in reconnecting state, push wiring still needed - wirePushHandlers().catch(() => {}); + fm.push("---\n"); + return { contents: [{ uri, mimeType: "text/markdown", text: fm.join("\n") + skill.instructions }] }; }); - async function wirePushHandlers() { - // Wire WSS pushes → MCP channel notifications. Each inbound push on - // any mesh's broker connection becomes a - // system reminder injected into Claude Code's context. - for (const client of allClients()) { - // Event-driven push: WS onPush fires immediately when a message arrives. - // Claude Code's setNotificationHandler → enqueue → React useEffect pipeline - // processes notifications instantly (no polling needed on Claude's side). - // The old poll-based approach was an overcorrection — Claude Code source - // confirms event-driven notification processing. - client.onPush(async (msg) => { - if (messageMode === "off") return; - - // System events (peer join/leave) — always push, regardless of mode. - if (msg.subtype === "system" && msg.event) { - const eventName = msg.event; - const data = msg.eventData ?? {}; - let content: string; - if (eventName === "tick") { - const tick = data.tick ?? 0; - const simTime = String(data.simTime ?? "").replace("T", " ").replace(/\..*/,""); - const speed = data.speed ?? 1; - content = `[heartbeat] tick ${tick} | sim time: ${simTime} | speed: x${speed}`; - } else if (eventName === "peer_joined") { - content = `[system] Peer "${data.name ?? "unknown"}" joined the mesh`; - } else if (eventName === "peer_returned") { - const peerName = String(data.name ?? "unknown"); - const lastSeenAt = data.lastSeenAt ? relativeTime(String(data.lastSeenAt)) : "unknown"; - const groups = Array.isArray(data.groups) - ? (data.groups as Array<{ name: string; role?: string }>).map((g) => g.role ? `@${g.name}:${g.role}` : `@${g.name}`).join(", ") - : ""; - const summary = data.summary ? ` Summary: "${data.summary}"` : ""; - content = `[system] Welcome back, "${peerName}"! Last seen ${lastSeenAt}.${groups ? ` Restored: ${groups}` : ""}${summary}`; - } else if (eventName === "peer_left") { - content = `[system] Peer "${data.name ?? "unknown"}" left the mesh`; - } else if (eventName === "mcp_registered") { - const tools = Array.isArray(data.tools) ? (data.tools as string[]).join(", ") : ""; - content = `[system] New MCP server available: "${data.serverName}" (hosted by ${data.hostedBy}). Tools: ${tools}. Use mesh_tool_call to invoke.`; - } else if (eventName === "mcp_unregistered") { - content = `[system] MCP server "${data.serverName}" removed (was hosted by ${data.hostedBy})`; - } else if (eventName === "mcp_restored") { - content = `[system] MCP server "${data.serverName}" is back online (hosted by ${data.hostedBy})`; - } else if (eventName === "watch_triggered") { - content = `[WATCH] ${data.label ?? data.url}: ${data.oldValue} → ${data.newValue}`; - } else if (eventName === "mcp_deployed") { - content = `[SERVICE] "${data.name}" deployed (${data.tool_count} tools) by ${data.deployed_by}`; - } else if (eventName === "mcp_undeployed") { - content = `[SERVICE] "${data.name}" undeployed by ${data.by}`; - } else if (eventName === "mcp_scope_changed") { - content = `[SERVICE] "${data.name}" scope changed to ${JSON.stringify(data.scope)} by ${data.by}`; - } else { - content = `[system] ${eventName}: ${JSON.stringify(data)}`; - } - try { - await server.notification({ - method: "notifications/claude/channel", - params: { - content, - meta: { - kind: "system", - event: eventName, - mesh_slug: client.meshSlug, - mesh_id: client.meshId, - ...(Object.keys(data).length > 0 ? { eventData: JSON.stringify(data) } : {}), - }, - }, - }); - process.stderr.write(`[claudemesh] system: ${content}\n`); - } catch (pushErr) { - process.stderr.write(`[claudemesh] system push FAILED: ${pushErr}\n`); - } - return; - } - - const fromPubkey = msg.senderPubkey || ""; - const fromName = fromPubkey - ? await resolvePeerName(client, fromPubkey) - : "unknown"; - - // Per-peer capability check — drop silently if sender lacks `dm`. - if (fromPubkey) { - try { - const { isAllowed } = await import("~/commands/grants.js"); - const kindCap = msg.kind === "broadcast" ? "broadcast" : "dm"; - if (!isAllowed(client.meshSlug, fromPubkey, kindCap)) { - process.stderr.write(`[claudemesh] dropped ${kindCap} from ${fromName} (not granted)\n`); - return; - } - } catch { /* fail-open on grant-read errors — don't break delivery */ } - } - - if (messageMode === "inbox") { - try { - await server.notification({ - method: "notifications/claude/channel", - params: { - content: `[inbox] New message from ${fromName}. Use check_messages to read.`, - meta: { kind: "inbox_notification", from_name: fromName }, - }, - }); - } catch { /* best effort */ } - return; - } - - // push mode — full content. Format the content so it reads as a - // first-class chat message even though Claude Code renders it as a - // reminder: sender attribution + priority badge + body. - const body = msg.plaintext ?? decryptFailedWarning(fromPubkey); - const prioBadge = msg.priority === "now" ? "[URGENT] " : msg.priority === "low" ? "[low] " : ""; - const kindBadge = msg.kind === "broadcast" ? " (broadcast)" : ""; - const content = `${prioBadge}${fromName}${kindBadge}: ${body}`; - // `from_id` MUST be a stable replyable id. Older clients of this - // channel have been pasting from_id straight back into - // `claudemesh send `; if from_id is the SESSION pubkey it - // bounces with "no connected peer" the moment the sender's - // session restarts. Send the MEMBER pubkey (stable across - // reconnects) as from_id, and keep the ephemeral session pubkey - // available under from_session_pubkey for crypto-aware callers. - const fromMemberPubkey = msg.senderMemberPubkey ?? fromPubkey; + // Subscribe to daemon events; translate to channel notifications. + const sub = subscribeEvents(async (ev) => { + if (ev.kind === "message") { + const d = ev.data; + const fromName = String(d.sender_name ?? "unknown"); + const fromMember = String(d.sender_member_pubkey ?? d.sender_pubkey ?? ""); + const body = String(d.body ?? "(decrypt failed)"); + const priority = String(d.priority ?? "next"); + const prioBadge = priority === "now" ? "[URGENT] " : priority === "low" ? "[low] " : ""; + const topicTag = d.topic ? ` (#${d.topic})` : ""; + const content = `${prioBadge}${fromName}${topicTag}: ${body}`; try { await server.notification({ method: "notifications/claude/channel", params: { content, meta: { - from_id: fromMemberPubkey, - from_pubkey: fromMemberPubkey, - from_session_pubkey: fromPubkey, + from_id: fromMember, + from_pubkey: fromMember, + from_session_pubkey: String(d.sender_pubkey ?? ""), from_name: fromName, - ...(msg.senderMemberId ? { from_member_id: msg.senderMemberId } : {}), - mesh_slug: client.meshSlug, - mesh_id: client.meshId, - priority: msg.priority, - sent_at: msg.createdAt, - delivered_at: msg.receivedAt, - kind: msg.kind, - message_id: msg.messageId, - ...(msg.topic ? { topic: msg.topic } : {}), - ...(msg.replyToId ? { reply_to_id: msg.replyToId } : {}), - ...(msg.subtype ? { subtype: msg.subtype } : {}), + mesh_slug: String(d.mesh ?? ""), + priority, + message_id: String(d.broker_message_id ?? d.id ?? ""), + client_message_id: String(d.client_message_id ?? ""), + ...(d.topic ? { topic: String(d.topic) } : {}), + ...(d.reply_to_id ? { reply_to_id: String(d.reply_to_id) } : {}), + ...(d.subtype ? { subtype: String(d.subtype) } : {}), }, }, }); - process.stderr.write(`[claudemesh] pushed: from=${fromName} content=${body.slice(0, 60)}\n`); - } catch (pushErr) { - process.stderr.write(`[claudemesh] push FAILED: ${pushErr}\n`); + } catch (err) { + process.stderr.write(`[claudemesh-mcp] channel emit failed: ${err}\n`); + } + } else if (ev.kind === "peer_join" || ev.kind === "peer_leave" || ev.kind === "system") { + const d = ev.data; + const eventName = String(d.event ?? ev.kind); + let content: string; + if (ev.kind === "peer_join") { + content = `[system] Peer "${String(d.name ?? "unknown")}" joined the mesh`; + } else if (ev.kind === "peer_leave") { + content = `[system] Peer "${String(d.name ?? "unknown")}" left the mesh`; + } else { + content = `[system] ${eventName}: ${JSON.stringify(d).slice(0, 240)}`; } - }); - - client.onStreamData(async (evt) => { try { await server.notification({ method: "notifications/claude/channel", params: { - content: `[stream:${evt.stream}] from ${evt.publishedBy}: ${JSON.stringify(evt.data)}`, + content, meta: { - kind: "stream_data", - stream: evt.stream, - published_by: evt.publishedBy, + kind: "system", + event: eventName, + mesh_slug: String(d.mesh ?? ""), }, }, }); } catch { /* best effort */ } - }); - - client.onStateChange(async (change) => { - try { - await server.notification({ - method: "notifications/claude/channel", - params: { - content: `[state] ${change.key} = ${JSON.stringify(change.value)} (set by ${change.updatedBy})`, - meta: { - kind: "state_change", - key: change.key, - updated_by: change.updatedBy, - }, - }, - }); - } catch { /* best effort */ } - }); } + }); - // Welcome notification: give Claude immediate context on connect. - // Delay slightly to ensure Claude Code has completed MCP initialization - // handshake (notifications/initialized) before we push channel messages. - setTimeout(async () => { - const welcomeClient = allClients()[0]; - if (!welcomeClient || welcomeClient.status !== "open") return; - try { - const peers = await welcomeClient.listPeers(); - const peerNames = peers - .filter(p => p.displayName !== myName) - .map(p => p.displayName) - .join(", ") || "none"; - await server.notification({ - method: "notifications/claude/channel", - params: { - content: `[system] Connected as ${myName} to mesh ${welcomeClient.meshSlug}. ${peers.length} peer(s) online: ${peerNames}. Call mesh_info for full details or set_summary to announce yourself.`, - meta: { kind: "welcome", mesh_slug: welcomeClient.meshSlug }, - }, - }); - } catch { /* best effort */ } - }, 2_000); - } // end wirePushHandlers + const transport = new StdioServerTransport(); + await server.connect(transport); - // Event loop keepalive: Node.js stdout to a pipe is buffered. Without - // periodic event loop activity, stdout.write() from WS callbacks may not - // flush until the next I/O event. This 1s interval keeps the event loop - // ticking so channel notifications flush promptly — same pattern that made - // claude-intercom's push delivery reliable (its 1s HTTP poll had this - // effect as a side effect). The interval does nothing except prevent the - // event loop from settling. - const keepalive = setInterval(() => { - // Intentionally empty — the interval itself keeps the event loop active. - // Do NOT call .unref() — that would defeat the purpose. - }, 1_000); - void keepalive; // suppress unused warning + // Keep event loop active so SSE callbacks flush stdout promptly. + const keepalive = setInterval(() => { /* tick */ }, 1_000); + void keepalive; const shutdown = (): void => { clearInterval(keepalive); - for (const b of bridges) { - try { b.stop(); } catch {} - } - stopAll(); + sub.close(); process.exit(0); }; process.on("SIGTERM", shutdown); process.on("SIGINT", shutdown); } +// ── mesh-service proxy mode (unchanged from prior versions) ──────────── + /** * Mesh service proxy — a thin MCP server that proxies ONE deployed service. * @@ -830,33 +374,22 @@ async function startServiceProxy(serviceName: string): Promise { process.exit(1); } - // Wait for hello_ack and service catalog + // Wait for hello_ack and service catalog. await new Promise((r) => setTimeout(r, 1500)); - // Fetch tool schemas for this service - let tools: Array<{ - name: string; - description: string; - inputSchema: Record; - }> = []; + let tools: Array<{ name: string; description: string; inputSchema: Record }> = []; try { const fetched = await client.getServiceTools(serviceName); tools = fetched as typeof tools; } catch { - // Try from catalog cache const cached = client.serviceCatalog.find((s) => s.name === serviceName); - if (cached) { - tools = cached.tools as typeof tools; - } + if (cached) tools = cached.tools as typeof tools; } if (tools.length === 0) { - process.stderr.write( - `[mesh:${serviceName}] no tools found — service may not be running\n`, - ); + process.stderr.write(`[mesh:${serviceName}] no tools found — service may not be running\n`); } - // Build MCP server const server = new Server( { name: `mesh:${serviceName}`, version: "0.1.0" }, { capabilities: { tools: {} } }, @@ -874,7 +407,6 @@ async function startServiceProxy(serviceName: string): Promise { const toolName = req.params.name; const args = req.params.arguments ?? {}; - // Wait for broker reconnection if needed if ((client.status as string) !== "open") { let waited = 0; while ((client.status as string) !== "open" && waited < 10_000) { @@ -883,92 +415,50 @@ async function startServiceProxy(serviceName: string): Promise { } if ((client.status as string) !== "open") { return { - content: [ - { - type: "text" as const, - text: `Service temporarily unavailable — broker reconnecting. Retry in a few seconds.`, - }, - ], + content: [{ type: "text" as const, text: "Service temporarily unavailable — broker reconnecting. Retry in a few seconds." }], isError: true, }; } } try { - const result = await client.mcpCall( - serviceName, - toolName, - args as Record, - ); + const result = await client.mcpCall(serviceName, toolName, args as Record); if (result.error) { - return { - content: [{ type: "text" as const, text: `Error: ${result.error}` }], - isError: true, - }; + return { content: [{ type: "text" as const, text: `Error: ${result.error}` }], isError: true }; } - const resultText = - typeof result.result === "string" - ? result.result - : JSON.stringify(result.result, null, 2); - return { - content: [{ type: "text" as const, text: resultText }], - }; + const resultText = typeof result.result === "string" + ? result.result + : JSON.stringify(result.result, null, 2); + return { content: [{ type: "text" as const, text: resultText }] }; } catch (e) { return { - content: [ - { - type: "text" as const, - text: `Call failed: ${e instanceof Error ? e.message : String(e)}`, - }, - ], + content: [{ type: "text" as const, text: `Call failed: ${e instanceof Error ? e.message : String(e)}` }], isError: true, }; } }); - // Listen for service events (undeploy, update) client.onPush((push) => { - if ( - push.event === "mcp_undeployed" && - (push.eventData as any)?.name === serviceName - ) { - process.stderr.write( - `[mesh:${serviceName}] service undeployed — exiting\n`, - ); + if (push.event === "mcp_undeployed" && (push.eventData as any)?.name === serviceName) { + process.stderr.write(`[mesh:${serviceName}] service undeployed — exiting\n`); client.close(); process.exit(0); } - if ( - push.event === "mcp_updated" && - (push.eventData as any)?.name === serviceName - ) { - // Refresh tools + if (push.event === "mcp_updated" && (push.eventData as any)?.name === serviceName) { const newTools = (push.eventData as any)?.tools; if (Array.isArray(newTools)) { tools = newTools as typeof tools; - // Notify Claude Code that tools changed - server - .notification({ - method: "notifications/tools/list_changed", - }) - .catch(() => { - /* ignore notification errors */ - }); + server.notification({ method: "notifications/tools/list_changed" }).catch(() => { /* ignore */ }); } } }); - // Start stdio transport const transport = new StdioServerTransport(); await server.connect(transport); - // Keep event loop alive - const keepalive = setInterval(() => { - // Intentionally empty — prevents event loop from settling. - }, 1_000); + const keepalive = setInterval(() => { /* tick */ }, 1_000); void keepalive; - // Graceful shutdown const shutdown = (): void => { clearInterval(keepalive); client.close();