From 9dd1e401b0f064436a7e6e495712b7f5c44c1e2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 2 May 2026 13:41:50 +0100 Subject: [PATCH] =?UTF-8?q?feat(sdk+cli):=20bridge=20peer=20=E2=80=94=20fo?= =?UTF-8?q?rward=20a=20topic=20between=20two=20meshes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A bridge holds memberships in two meshes and relays messages on a single topic between them. Federation-lite without a broker-to-broker protocol. SDK additions: - Bridge class (start, stop, EventEmitter for forwarded/dropped/error) - MeshClient.joinTopic / leaveTopic / createTopic methods - Loop prevention: plaintext hop counter prefix __cmh: with maxHops default 2; echo guard via senderPubkey == own session pubkey CLI additions: - claudemesh bridge run long-lived process - claudemesh bridge init prints config template - Zero-dep YAML parser for the flat bridge config shape The hop prefix is visible in message bodies — minor wart, fixed in v0.3.0 by moving loop tracking into broker primitives. SDK kept as devDependency since Bun bundles it into dist; no impact on npm publish or runtime resolution. Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/cli/package.json | 1 + apps/cli/src/cli/policy-classify.ts | 7 + apps/cli/src/commands/bridge.ts | 213 ++++++++++++++++++++++++++++ apps/cli/src/entrypoints/cli.ts | 21 +++ packages/sdk/src/bridge.ts | 162 +++++++++++++++++++++ packages/sdk/src/client.ts | 26 ++++ packages/sdk/src/index.ts | 2 + 7 files changed, 432 insertions(+) create mode 100644 apps/cli/src/commands/bridge.ts create mode 100644 packages/sdk/src/bridge.ts diff --git a/apps/cli/package.json b/apps/cli/package.json index 91e0bbd..0d874d0 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -55,6 +55,7 @@ "zod": "4.1.13" }, "devDependencies": { + "@claudemesh/sdk": "workspace:*", "@turbostarter/eslint-config": "workspace:*", "@turbostarter/prettier-config": "workspace:*", "@turbostarter/tsconfig": "workspace:*", diff --git a/apps/cli/src/cli/policy-classify.ts b/apps/cli/src/cli/policy-classify.ts index 90e6586..0d2da0d 100644 --- a/apps/cli/src/cli/policy-classify.ts +++ b/apps/cli/src/cli/policy-classify.ts @@ -99,6 +99,13 @@ export function classifyInvocation(command: string, positionals: string[]): Invo const writeVerbs = new Set(["create", "revoke"]); return { resource: "apikey", verb, isWrite: writeVerbs.has(verb) }; } + case "bridge": { + // bridge verbs: run (long-lived forwarder) | init (print template). + // `run` is a write at the mesh level since it joins both meshes + // and posts messages on their topics. + const verb = sub || "init"; + return { resource: "bridge", verb, isWrite: verb === "run" }; + } // Platform — sub is the verb. case "vector": case "graph": case "context": case "stream": diff --git a/apps/cli/src/commands/bridge.ts b/apps/cli/src/commands/bridge.ts new file mode 100644 index 0000000..09fa31f --- /dev/null +++ b/apps/cli/src/commands/bridge.ts @@ -0,0 +1,213 @@ +/** + * `claudemesh bridge run ` — long-lived process that joins + * two meshes and forwards a single topic between them. + * + * The CLI doesn't link against @claudemesh/sdk to avoid a workspace + * coupling at publish time — instead it constructs the SDK Bridge + * inline using the same MeshClient that the rest of the CLI already + * relies on. The bridge config file specifies broker URLs, mesh ids, + * memberships (private keys), and the topic name on each side. + * + * Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md + */ + +import { readFileSync, existsSync } from "node:fs"; +import { render } from "~/ui/render.js"; +import { bold, clay, dim, green, red, yellow } from "~/ui/styles.js"; +import { EXIT } from "~/constants/exit-codes.js"; + +interface BridgeConfigSide { + broker_url: string; + mesh_id: string; + member_id: string; + /** Hex-encoded ed25519 public key. */ + pubkey: string; + /** Hex-encoded ed25519 secret key (64 bytes). */ + secret_key: string; + topic: string; + display_name?: string; + role?: "lead" | "member" | "observer"; +} + +interface BridgeConfig { + a: BridgeConfigSide; + b: BridgeConfigSide; + max_hops?: number; +} + +/** Tiny YAML parser — handles the flat shape `bridge run` accepts. For + * complex configs, callers can pass JSON (.json extension). */ +function parseConfig(text: string): BridgeConfig { + const trimmed = text.trim(); + if (trimmed.startsWith("{")) return JSON.parse(trimmed) as BridgeConfig; + + const root: Record | number> = {}; + let cursor: Record | null = null; + for (const raw of text.split("\n")) { + const line = raw.replace(/#.*$/, "").trimEnd(); + if (!line.trim()) continue; + + const top = line.match(/^(a|b)\s*:\s*$/); + if (top) { + cursor = {}; + root[top[1]!] = cursor; + continue; + } + const flat = line.match(/^(\w+)\s*:\s*(.+)$/); + if (flat && /^\s/.test(line) && cursor) { + cursor[flat[1]!] = parseScalar(flat[2]!); + } else if (flat) { + const v = parseScalar(flat[2]!); + // top-level scalars (e.g. max_hops) — only number/string supported + if (typeof v === "number") root[flat[1]!] = v; + } + } + return root as unknown as BridgeConfig; +} + +function parseScalar(raw: string): string | number | boolean { + const v = raw.trim().replace(/^["'](.*)["']$/, "$1"); + if (v === "true") return true; + if (v === "false") return false; + if (/^-?\d+(\.\d+)?$/.test(v)) return Number(v); + return v; +} + +export async function runBridge(configPath: string): Promise { + if (!configPath) { + render.err("Usage: claudemesh bridge run "); + return EXIT.INVALID_ARGS; + } + if (!existsSync(configPath)) { + render.err(`config file not found: ${configPath}`); + return EXIT.NOT_FOUND; + } + + let cfg: BridgeConfig; + try { + cfg = parseConfig(readFileSync(configPath, "utf-8")); + } catch (e) { + render.err(`failed to parse ${configPath}: ${e instanceof Error ? e.message : String(e)}`); + return EXIT.INVALID_ARGS; + } + if (!cfg.a || !cfg.b) { + render.err("config must define 'a:' and 'b:' sections"); + return EXIT.INVALID_ARGS; + } + for (const [name, side] of [["a", cfg.a], ["b", cfg.b]] as const) { + if (!side.broker_url || !side.mesh_id || !side.member_id || !side.pubkey || !side.secret_key || !side.topic) { + render.err(`config side '${name}' missing required fields: broker_url, mesh_id, member_id, pubkey, secret_key, topic`); + return EXIT.INVALID_ARGS; + } + } + + // Lazy-load SDK so the CLI bundle stays trim for users who never + // bridge. + const { Bridge } = await import("@claudemesh/sdk"); + + const bridge = new Bridge({ + a: { + client: { + brokerUrl: cfg.a.broker_url, + meshId: cfg.a.mesh_id, + memberId: cfg.a.member_id, + pubkey: cfg.a.pubkey, + secretKey: cfg.a.secret_key, + displayName: cfg.a.display_name ?? "bridge", + peerType: "connector", + channel: "bridge", + }, + topic: cfg.a.topic, + role: cfg.a.role, + }, + b: { + client: { + brokerUrl: cfg.b.broker_url, + meshId: cfg.b.mesh_id, + memberId: cfg.b.member_id, + pubkey: cfg.b.pubkey, + secretKey: cfg.b.secret_key, + displayName: cfg.b.display_name ?? "bridge", + peerType: "connector", + channel: "bridge", + }, + topic: cfg.b.topic, + role: cfg.b.role, + }, + maxHops: cfg.max_hops, + }); + + bridge.on("forwarded", (e) => { + process.stdout.write( + `${dim(new Date().toISOString())} ${green("→")} ${e.from}→${e.to} hop=${e.hop} ${dim(`${e.bytes}b`)}\n`, + ); + }); + bridge.on("dropped", (e) => { + process.stdout.write( + `${dim(new Date().toISOString())} ${yellow("·")} drop from=${e.from} reason=${e.reason}${e.hop >= 0 ? ` hop=${e.hop}` : ""}\n`, + ); + }); + bridge.on("error", (e) => { + process.stderr.write(`${red("✘")} ${e.message}\n`); + }); + + try { + await bridge.start(); + } catch (e) { + render.err(`bridge failed to start: ${e instanceof Error ? e.message : String(e)}`); + return EXIT.NETWORK_ERROR; + } + + render.ok( + "bridge running", + `${clay("#" + cfg.a.topic)} ${dim("⟷")} ${clay("#" + cfg.b.topic)}`, + ); + process.stderr.write(`${dim(` meshes: ${cfg.a.mesh_id.slice(0, 8)} ⟷ ${cfg.b.mesh_id.slice(0, 8)} max_hops: ${cfg.max_hops ?? 2}`)}\n`); + process.stderr.write(`${dim(" Ctrl-C to stop.")}\n\n`); + + // Keep the process alive; bridge runs forever. + await new Promise((resolve) => { + const stop = async (): Promise => { + process.stderr.write(`\n${dim("stopping bridge...")}\n`); + await bridge.stop(); + resolve(); + }; + process.on("SIGINT", stop); + process.on("SIGTERM", stop); + }); + + return EXIT.SUCCESS; +} + +/** Generate a config skeleton for the user to fill in. */ +export function bridgeConfigTemplate(): string { + return `# claudemesh bridge config +# Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md +# +# A bridge holds memberships in two meshes and forwards messages on a +# single topic between them. Loop prevention via plaintext hop counter +# (visible in message body — minor wart, fixed in v0.3.0). +# +# Tip: \`claudemesh peer verify\` shows the keys/ids you need below. + +max_hops: 2 + +a: + broker_url: wss://ic.claudemesh.com/ws + mesh_id: + member_id: + pubkey: + secret_key: + topic: incidents + display_name: bridge + role: member + +b: + broker_url: wss://ic.claudemesh.com/ws + mesh_id: + member_id: + pubkey: + secret_key: + topic: incidents +`; +} diff --git a/apps/cli/src/entrypoints/cli.ts b/apps/cli/src/entrypoints/cli.ts index 881dc37..baae457 100644 --- a/apps/cli/src/entrypoints/cli.ts +++ b/apps/cli/src/entrypoints/cli.ts @@ -107,6 +107,10 @@ API keys (REST + external WS auth, v0.2.0) claudemesh apikey list show keys (status, last-used, scope) claudemesh apikey revoke revoke a key +Bridge (forward a topic between two meshes, v0.2.0) + claudemesh bridge init print config template + claudemesh bridge run run bridge as a long-lived process + Topic (conversation scope, v0.2.0) claudemesh topic create create a topic [--description --visibility] claudemesh topic list list topics in the mesh @@ -514,6 +518,23 @@ async function main(): Promise { break; } + // bridge — forward a topic between two meshes (v0.2.0) + case "bridge": { + const sub = positionals[0]; + if (sub === "run") { + const { runBridge } = await import("~/commands/bridge.js"); + process.exit(await runBridge(positionals[1] ?? "")); + } else if (sub === "init" || sub === "config") { + const { bridgeConfigTemplate } = await import("~/commands/bridge.js"); + console.log(bridgeConfigTemplate()); + process.exit(EXIT.SUCCESS); + } else { + console.error("Usage: claudemesh bridge | init>"); + process.exit(EXIT.INVALID_ARGS); + } + break; + } + // apikey — REST + external WS bearer tokens (v0.2.0) case "apikey": case "api-key": { const sub = positionals[0]; diff --git a/packages/sdk/src/bridge.ts b/packages/sdk/src/bridge.ts new file mode 100644 index 0000000..411891c --- /dev/null +++ b/packages/sdk/src/bridge.ts @@ -0,0 +1,162 @@ +/** + * Bridge — forward a single topic between two meshes. + * + * A bridge is a peer that holds memberships in two meshes simultaneously + * and relays messages on a single topic from each side to the other. + * Federation-lite: get the value of cross-mesh communication without + * designing a broker-to-broker protocol. + * + * Loop prevention via plaintext hop counter: every forwarded message is + * prefixed with `__cmh:` where is the hop count. The bridge + * increments on forward; if it sees a message at or beyond `maxHops`, it + * drops. The bridge also drops messages whose sender pubkey matches its + * own membership on either side (echo protection). + * + * The hop prefix is visible to readers — a wart, but acceptable for + * v0.2.0. A v0.3.0 follow-up will move loop tracking into broker + * primitives (message tags / metadata fields). + * + * Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md + */ + +import { EventEmitter } from "node:events"; +import { MeshClient } from "./client.js"; +import type { MeshClientOptions, InboundMessage } from "./types.js"; + +export interface BridgeSide { + /** MeshClient options for this side (broker URL, mesh keys, identity). */ + client: MeshClientOptions; + /** Topic name to forward (without `#` prefix). */ + topic: string; + /** Optional role applied when joining the topic. */ + role?: "lead" | "member" | "observer"; +} + +export interface BridgeOptions { + a: BridgeSide; + b: BridgeSide; + /** Maximum total hops a message can take. Default 2 (one forward each way). */ + maxHops?: number; + /** Optional filter — return false to skip forwarding a specific message. */ + filter?: ( + msg: InboundMessage, + fromSide: "a" | "b", + ) => boolean | Promise; +} + +export interface BridgeEvents { + forwarded: [{ from: "a" | "b"; to: "a" | "b"; hop: number; bytes: number }]; + dropped: [{ from: "a" | "b"; reason: string; hop: number }]; + error: [Error]; +} + +const HOP_PREFIX_RE = /^__cmh(\d+):/; +const MAX_HOPS_DEFAULT = 2; + +export class Bridge extends EventEmitter { + private clientA: MeshClient; + private clientB: MeshClient; + private maxHops: number; + private opts: BridgeOptions; + private started = false; + + constructor(opts: BridgeOptions) { + super(); + this.opts = opts; + this.maxHops = opts.maxHops ?? MAX_HOPS_DEFAULT; + this.clientA = new MeshClient(opts.a.client); + this.clientB = new MeshClient(opts.b.client); + } + + /** + * Connect both clients, subscribe to topics on both sides, wire the + * forwarding handlers. Resolves once both meshes are open and joined. + * Throws if either side fails to connect. + */ + async start(): Promise { + if (this.started) return; + this.started = true; + + await Promise.all([this.clientA.connect(), this.clientB.connect()]); + await Promise.all([ + this.clientA.joinTopic(this.opts.a.topic, this.opts.a.role), + this.clientB.joinTopic(this.opts.b.topic, this.opts.b.role), + ]); + + this.clientA.on("message", (m: InboundMessage) => + this.handleIncoming("a", m).catch((e: unknown) => + this.emit("error", e instanceof Error ? e : new Error(String(e))), + ), + ); + this.clientB.on("message", (m: InboundMessage) => + this.handleIncoming("b", m).catch((e: unknown) => + this.emit("error", e instanceof Error ? e : new Error(String(e))), + ), + ); + } + + /** Disconnect both clients. */ + async stop(): Promise { + if (!this.started) return; + this.started = false; + this.clientA.disconnect(); + this.clientB.disconnect(); + } + + private async handleIncoming( + fromSide: "a" | "b", + msg: InboundMessage, + ): Promise { + // Only forward messages we can read plaintext for. System events, + // DMs targeted to other peers, and crypto_box-encrypted messages we + // can't decrypt are skipped — every bridged message has to round-trip + // through `send(plaintext)` on the other side, so we need text. + if (msg.subtype === "system") return; + const text = msg.plaintext; + if (!text) return; + + // Echo guard — if the sender pubkey matches either of our own + // memberships, this message was just forwarded by us. Drop before + // it bounces. + const ownA = this.clientA.pubkey; + const ownB = this.clientB.pubkey; + if (msg.senderPubkey === ownA || msg.senderPubkey === ownB) { + this.emit("dropped", { from: fromSide, reason: "echo", hop: -1 }); + return; + } + + // User filter + if (this.opts.filter) { + const ok = await this.opts.filter(msg, fromSide); + if (!ok) { + this.emit("dropped", { from: fromSide, reason: "filter", hop: -1 }); + return; + } + } + + // Parse hop counter from plaintext prefix. + const m = text.match(HOP_PREFIX_RE); + const currentHop = m ? Number(m[1]) : 0; + const nextHop = currentHop + 1; + + if (nextHop > this.maxHops) { + this.emit("dropped", { from: fromSide, reason: "max_hops", hop: currentHop }); + return; + } + + // Strip existing prefix, prepend new one. + const stripped = m ? text.slice(m[0].length) : text; + const forwarded = `__cmh${nextHop}:${stripped}`; + + const targetClient = fromSide === "a" ? this.clientB : this.clientA; + const targetTopic = fromSide === "a" ? this.opts.b.topic : this.opts.a.topic; + await targetClient.send(`#${targetTopic}`, forwarded, "next"); + + this.emit("forwarded", { + from: fromSide, + to: fromSide === "a" ? "b" : "a", + hop: nextHop, + bytes: forwarded.length, + }); + } +} diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index dc978dc..861b873 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -365,6 +365,32 @@ export class MeshClient extends EventEmitter { this.ws.send(JSON.stringify({ type: "set_status", status })); } + // --- Topics (v0.2.0) --- + // Conversational primitive within a mesh. To receive topic-tagged + // messages, you must subscribe via `joinTopic`. + + async createTopic(args: { + name: string; + description?: string; + visibility?: "public" | "private" | "dm"; + }): Promise { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + this.ws.send(JSON.stringify({ type: "topic_create", ...args })); + } + + async joinTopic( + topic: string, + role?: "lead" | "member" | "observer", + ): Promise { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + this.ws.send(JSON.stringify({ type: "topic_join", topic, role })); + } + + async leaveTopic(topic: string): Promise { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + this.ws.send(JSON.stringify({ type: "topic_leave", topic })); + } + // --- Internals --- private makeReqId(): string { diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 8e08c7b..2b8af78 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -1,5 +1,6 @@ export { MeshClient } from "./client.js"; export { generateKeyPair } from "./crypto.js"; +export { Bridge } from "./bridge.js"; export type { PeerInfo, InboundMessage, @@ -7,3 +8,4 @@ export type { ConnStatus, MeshClientOptions, } from "./types.js"; +export type { BridgeOptions, BridgeSide, BridgeEvents } from "./bridge.js";