feat(daemon): multi-mesh — attach to all joined meshes simultaneously
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

The 1.26.0 step that finally delivers ambient mode for multi-mesh
users. Daemon holds Map<slug, DaemonBrokerClient>; one process, one
PID per user, all your meshes online concurrently.

run.ts: claudemesh daemon up with no --mesh attaches to every joined
mesh from config. --mesh <slug> still scopes to one (legacy mode).
The daemon_started log line reports meshes: [...] instead of mesh.

drain.ts: dispatches each outbox row to the broker keyed by row.mesh
(column added in 1.25.0). Legacy rows with mesh=NULL fall back to the
only broker if there's exactly one, otherwise mark dead with a clear
error.

ipc/server.ts:
- GET /v1/peers aggregates across all attached meshes; each peer
  record gains a mesh field. ?mesh=<slug> narrows server-side.
- GET /v1/skills aggregates similarly; /v1/skills/:name walks meshes
  and returns first match.
- POST /v1/send requires mesh field on multi-mesh daemons; auto-picks
  on single-mesh; returns 400 with attached list if ambiguous.
- POST /v1/profile accepts optional mesh; without it, fans out to all
  attached meshes (consistent presence).

CLI: trySendViaDaemon now forwards expectedMesh as the body's mesh
field (was informational, now authoritative). claudemesh send
--mesh A and --mesh B from the same shell both route to the right
broker via the same daemon process.

Verified: aggregated peer list across 3 attached meshes; cross-mesh
sends from CLI reach status=done with correct broker_message_ids.

Released as 1.26.0 on npm.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-04 02:14:43 +01:00
parent 0e3a5babd9
commit cb90f1ca60
6 changed files with 233 additions and 88 deletions

View File

@@ -93,62 +93,65 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
const bus = new EventBus();
// Pick the mesh. If the user joined exactly one, use it; otherwise
// require --mesh. Daemon CAN start with no mesh — the outbox will
// accept rows but `dead` them after retries because the broker is
// never reachable. Better to fail fast.
// 1.26.0 — multi-mesh by default. With --mesh <slug>, the daemon
// scopes to one mesh (legacy mode). Without it, attaches to every
// joined mesh simultaneously so ambient mode (raw `claude`) works
// for all meshes with one daemon process.
const cfg = readConfig();
let mesh = null as null | typeof cfg.meshes[number];
let meshes: Array<typeof cfg.meshes[number]>;
if (opts.mesh) {
mesh = cfg.meshes.find((m) => m.slug === opts.mesh) ?? null;
if (!mesh) {
const found = cfg.meshes.find((m) => m.slug === opts.mesh);
if (!found) {
process.stderr.write(`mesh not found: ${opts.mesh}\n`);
process.stderr.write(`joined meshes: ${cfg.meshes.map((m) => m.slug).join(", ") || "(none)"}\n`);
releaseSingletonLock();
try { outboxDb.close(); } catch { /* ignore */ }
return 2;
}
} else if (cfg.meshes.length === 1) {
mesh = cfg.meshes[0]!;
meshes = [found];
} else if (cfg.meshes.length === 0) {
process.stderr.write(`no mesh joined; run \`claudemesh join <invite-url>\` first\n`);
releaseSingletonLock();
try { outboxDb.close(); } catch { /* ignore */ }
return 2;
} else {
process.stderr.write(`multiple meshes joined; pass --mesh <slug>\n`);
process.stderr.write(`available: ${cfg.meshes.map((m) => m.slug).join(", ")}\n`);
releaseSingletonLock();
try { outboxDb.close(); } catch { /* ignore */ }
return 2;
meshes = cfg.meshes;
}
// Connect to broker (non-fatal: connection failures get retried;
// outbox keeps queuing during outages).
const broker = new DaemonBrokerClient(mesh, {
displayName: opts.displayName,
onStatusChange: (s) => {
process.stdout.write(JSON.stringify({
msg: "broker_status", status: s, mesh: mesh!.slug, ts: new Date().toISOString(),
}) + "\n");
bus.publish("broker_status", { mesh: mesh!.slug, status: s });
},
onPush: (m) => {
const sessionKeys = broker.getSessionKeys();
void handleBrokerPush(m, {
db: inboxDb,
bus,
meshSlug: mesh!.slug,
recipientSecretKeyHex: mesh!.secretKey,
sessionSecretKeyHex: sessionKeys?.sessionSecretKey,
});
},
});
broker.connect().catch((err) => process.stderr.write(`broker connect failed: ${String(err)}\n`));
// Spin up one broker per mesh. Connection failures are non-fatal:
// the outbox keeps queuing per-mesh and reconnect logic in
// DaemonBrokerClient handles reattach.
const brokers = new Map<string, DaemonBrokerClient>();
const meshConfigs = new Map<string, typeof cfg.meshes[number]>();
for (const mesh of meshes) {
meshConfigs.set(mesh.slug, mesh);
const broker = new DaemonBrokerClient(mesh, {
displayName: opts.displayName,
onStatusChange: (s) => {
process.stdout.write(JSON.stringify({
msg: "broker_status", status: s, mesh: mesh.slug, ts: new Date().toISOString(),
}) + "\n");
bus.publish("broker_status", { mesh: mesh.slug, status: s });
},
onPush: (m) => {
const sessionKeys = broker.getSessionKeys();
void handleBrokerPush(m, {
db: inboxDb,
bus,
meshSlug: mesh.slug,
recipientSecretKeyHex: mesh.secretKey,
sessionSecretKeyHex: sessionKeys?.sessionSecretKey,
});
},
});
broker.connect().catch((err) => process.stderr.write(`broker connect failed for ${mesh.slug}: ${String(err)}\n`));
brokers.set(mesh.slug, broker);
}
// Start the drain worker.
// Start the drain worker. With multi-mesh, drain dispatches each
// outbox row to its mesh's broker via the `mesh` column.
let drain: DrainHandle | null = null;
drain = startDrainWorker({ db: outboxDb, broker });
drain = startDrainWorker({ db: outboxDb, brokers });
const ipc = startIpcServer({
localToken,
@@ -157,12 +160,9 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
outboxDb,
inboxDb,
bus,
broker,
brokers,
meshConfigs,
onPendingInserted: () => drain?.wake(),
// Sprint 4: IPC accept-send needs these to resolve targets and
// encrypt at accept time so the drain worker is just a forwarder.
meshSecretKey: mesh.secretKey,
meshSlug: mesh.slug,
});
try {
@@ -178,7 +178,7 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
pid: process.pid,
sock: DAEMON_PATHS.SOCK_FILE,
tcp: tcpEnabled ? `127.0.0.1:47823` : null,
mesh: mesh.slug,
meshes: meshes.map((m) => m.slug),
ts: new Date().toISOString(),
}) + "\n");
@@ -188,7 +188,9 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
shuttingDown = true;
process.stdout.write(JSON.stringify({ msg: "daemon_shutdown", signal: sig, ts: new Date().toISOString() }) + "\n");
if (drain) await drain.close();
await broker.close();
for (const b of brokers.values()) {
try { await b.close(); } catch { /* ignore */ }
}
await ipc.close();
try { outboxDb.close(); } catch { /* ignore */ }
try { inboxDb.close(); } catch { /* ignore */ }