feat(cli): sessionbrokerclient + registry hooks (flag-gated)

daemon-side half of 1.30.0 per-session broker presence. behind
CLAUDEMESH_SESSION_PRESENCE=1 (default OFF this cycle so the broker
side bakes before the flag flips).

- SessionBrokerClient (apps/cli/src/daemon/session-broker.ts) — slim
  WS that opens with session_hello, presence-only, no outbox drain.
- session-hello-sig.ts — signParentAttestation (12h TTL, ≤24h cap) and
  signSessionHello, mirroring the broker canonical formats.
- session-registry: optional presence field on SessionInfo;
  setRegistryHooks for onRegister/onDeregister callbacks. Hook errors
  are caught so they can never throttle registry mutations.
- IPC POST /v1/sessions/register accepts the presence material under
  body.presence (session_pubkey, session_secret_key, parent_attestation).
  Older callers without it stay scoped + supported.
- run.ts wires the registry hooks: on register, opens a SessionBrokerClient
  for the matching mesh; on deregister (explicit or reaper), closes it.
  Shutdown closes any remaining session WSes before the IPC server.

8 new unit tests cover registry lifecycle (replace/throw/presence
roundtrip) and signature canonical-bytes verification against libsodium.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-04 13:05:33 +01:00
parent e688f66791
commit d62b3f45d2
7 changed files with 662 additions and 4 deletions

View File

@@ -4,11 +4,12 @@ import { DAEMON_PATHS } from "./paths.js";
import { acquireSingletonLock, releaseSingletonLock } from "./lock.js";
import { ensureLocalToken } from "./local-token.js";
import { startIpcServer } from "./ipc/server.js";
import { startReaper } from "./session-registry.js";
import { setRegistryHooks, startReaper, type SessionInfo } from "./session-registry.js";
import { openSqlite, type SqliteDb } from "./db/sqlite.js";
import { migrateOutbox } from "./db/outbox.js";
import { migrateInbox } from "./db/inbox.js";
import { DaemonBrokerClient } from "./broker.js";
import { SessionBrokerClient } from "./session-broker.js";
import { startDrainWorker, type DrainHandle } from "./drain.js";
import { handleBrokerPush } from "./inbound.js";
import { EventBus } from "./events.js";
@@ -27,6 +28,18 @@ export interface RunDaemonOptions {
clonePolicy?: ClonePolicy;
}
/**
* 1.30.0 feature flag. Default OFF for one release cycle so the broker
* side has time to deploy + bake before the daemon starts opening
* per-session WebSockets. Set CLAUDEMESH_SESSION_PRESENCE=0 to disable
* once the flag flips default-on.
*/
function isSessionPresenceEnabled(): boolean {
const v = process.env.CLAUDEMESH_SESSION_PRESENCE;
if (v === undefined || v === "") return false;
return v !== "0" && v.toLowerCase() !== "false" && v.toLowerCase() !== "off";
}
/** Detect a few common container environments to pick UDS-only by default. */
function detectContainer(): boolean {
if (process.env.KUBERNETES_SERVICE_HOST) return true;
@@ -154,6 +167,56 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
let drain: DrainHandle | null = null;
drain = startDrainWorker({ db: outboxDb, brokers });
// 1.30.0 — per-session broker presence. Default OFF for one release
// cycle so the broker side bakes before the flag flips. Opt-in via
// CLAUDEMESH_SESSION_PRESENCE=1; flips to default-on in 1.30.0 GA.
const sessionPresenceEnabled = isSessionPresenceEnabled();
const sessionBrokers = new Map<string, SessionBrokerClient>();
setRegistryHooks({
onRegister: (info) => {
if (!sessionPresenceEnabled) return;
if (!info.presence) return;
const meshConfig = meshConfigs.get(info.mesh);
if (!meshConfig) {
process.stderr.write(JSON.stringify({
level: "warn", msg: "session_broker_no_mesh_config", mesh: info.mesh,
ts: new Date().toISOString(),
}) + "\n");
return;
}
// Drop any pre-existing session WS under this token (re-register).
const prior = sessionBrokers.get(info.token);
if (prior) {
sessionBrokers.delete(info.token);
prior.close().catch(() => { /* ignore */ });
}
const client = new SessionBrokerClient({
mesh: meshConfig,
sessionPubkey: info.presence.sessionPubkey,
sessionSecretKey: info.presence.sessionSecretKey,
parentAttestation: info.presence.parentAttestation,
sessionId: info.sessionId,
displayName: info.displayName,
...(info.role ? { role: info.role } : {}),
...(info.cwd ? { cwd: info.cwd } : {}),
pid: info.pid,
});
sessionBrokers.set(info.token, client);
client.connect().catch((err) =>
process.stderr.write(JSON.stringify({
level: "warn", msg: "session_broker_connect_failed",
mesh: info.mesh, err: String(err), ts: new Date().toISOString(),
}) + "\n"),
);
},
onDeregister: (info: SessionInfo) => {
const client = sessionBrokers.get(info.token);
if (!client) return;
sessionBrokers.delete(info.token);
client.close().catch(() => { /* ignore */ });
},
});
startReaper();
const ipc = startIpcServer({
@@ -194,6 +257,10 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise<number> {
for (const b of brokers.values()) {
try { await b.close(); } catch { /* ignore */ }
}
for (const b of sessionBrokers.values()) {
try { await b.close(); } catch { /* ignore */ }
}
sessionBrokers.clear();
await ipc.close();
try { outboxDb.close(); } catch { /* ignore */ }
try { inboxDb.close(); } catch { /* ignore */ }