feat(cli): 1.28.0 — bridge deletion + daemon-policy flags
drop the orphaned bridge tier (~600 LoC). client/server/protocol files deleted; tryBridge had returned null in production for seven releases since the 1.24.0 mcp shim rewrite stopped opening the sockets. each verb now has two paths: daemon (with 1.27.3's auto-spawn) → cold ws. add per-process daemon policy: --strict (error instead of cold fallback) and --no-daemon (skip daemon entirely). enforcement at withMesh so a single chokepoint covers every verb. env equivalents CLAUDEMESH_STRICT_DAEMON / CLAUDEMESH_NO_DAEMON. flag wins. net -394 loc; the daemon-up case ships ~600 loc lighter and the fallback story is one tier simpler. first sprint A drop; per-session ipc tokens and the wizard refactors follow in 1.29.0+. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -15,8 +15,6 @@
|
||||
*/
|
||||
|
||||
import { withMesh } from "./connect.js";
|
||||
import { readConfig } from "~/services/config/facade.js";
|
||||
import { tryBridge } from "~/services/bridge/client.js";
|
||||
import { tryForgetViaDaemon } from "~/services/bridge/daemon-route.js";
|
||||
import { render } from "~/ui/render.js";
|
||||
import { bold, clay, dim } from "~/ui/styles.js";
|
||||
@@ -26,14 +24,6 @@ import { validateMessageId, renderValidationError } from "~/cli/validators.js";
|
||||
type StateFlags = { mesh?: string; json?: boolean };
|
||||
type PeerStatus = "idle" | "working" | "dnd";
|
||||
|
||||
/** Resolve unambiguous mesh slug for warm-path bridging. Returns null if
|
||||
* the user has multiple joined meshes and didn't pick one. */
|
||||
function unambiguousMesh(opts: StateFlags): string | null {
|
||||
if (opts.mesh) return opts.mesh;
|
||||
const config = readConfig();
|
||||
return config.meshes.length === 1 ? config.meshes[0]!.slug : null;
|
||||
}
|
||||
|
||||
// --- status ---
|
||||
|
||||
export async function runStatusSet(state: string, opts: StateFlags): Promise<number> {
|
||||
@@ -43,21 +33,9 @@ export async function runStatusSet(state: string, opts: StateFlags): Promise<num
|
||||
return EXIT.INVALID_ARGS;
|
||||
}
|
||||
|
||||
// Warm path
|
||||
const meshSlug = unambiguousMesh(opts);
|
||||
if (meshSlug) {
|
||||
const bridged = await tryBridge(meshSlug, "status_set", { status: state });
|
||||
if (bridged !== null) {
|
||||
if (bridged.ok) {
|
||||
if (opts.json) console.log(JSON.stringify({ status: state }));
|
||||
else render.ok(`status set to ${bold(state)}`);
|
||||
return EXIT.SUCCESS;
|
||||
}
|
||||
render.err(bridged.error);
|
||||
return EXIT.INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
// Bridge tier deleted in 1.28.0 (dead code; the orphaned warm-path
|
||||
// socket was never opened by anyone). Daemon route would belong here;
|
||||
// adding it for status/summary/visible is queued for 1.29.0.
|
||||
await withMesh({ meshSlug: opts.mesh ?? null }, async (client) => {
|
||||
await client.setStatus(state as PeerStatus);
|
||||
});
|
||||
@@ -74,21 +52,6 @@ export async function runSummary(text: string, opts: StateFlags): Promise<number
|
||||
return EXIT.INVALID_ARGS;
|
||||
}
|
||||
|
||||
// Warm path
|
||||
const meshSlug = unambiguousMesh(opts);
|
||||
if (meshSlug) {
|
||||
const bridged = await tryBridge(meshSlug, "summary", { summary: text });
|
||||
if (bridged !== null) {
|
||||
if (bridged.ok) {
|
||||
if (opts.json) console.log(JSON.stringify({ summary: text }));
|
||||
else render.ok("summary set", dim(text));
|
||||
return EXIT.SUCCESS;
|
||||
}
|
||||
render.err(bridged.error);
|
||||
return EXIT.INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
await withMesh({ meshSlug: opts.mesh ?? null }, async (client) => {
|
||||
await client.setSummary(text);
|
||||
});
|
||||
@@ -108,21 +71,6 @@ export async function runVisible(value: string | undefined, opts: StateFlags): P
|
||||
return EXIT.INVALID_ARGS;
|
||||
}
|
||||
|
||||
// Warm path
|
||||
const meshSlug = unambiguousMesh(opts);
|
||||
if (meshSlug) {
|
||||
const bridged = await tryBridge(meshSlug, "visible", { visible });
|
||||
if (bridged !== null) {
|
||||
if (bridged.ok) {
|
||||
if (opts.json) console.log(JSON.stringify({ visible }));
|
||||
else render.ok(visible ? "you are now visible to peers" : "you are now hidden", visible ? undefined : "direct messages still reach you");
|
||||
return EXIT.SUCCESS;
|
||||
}
|
||||
render.err(bridged.error);
|
||||
return EXIT.INTERNAL_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
await withMesh({ meshSlug: opts.mesh ?? null }, async (client) => {
|
||||
await client.setVisible(visible);
|
||||
});
|
||||
|
||||
@@ -10,6 +10,7 @@ import { createInterface } from "node:readline";
|
||||
import { BrokerClient } from "~/services/broker/facade.js";
|
||||
import { readConfig } from "~/services/config/facade.js";
|
||||
import type { JoinedMesh } from "~/services/config/facade.js";
|
||||
import { getDaemonPolicy } from "~/services/daemon/policy.js";
|
||||
|
||||
export interface ConnectOpts {
|
||||
/** Mesh slug to connect to. Auto-selects if only one mesh joined. */
|
||||
@@ -46,6 +47,18 @@ export async function withMesh<T>(
|
||||
opts: ConnectOpts,
|
||||
fn: (client: BrokerClient, mesh: JoinedMesh) => Promise<T>,
|
||||
): Promise<T> {
|
||||
// --strict gate: every cold-path verb funnels through here, so a single
|
||||
// policy check covers the whole CLI surface. The daemon-routing helpers
|
||||
// already returned null (auto-spawn failed); under --strict we refuse
|
||||
// the cold-path fallback and exit loudly instead.
|
||||
if (getDaemonPolicy().mode === "strict") {
|
||||
console.error(
|
||||
"\n ✘ daemon not reachable — --strict refuses cold-path fallback.\n" +
|
||||
" run `claudemesh daemon up` (or `claudemesh doctor`) and retry.\n",
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const config = readConfig();
|
||||
if (config.meshes.length === 0) {
|
||||
console.error("No meshes joined. Run `claudemesh join <url>` first.");
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
import { withMesh } from "./connect.js";
|
||||
import { readConfig } from "~/services/config/facade.js";
|
||||
import { tryBridge } from "~/services/bridge/client.js";
|
||||
import { render } from "~/ui/render.js";
|
||||
import { bold, dim, green, yellow } from "~/ui/styles.js";
|
||||
|
||||
@@ -68,7 +67,10 @@ async function listPeersForMesh(slug: string): Promise<PeerRecord[]> {
|
||||
const selfMemberPubkey = joined?.pubkey ?? null;
|
||||
|
||||
// Daemon path — preferred when running. Same routing pattern as send.ts:
|
||||
// ~1 ms IPC round-trip; broker WS already warm in the daemon.
|
||||
// ~1 ms IPC round-trip; broker WS already warm in the daemon. The
|
||||
// lifecycle helper inside tryListPeersViaDaemon auto-spawns the
|
||||
// daemon if it's down and probes it for liveness — no separate bridge
|
||||
// tier is needed any more (1.28.0).
|
||||
try {
|
||||
const { tryListPeersViaDaemon } = await import("~/services/bridge/daemon-route.js");
|
||||
const dr = await tryListPeersViaDaemon();
|
||||
@@ -77,13 +79,8 @@ async function listPeersForMesh(slug: string): Promise<PeerRecord[]> {
|
||||
}
|
||||
} catch { /* daemon route helper not available; fall through */ }
|
||||
|
||||
// Try warm bridge path next.
|
||||
const bridged = await tryBridge(slug, "peers");
|
||||
if (bridged && bridged.ok) {
|
||||
const peers = bridged.result as PeerRecord[];
|
||||
return peers.map((p) => annotateSelf(p, selfMemberPubkey, null));
|
||||
}
|
||||
// Cold path — open our own WS.
|
||||
// Cold path — open our own WS. Reached only when the lifecycle helper
|
||||
// could not bring the daemon up.
|
||||
let result: PeerRecord[] = [];
|
||||
await withMesh({ meshSlug: slug }, async (client) => {
|
||||
const all = (await client.listPeers()) as unknown as PeerRecord[];
|
||||
|
||||
@@ -13,7 +13,6 @@
|
||||
|
||||
import { withMesh } from "./connect.js";
|
||||
import { readConfig } from "~/services/config/facade.js";
|
||||
import { tryBridge } from "~/services/bridge/client.js";
|
||||
import { trySendViaDaemon } from "~/services/bridge/daemon-route.js";
|
||||
import type { Priority } from "~/services/broker/facade.js";
|
||||
import { render } from "~/ui/render.js";
|
||||
@@ -82,34 +81,12 @@ export async function runSend(flags: SendFlags, to: string, message: string): Pr
|
||||
else render.err(`send failed (daemon): ${dr.error}`);
|
||||
process.exit(1);
|
||||
}
|
||||
// dr === null → daemon not running; fall through to bridge.
|
||||
// dr === null → daemon not running and lifecycle couldn't auto-
|
||||
// spawn it; fall through to cold path. The orphaned bridge tier
|
||||
// was removed in 1.28.0.
|
||||
}
|
||||
|
||||
// Warm path — only when mesh is unambiguous.
|
||||
if (meshSlug) {
|
||||
const bridged = await tryBridge(meshSlug, "send", { to, message, priority });
|
||||
if (bridged !== null) {
|
||||
if (bridged.ok) {
|
||||
const r = bridged.result as { messageId?: string };
|
||||
if (flags.json) {
|
||||
console.log(JSON.stringify({ ok: true, messageId: r.messageId, target: to }));
|
||||
} else {
|
||||
render.ok(`sent to ${to}`, r.messageId ? dim(r.messageId.slice(0, 8)) : undefined);
|
||||
}
|
||||
return;
|
||||
}
|
||||
// Bridge reachable but op failed — surface error, don't fall through.
|
||||
if (flags.json) {
|
||||
console.log(JSON.stringify({ ok: false, error: bridged.error }));
|
||||
} else {
|
||||
render.err(`send failed: ${bridged.error}`);
|
||||
}
|
||||
process.exit(1);
|
||||
}
|
||||
// bridged === null → bridge unreachable, fall through to cold path
|
||||
}
|
||||
|
||||
// Cold path
|
||||
// Cold path — open our own WS, encrypt locally, fire envelope.
|
||||
await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => {
|
||||
let targetSpec = to;
|
||||
if (to.startsWith("#") && !/^#[0-9a-z_-]{20,}$/i.test(to)) {
|
||||
|
||||
@@ -9,6 +9,7 @@ import { renderVersion } from "~/cli/output/version.js";
|
||||
import { isInviteUrl, normaliseInviteUrl } from "~/utils/url.js";
|
||||
import { classifyInvocation } from "~/cli/policy-classify.js";
|
||||
import { gate, type ApprovalMode } from "~/services/policy/index.js";
|
||||
import { setDaemonPolicy, policyFromFlags } from "~/services/daemon/policy.js";
|
||||
import { bold, clay, cyan, dim, orange } from "~/ui/styles.js";
|
||||
|
||||
installSignalHandlers();
|
||||
@@ -16,6 +17,11 @@ installErrorHandlers();
|
||||
|
||||
const { command, positionals, flags } = parseArgv(process.argv);
|
||||
|
||||
// Resolve daemon policy once at boot — daemon-routing helpers read this
|
||||
// instead of inspecting flags themselves. --no-daemon and --strict are
|
||||
// mutually exclusive (--no-daemon wins if both are passed).
|
||||
setDaemonPolicy(policyFromFlags(flags));
|
||||
|
||||
/**
|
||||
* Resolve the coarse approval mode from CLI flags + env.
|
||||
* --approval-mode <plan|read-only|write|yolo> explicit
|
||||
@@ -210,6 +216,8 @@ Flags
|
||||
--policy <path> override policy file
|
||||
-y, --yes skip confirmations (= --approval-mode yolo)
|
||||
-q, --quiet suppress non-essential output
|
||||
--strict require daemon for broker-touching verbs (no cold-path fallback)
|
||||
--no-daemon skip daemon entirely; open broker WS directly (CI / sandboxed scripts)
|
||||
`;
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,114 +0,0 @@
|
||||
/**
|
||||
* Bridge client — CLI invocations dial the per-mesh Unix socket the
|
||||
* MCP push-pipe holds open, so they reuse its warm WS instead of opening
|
||||
* a fresh one (~5ms vs ~300-700ms).
|
||||
*
|
||||
* Usage from a command:
|
||||
*
|
||||
* const result = await tryBridge(meshSlug, "send", { to, message });
|
||||
* if (result === null) { ...fall through to cold withMesh()... }
|
||||
* else { ...warm path succeeded... }
|
||||
*
|
||||
* `tryBridge` returns null on:
|
||||
* - socket file absent (no push-pipe running)
|
||||
* - socket connect fails (push-pipe crashed without cleanup)
|
||||
* - bridge timeout
|
||||
* That null is the caller's signal to fall back to a cold WS connection
|
||||
* via `withMesh`. So the bridge is purely an optimization — every verb
|
||||
* still works without it.
|
||||
*/
|
||||
|
||||
import { createConnection } from "node:net";
|
||||
import { existsSync } from "node:fs";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import {
|
||||
socketPath,
|
||||
frame,
|
||||
LineParser,
|
||||
type BridgeRequest,
|
||||
type BridgeResponse,
|
||||
type BridgeVerb,
|
||||
} from "./protocol.js";
|
||||
|
||||
const DEFAULT_TIMEOUT_MS = 5_000;
|
||||
|
||||
/**
|
||||
* Send one request and await the matching response. Returns:
|
||||
* - { ok: true, result } on success
|
||||
* - { ok: false, error } on bridge-reachable-but-broker-error
|
||||
* - null on bridge-unreachable (caller should fall back to cold WS)
|
||||
*/
|
||||
export async function tryBridge(
|
||||
meshSlug: string,
|
||||
verb: BridgeVerb,
|
||||
args: Record<string, unknown> = {},
|
||||
timeoutMs: number = DEFAULT_TIMEOUT_MS,
|
||||
): Promise<{ ok: true; result: unknown } | { ok: false; error: string } | null> {
|
||||
const path = socketPath(meshSlug);
|
||||
if (!existsSync(path)) return null;
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const id = randomUUID();
|
||||
const req: BridgeRequest = { id, verb, args };
|
||||
const parser = new LineParser();
|
||||
let settled = false;
|
||||
|
||||
const finish = (
|
||||
value: { ok: true; result: unknown } | { ok: false; error: string } | null,
|
||||
): void => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
try { socket.destroy(); } catch {}
|
||||
clearTimeout(timer);
|
||||
resolve(value);
|
||||
};
|
||||
|
||||
const socket = createConnection({ path });
|
||||
|
||||
const timer = setTimeout(() => {
|
||||
finish(null); // timeout = bridge unreachable, fall back to cold path
|
||||
}, timeoutMs);
|
||||
|
||||
socket.on("connect", () => {
|
||||
try {
|
||||
socket.write(frame(req));
|
||||
} catch {
|
||||
finish(null);
|
||||
}
|
||||
});
|
||||
|
||||
socket.on("data", (chunk) => {
|
||||
const lines = parser.feed(chunk);
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
let res: BridgeResponse;
|
||||
try {
|
||||
res = JSON.parse(line) as BridgeResponse;
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (res.id !== id) continue; // not our response — keep reading
|
||||
if (res.ok) finish({ ok: true, result: res.result });
|
||||
else finish({ ok: false, error: res.error });
|
||||
return;
|
||||
}
|
||||
});
|
||||
|
||||
socket.on("error", (err) => {
|
||||
// ENOENT (file disappeared between existsSync and connect),
|
||||
// ECONNREFUSED (stale socket), EPERM (permission), etc. — all mean
|
||||
// bridge unreachable.
|
||||
const code = (err as NodeJS.ErrnoException).code;
|
||||
if (code === "ECONNREFUSED" || code === "ENOENT" || code === "EPERM") {
|
||||
finish(null);
|
||||
} else {
|
||||
finish(null);
|
||||
}
|
||||
});
|
||||
|
||||
socket.on("close", () => {
|
||||
// If we close without a response, treat as unreachable.
|
||||
finish(null);
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
// Try forwarding a send through the local daemon's IPC. Returns null if
|
||||
// the daemon isn't running or the daemon's mesh doesn't match the target
|
||||
// mesh — the caller falls back to the bridge or cold path.
|
||||
// Daemon-routed CLI helpers. Returns null when the daemon is unreachable
|
||||
// AND auto-spawn could not bring it up — caller is expected to fall back
|
||||
// to its cold-path WS or to error out under `--strict`.
|
||||
//
|
||||
// Auto-recovery: when the daemon socket is missing or stale, every
|
||||
// helper here calls into the lifecycle module which probes, spawns
|
||||
@@ -8,9 +8,13 @@
|
||||
// fires if auto-spawn failed. The lifecycle module caches its
|
||||
// per-process result, so a script doing 50 sends pays the spawn cost
|
||||
// at most once.
|
||||
//
|
||||
// 1.28.0: the orphaned bridge tier between daemon and cold paths was
|
||||
// removed. Two paths only: daemon (with auto-spawn) → cold.
|
||||
|
||||
import { ipc } from "~/daemon/ipc/client.js";
|
||||
import { ensureDaemonReady } from "~/services/daemon/lifecycle.js";
|
||||
import { getDaemonPolicy } from "~/services/daemon/policy.js";
|
||||
import { warnDaemonState } from "~/ui/warnings.ts";
|
||||
|
||||
function meshQuery(mesh?: string): string {
|
||||
@@ -19,13 +23,15 @@ function meshQuery(mesh?: string): string {
|
||||
|
||||
/** Common entry: ensure the daemon is reachable, emitting a one-shot
|
||||
* stderr warning describing what we did. Returns true when the daemon
|
||||
* is now reachable, false when the caller should fall back. */
|
||||
* is now reachable, false when the caller should fall back.
|
||||
*
|
||||
* --no-daemon short-circuits to false; --strict's enforcement lives at
|
||||
* the cold-path entry point (`withMesh` in commands/connect.ts) so a
|
||||
* single chokepoint covers every verb. */
|
||||
async function daemonReachable(): Promise<boolean> {
|
||||
const res = await ensureDaemonReady();
|
||||
// Suppress the warning under JSON / quiet at the call site —
|
||||
// helpers here can't see those flags. JSON callers should switch
|
||||
// to lifecycle directly. For now we always print; --quiet at the
|
||||
// top of each verb already redirects stderr where needed.
|
||||
const policy = getDaemonPolicy();
|
||||
if (policy.mode === "no-daemon") return false;
|
||||
const res = await ensureDaemonReady({ noAutoSpawn: false });
|
||||
warnDaemonState(res, {});
|
||||
return res.state === "up" || res.state === "started";
|
||||
}
|
||||
|
||||
@@ -1,93 +0,0 @@
|
||||
/**
|
||||
* Bridge protocol — wire format between the MCP push-pipe (server) and
|
||||
* CLI invocations (client) over a per-mesh Unix domain socket.
|
||||
*
|
||||
* Why: every CLI op should reuse the warm WS the push-pipe already holds
|
||||
* (~5ms) instead of opening its own (~300-700ms cold start). The bridge is
|
||||
* the load-bearing piece of the CLI-first architecture — see
|
||||
* .artifacts/specs/2026-05-02-architecture-north-star.md commitment #3.
|
||||
*
|
||||
* Wire format: line-delimited JSON. One JSON object per "\n"-terminated line.
|
||||
* Each request carries an `id` string; the response echoes it.
|
||||
*
|
||||
* Socket path: ~/.claudemesh/sockets/<mesh-slug>.sock (mode 0600).
|
||||
*
|
||||
* Connection model: persistent. A CLI invocation opens, sends one or more
|
||||
* requests, reads matching responses, then closes. Multiplexing via `id`
|
||||
* means concurrent CLI calls don't have to serialize on the same socket
|
||||
* (though current callers all do one round-trip and exit).
|
||||
*/
|
||||
|
||||
import { homedir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
export const PROTOCOL_VERSION = 1;
|
||||
|
||||
/** Socket path for a given mesh. Caller is responsible for ensuring the
|
||||
* parent directory exists (`~/.claudemesh/sockets/`). */
|
||||
export function socketPath(meshSlug: string): string {
|
||||
return join(homedir(), ".claudemesh", "sockets", `${meshSlug}.sock`);
|
||||
}
|
||||
|
||||
/** Directory holding all per-mesh sockets. Created with mode 0700 on push-pipe boot. */
|
||||
export function socketDir(): string {
|
||||
return join(homedir(), ".claudemesh", "sockets");
|
||||
}
|
||||
|
||||
/**
|
||||
* Verbs the bridge accepts. Keep this list narrow in 1.2.0 — three writes
|
||||
* (send, summary, status), the read-shaped peers, plus ping for health.
|
||||
* Expand in 1.3.0 once the bridge is proven.
|
||||
*/
|
||||
export type BridgeVerb =
|
||||
| "ping"
|
||||
| "peers"
|
||||
| "send"
|
||||
| "summary"
|
||||
| "status_set"
|
||||
| "visible";
|
||||
|
||||
export interface BridgeRequest {
|
||||
id: string;
|
||||
verb: BridgeVerb;
|
||||
args?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface BridgeResponseOk {
|
||||
id: string;
|
||||
ok: true;
|
||||
result: unknown;
|
||||
}
|
||||
|
||||
export interface BridgeResponseErr {
|
||||
id: string;
|
||||
ok: false;
|
||||
error: string;
|
||||
}
|
||||
|
||||
export type BridgeResponse = BridgeResponseOk | BridgeResponseErr;
|
||||
|
||||
/** Serialise a request/response to a single line ("\n"-terminated). */
|
||||
export function frame(obj: BridgeRequest | BridgeResponse): string {
|
||||
return JSON.stringify(obj) + "\n";
|
||||
}
|
||||
|
||||
/**
|
||||
* Stateful line-buffered parser. Pass each chunk from the socket via
|
||||
* `feed`; collect completed lines from the returned array.
|
||||
*/
|
||||
export class LineParser {
|
||||
private buf = "";
|
||||
|
||||
feed(chunk: Buffer | string): string[] {
|
||||
this.buf += typeof chunk === "string" ? chunk : chunk.toString("utf-8");
|
||||
const lines: string[] = [];
|
||||
let nl = this.buf.indexOf("\n");
|
||||
while (nl !== -1) {
|
||||
lines.push(this.buf.slice(0, nl));
|
||||
this.buf = this.buf.slice(nl + 1);
|
||||
nl = this.buf.indexOf("\n");
|
||||
}
|
||||
return lines;
|
||||
}
|
||||
}
|
||||
@@ -1,229 +0,0 @@
|
||||
/**
|
||||
* Bridge server — the MCP push-pipe runs one of these per connected mesh.
|
||||
*
|
||||
* Listens on a Unix domain socket at `~/.claudemesh/sockets/<mesh-slug>.sock`,
|
||||
* accepts line-delimited JSON requests from CLI invocations, dispatches each
|
||||
* request to the corresponding `BrokerClient` method, and writes the response
|
||||
* back on the same line.
|
||||
*
|
||||
* Lifecycle:
|
||||
* - `startBridgeServer(client)` is called from the MCP push-pipe boot path
|
||||
* once the WS is connected (or even before — verbs that need an open WS
|
||||
* will return an error).
|
||||
* - On startup it `unlinks` any stale socket file (left by a crashed
|
||||
* prior process), then `listen`s.
|
||||
* - On shutdown (`stop()`) it closes the listener and unlinks the socket.
|
||||
*
|
||||
* Concurrency: each accepted connection gets its own line-buffered parser.
|
||||
* Multiple in-flight requests are correlated by `id`; the server doesn't
|
||||
* need to serialize because the underlying `BrokerClient` calls are
|
||||
* `async` and non-blocking.
|
||||
*
|
||||
* Error model: malformed lines are dropped silently (don't tear down the
|
||||
* socket). Unknown verbs return `{ok: false, error: "unknown verb"}`.
|
||||
* Broker errors are wrapped into the `error` string.
|
||||
*/
|
||||
|
||||
import { createServer, type Server, type Socket } from "node:net";
|
||||
import { mkdirSync, unlinkSync, existsSync, chmodSync } from "node:fs";
|
||||
import { dirname } from "node:path";
|
||||
import type { BrokerClient } from "~/services/broker/facade.js";
|
||||
import {
|
||||
socketPath,
|
||||
socketDir,
|
||||
frame,
|
||||
LineParser,
|
||||
type BridgeRequest,
|
||||
type BridgeResponse,
|
||||
type BridgeVerb,
|
||||
} from "./protocol.js";
|
||||
|
||||
export interface BridgeServer {
|
||||
stop(): void;
|
||||
path: string;
|
||||
}
|
||||
|
||||
type PeerStatus = "idle" | "working" | "dnd";
|
||||
|
||||
/**
|
||||
* Resolve a `to` string to a broker-friendly target spec. Mirrors what
|
||||
* `commands/send.ts` does today — display name → pubkey, hex stays hex,
|
||||
* `@group` and `*` pass through.
|
||||
*/
|
||||
async function resolveTarget(
|
||||
client: BrokerClient,
|
||||
to: string,
|
||||
): Promise<{ ok: true; spec: string } | { ok: false; error: string }> {
|
||||
if (to.startsWith("@") || to === "*" || /^[0-9a-f]{64}$/i.test(to)) {
|
||||
return { ok: true, spec: to };
|
||||
}
|
||||
const peers = await client.listPeers();
|
||||
const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase());
|
||||
if (!match) {
|
||||
return {
|
||||
ok: false,
|
||||
error: `peer "${to}" not found. online: ${peers.map((p) => p.displayName).join(", ") || "(none)"}`,
|
||||
};
|
||||
}
|
||||
return { ok: true, spec: match.pubkey };
|
||||
}
|
||||
|
||||
async function dispatch(
|
||||
client: BrokerClient,
|
||||
req: BridgeRequest,
|
||||
): Promise<BridgeResponse> {
|
||||
const args = req.args ?? {};
|
||||
try {
|
||||
switch (req.verb as BridgeVerb) {
|
||||
case "ping": {
|
||||
const peers = await client.listPeers();
|
||||
return {
|
||||
id: req.id,
|
||||
ok: true,
|
||||
result: {
|
||||
mesh: client.meshSlug,
|
||||
ws_status: client.status,
|
||||
peers_online: peers.length,
|
||||
push_buffer: client.pushHistory.length,
|
||||
},
|
||||
};
|
||||
}
|
||||
case "peers": {
|
||||
const peers = await client.listPeers();
|
||||
return { id: req.id, ok: true, result: peers };
|
||||
}
|
||||
case "send": {
|
||||
const to = String(args.to ?? "");
|
||||
const message = String(args.message ?? "");
|
||||
const priority = (args.priority as "now" | "next" | "low" | undefined) ?? "next";
|
||||
if (!to || !message) {
|
||||
return { id: req.id, ok: false, error: "send: `to` and `message` required" };
|
||||
}
|
||||
const resolved = await resolveTarget(client, to);
|
||||
if (!resolved.ok) return { id: req.id, ok: false, error: resolved.error };
|
||||
const result = await client.send(resolved.spec, message, priority);
|
||||
if (!result.ok) {
|
||||
return { id: req.id, ok: false, error: result.error ?? "send failed" };
|
||||
}
|
||||
return {
|
||||
id: req.id,
|
||||
ok: true,
|
||||
result: { messageId: result.messageId, target: resolved.spec },
|
||||
};
|
||||
}
|
||||
case "summary": {
|
||||
const text = String(args.summary ?? "");
|
||||
if (!text) return { id: req.id, ok: false, error: "summary: `summary` required" };
|
||||
await client.setSummary(text);
|
||||
return { id: req.id, ok: true, result: { summary: text } };
|
||||
}
|
||||
case "status_set": {
|
||||
const state = String(args.status ?? "") as PeerStatus;
|
||||
if (!["idle", "working", "dnd"].includes(state)) {
|
||||
return { id: req.id, ok: false, error: "status_set: must be idle | working | dnd" };
|
||||
}
|
||||
await client.setStatus(state);
|
||||
return { id: req.id, ok: true, result: { status: state } };
|
||||
}
|
||||
case "visible": {
|
||||
const visible = Boolean(args.visible);
|
||||
await client.setVisible(visible);
|
||||
return { id: req.id, ok: true, result: { visible } };
|
||||
}
|
||||
default:
|
||||
return { id: req.id, ok: false, error: `unknown verb: ${req.verb}` };
|
||||
}
|
||||
} catch (err) {
|
||||
return {
|
||||
id: req.id,
|
||||
ok: false,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
function handleConnection(socket: Socket, client: BrokerClient): void {
|
||||
const parser = new LineParser();
|
||||
|
||||
socket.on("data", (chunk) => {
|
||||
const lines = parser.feed(chunk);
|
||||
for (const line of lines) {
|
||||
if (!line.trim()) continue;
|
||||
let req: BridgeRequest;
|
||||
try {
|
||||
req = JSON.parse(line) as BridgeRequest;
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!req || typeof req !== "object" || !req.id || !req.verb) continue;
|
||||
|
||||
// Fire-and-await without blocking the read loop.
|
||||
void dispatch(client, req).then((res) => {
|
||||
try {
|
||||
socket.write(frame(res));
|
||||
} catch {
|
||||
/* socket might have closed mid-flight; ignore */
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
socket.on("error", () => {
|
||||
// Don't crash the push-pipe on per-connection errors.
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the per-mesh bridge server. Returns a handle the caller stores so
|
||||
* it can `stop()` on shutdown.
|
||||
*
|
||||
* Idempotent: if a socket file already exists, attempts to connect to it.
|
||||
* If that connection succeeds, another live process owns it — return null.
|
||||
* If it fails (ECONNREFUSED), the file is stale; unlink it and proceed.
|
||||
*/
|
||||
export function startBridgeServer(client: BrokerClient): BridgeServer | null {
|
||||
const path = socketPath(client.meshSlug);
|
||||
const dir = socketDir();
|
||||
|
||||
if (!existsSync(dir)) {
|
||||
mkdirSync(dir, { recursive: true, mode: 0o700 });
|
||||
}
|
||||
|
||||
// Last-writer-wins: unconditionally remove any existing socket file and
|
||||
// bind fresh. A live process previously holding it keeps its already-
|
||||
// accepted connections (sockets aren't path-based after connect), but
|
||||
// new CLI dials hit the new server. In practice this only matters when
|
||||
// two `claudemesh launch` invocations target the same mesh — rare, and
|
||||
// either instance serving CLI requests is fine because both speak to
|
||||
// the same broker.
|
||||
if (existsSync(path)) {
|
||||
try { unlinkSync(path); } catch {}
|
||||
}
|
||||
|
||||
const server: Server = createServer((socket) => handleConnection(socket, client));
|
||||
|
||||
try {
|
||||
server.listen(path);
|
||||
} catch (err) {
|
||||
process.stderr.write(`[claudemesh] bridge: failed to bind ${path}: ${String(err)}\n`);
|
||||
return null;
|
||||
}
|
||||
|
||||
server.on("error", (err) => {
|
||||
process.stderr.write(`[claudemesh] bridge: ${String(err)}\n`);
|
||||
});
|
||||
|
||||
// Tighten permissions so other users on the host can't dial in.
|
||||
try { chmodSync(path, 0o600); } catch {}
|
||||
|
||||
let stopped = false;
|
||||
return {
|
||||
path,
|
||||
stop(): void {
|
||||
if (stopped) return;
|
||||
stopped = true;
|
||||
try { server.close(); } catch {}
|
||||
try { unlinkSync(path); } catch {}
|
||||
},
|
||||
};
|
||||
}
|
||||
42
apps/cli/src/services/daemon/policy.ts
Normal file
42
apps/cli/src/services/daemon/policy.ts
Normal file
@@ -0,0 +1,42 @@
|
||||
/**
|
||||
* Per-process daemon policy — set once at CLI entry from --no-daemon /
|
||||
* --strict / env var, then read by daemon-routing helpers.
|
||||
*
|
||||
* Modes:
|
||||
* "auto" (default) probe → auto-spawn → retry → cold fallback
|
||||
* "strict" probe → auto-spawn → retry → ERROR (no cold fallback)
|
||||
* "no-daemon" skip daemon entirely → straight to cold path
|
||||
*
|
||||
* Env equivalents (for headless/CI use):
|
||||
* CLAUDEMESH_STRICT_DAEMON=1 → strict
|
||||
* CLAUDEMESH_NO_DAEMON=1 → no-daemon
|
||||
*
|
||||
* Flag wins over env when both are set.
|
||||
*/
|
||||
|
||||
export type DaemonMode = "auto" | "strict" | "no-daemon";
|
||||
|
||||
export interface DaemonPolicy { mode: DaemonMode; }
|
||||
|
||||
let policy: DaemonPolicy = readEnvDefault();
|
||||
|
||||
function readEnvDefault(): DaemonPolicy {
|
||||
if (process.env.CLAUDEMESH_NO_DAEMON === "1") return { mode: "no-daemon" };
|
||||
if (process.env.CLAUDEMESH_STRICT_DAEMON === "1") return { mode: "strict" };
|
||||
return { mode: "auto" };
|
||||
}
|
||||
|
||||
export function setDaemonPolicy(mode: DaemonMode): void {
|
||||
policy = { mode };
|
||||
}
|
||||
|
||||
export function getDaemonPolicy(): DaemonPolicy {
|
||||
return policy;
|
||||
}
|
||||
|
||||
/** Pick a mode from parsed flags. CLI flags win over env. */
|
||||
export function policyFromFlags(flags: Record<string, unknown>): DaemonMode {
|
||||
if (flags["no-daemon"]) return "no-daemon";
|
||||
if (flags.strict) return "strict";
|
||||
return readEnvDefault().mode;
|
||||
}
|
||||
@@ -7,6 +7,7 @@
|
||||
*/
|
||||
|
||||
import type { EnsureDaemonResult } from "~/services/daemon/lifecycle.js";
|
||||
import { getDaemonPolicy } from "~/services/daemon/policy.js";
|
||||
import { dim } from "./styles.js";
|
||||
|
||||
let alreadyWarned = false;
|
||||
@@ -27,6 +28,11 @@ export function warnDaemonState(
|
||||
if (opts.quiet || opts.json) return false;
|
||||
if (res.state === "up") return false;
|
||||
|
||||
// Under --strict, the cold-path gate at `withMesh` will print its own
|
||||
// refusal message — suppress the misleading "using cold path" hint
|
||||
// here so the user sees a single, accurate error.
|
||||
if (getDaemonPolicy().mode === "strict" && res.state !== "started") return false;
|
||||
|
||||
alreadyWarned = true;
|
||||
const tag = (label: string) => `[claudemesh] ${label}`;
|
||||
const hint = (s: string) => dim(s);
|
||||
|
||||
Reference in New Issue
Block a user