diff --git a/apps/cli/src/daemon/ipc/server.ts b/apps/cli/src/daemon/ipc/server.ts index 4490fa4..c757692 100644 --- a/apps/cli/src/daemon/ipc/server.ts +++ b/apps/cli/src/daemon/ipc/server.ts @@ -228,12 +228,55 @@ function makeHandler(opts: { const groups = Array.isArray(body.groups) ? body.groups.filter((g): g is string => typeof g === "string") : undefined; + + // 1.30.0 — optional per-session presence material. Older CLIs + // omit this; the daemon's session-broker subsystem just won't + // open a per-session WS for those. + let presence: SessionInfo["presence"] | undefined; + const rawPresence = body.presence; + if (rawPresence && typeof rawPresence === "object") { + const p = rawPresence as Record; + const sessionPubkey = typeof p.session_pubkey === "string" ? p.session_pubkey.toLowerCase() : ""; + const sessionSecretKey = typeof p.session_secret_key === "string" ? p.session_secret_key.toLowerCase() : ""; + const att = p.parent_attestation as Record | undefined; + if ( + /^[0-9a-f]{64}$/.test(sessionPubkey) && + /^[0-9a-f]{128}$/.test(sessionSecretKey) && + att && typeof att === "object" && + typeof att.session_pubkey === "string" && + typeof att.parent_member_pubkey === "string" && + typeof att.expires_at === "number" && + typeof att.signature === "string" + ) { + presence = { + sessionPubkey, + sessionSecretKey, + parentAttestation: { + sessionPubkey: (att.session_pubkey as string).toLowerCase(), + parentMemberPubkey: (att.parent_member_pubkey as string).toLowerCase(), + expiresAt: att.expires_at as number, + signature: (att.signature as string).toLowerCase(), + }, + }; + } else { + opts.log("warn", "session_register_presence_malformed", { mesh }); + } + } + const stored = registerSession({ token: token.toLowerCase(), sessionId, mesh, displayName, pid, cwd, role, groups, + ...(presence ? { presence } : {}), + }); + opts.log("info", "session_registered", { + sessionId, mesh, pid, + presence: presence ? "yes" : "no", + }); + respond(res, 200, { + ok: true, + registered_at: stored.registeredAt, + presence_accepted: !!presence, }); - opts.log("info", "session_registered", { sessionId, mesh, pid }); - respond(res, 200, { ok: true, registered_at: stored.registeredAt }); } catch (e) { respond(res, 400, { error: String(e) }); } diff --git a/apps/cli/src/daemon/run.ts b/apps/cli/src/daemon/run.ts index 0849ea5..f4bceaa 100644 --- a/apps/cli/src/daemon/run.ts +++ b/apps/cli/src/daemon/run.ts @@ -4,11 +4,12 @@ import { DAEMON_PATHS } from "./paths.js"; import { acquireSingletonLock, releaseSingletonLock } from "./lock.js"; import { ensureLocalToken } from "./local-token.js"; import { startIpcServer } from "./ipc/server.js"; -import { startReaper } from "./session-registry.js"; +import { setRegistryHooks, startReaper, type SessionInfo } from "./session-registry.js"; import { openSqlite, type SqliteDb } from "./db/sqlite.js"; import { migrateOutbox } from "./db/outbox.js"; import { migrateInbox } from "./db/inbox.js"; import { DaemonBrokerClient } from "./broker.js"; +import { SessionBrokerClient } from "./session-broker.js"; import { startDrainWorker, type DrainHandle } from "./drain.js"; import { handleBrokerPush } from "./inbound.js"; import { EventBus } from "./events.js"; @@ -27,6 +28,18 @@ export interface RunDaemonOptions { clonePolicy?: ClonePolicy; } +/** + * 1.30.0 feature flag. Default OFF for one release cycle so the broker + * side has time to deploy + bake before the daemon starts opening + * per-session WebSockets. Set CLAUDEMESH_SESSION_PRESENCE=0 to disable + * once the flag flips default-on. + */ +function isSessionPresenceEnabled(): boolean { + const v = process.env.CLAUDEMESH_SESSION_PRESENCE; + if (v === undefined || v === "") return false; + return v !== "0" && v.toLowerCase() !== "false" && v.toLowerCase() !== "off"; +} + /** Detect a few common container environments to pick UDS-only by default. */ function detectContainer(): boolean { if (process.env.KUBERNETES_SERVICE_HOST) return true; @@ -154,6 +167,56 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise { let drain: DrainHandle | null = null; drain = startDrainWorker({ db: outboxDb, brokers }); + // 1.30.0 — per-session broker presence. Default OFF for one release + // cycle so the broker side bakes before the flag flips. Opt-in via + // CLAUDEMESH_SESSION_PRESENCE=1; flips to default-on in 1.30.0 GA. + const sessionPresenceEnabled = isSessionPresenceEnabled(); + const sessionBrokers = new Map(); + setRegistryHooks({ + onRegister: (info) => { + if (!sessionPresenceEnabled) return; + if (!info.presence) return; + const meshConfig = meshConfigs.get(info.mesh); + if (!meshConfig) { + process.stderr.write(JSON.stringify({ + level: "warn", msg: "session_broker_no_mesh_config", mesh: info.mesh, + ts: new Date().toISOString(), + }) + "\n"); + return; + } + // Drop any pre-existing session WS under this token (re-register). + const prior = sessionBrokers.get(info.token); + if (prior) { + sessionBrokers.delete(info.token); + prior.close().catch(() => { /* ignore */ }); + } + const client = new SessionBrokerClient({ + mesh: meshConfig, + sessionPubkey: info.presence.sessionPubkey, + sessionSecretKey: info.presence.sessionSecretKey, + parentAttestation: info.presence.parentAttestation, + sessionId: info.sessionId, + displayName: info.displayName, + ...(info.role ? { role: info.role } : {}), + ...(info.cwd ? { cwd: info.cwd } : {}), + pid: info.pid, + }); + sessionBrokers.set(info.token, client); + client.connect().catch((err) => + process.stderr.write(JSON.stringify({ + level: "warn", msg: "session_broker_connect_failed", + mesh: info.mesh, err: String(err), ts: new Date().toISOString(), + }) + "\n"), + ); + }, + onDeregister: (info: SessionInfo) => { + const client = sessionBrokers.get(info.token); + if (!client) return; + sessionBrokers.delete(info.token); + client.close().catch(() => { /* ignore */ }); + }, + }); + startReaper(); const ipc = startIpcServer({ @@ -194,6 +257,10 @@ export async function runDaemon(opts: RunDaemonOptions = {}): Promise { for (const b of brokers.values()) { try { await b.close(); } catch { /* ignore */ } } + for (const b of sessionBrokers.values()) { + try { await b.close(); } catch { /* ignore */ } + } + sessionBrokers.clear(); await ipc.close(); try { outboxDb.close(); } catch { /* ignore */ } try { inboxDb.close(); } catch { /* ignore */ } diff --git a/apps/cli/src/daemon/session-broker.ts b/apps/cli/src/daemon/session-broker.ts new file mode 100644 index 0000000..5c03e17 --- /dev/null +++ b/apps/cli/src/daemon/session-broker.ts @@ -0,0 +1,205 @@ +/** + * Per-launch session broker WebSocket. + * + * Owned by the daemon, one per registered session. Holds a long-lived + * presence row on the broker keyed on the session's ephemeral pubkey + * (rather than the parent member's stable pubkey). Sibling sessions — + * two `claudemesh launch` runs in the same cwd — finally see each other + * in `peer list` because their presence rows coexist instead of fighting + * over the same memberPubkey snapshot. + * + * Differences from `DaemonBrokerClient`: + * - Uses session_hello (1.30.0+ broker), with a parent-vouched + * attestation provided at construction time. + * - Does NOT drain the outbox — that stays the parent member-keyed + * DaemonBrokerClient's job. Keeps the responsibility split clean + * and avoids two clients fighting over the same outbox row. + * - Does NOT carry list_peers / state / memory RPCs. This client is + * presence-only (and inbound DM delivery for messages targeted at + * the session pubkey). + * + * Old brokers reply with `unknown_message_type` on session_hello — we + * surface that as a one-shot `error` event and the daemon decides + * whether to fall back. For 1.30.0 we just log + retry; the broker is + * expected to be deployed first. + * + * Spec: .artifacts/specs/2026-05-04-per-session-presence.md. + */ + +import { hostname as osHostname } from "node:os"; +import WebSocket from "ws"; + +import type { JoinedMesh } from "~/services/config/facade.js"; +import { signSessionHello } from "~/services/broker/session-hello-sig.js"; + +export type SessionBrokerStatus = "connecting" | "open" | "closed" | "reconnecting"; + +export interface ParentAttestation { + sessionPubkey: string; + parentMemberPubkey: string; + /** Unix ms. Broker rejects > now+24h or already past. */ + expiresAt: number; + signature: string; +} + +export interface SessionBrokerOptions { + mesh: JoinedMesh; + /** Per-launch ephemeral keypair. */ + sessionPubkey: string; + sessionSecretKey: string; + /** Parent-vouched attestation, signed by mesh.secretKey at launch time. */ + parentAttestation: ParentAttestation; + /** Stable session_id from the launch (used for dedup on the broker). */ + sessionId: string; + /** Display name override for this session. */ + displayName?: string; + /** Initial groups. Format mirrors the regular hello. */ + groups?: Array<{ name: string; role?: string }>; + /** Role tag (informational, not auth-bearing). */ + role?: string; + /** Working directory (informational, surfaced in peer list). */ + cwd?: string; + /** Pid of the launched session (NOT the daemon). */ + pid: number; + onStatusChange?: (s: SessionBrokerStatus) => void; + log?: (level: "info" | "warn" | "error", msg: string, meta?: Record) => void; +} + +const HELLO_ACK_TIMEOUT_MS = 5_000; +const BACKOFF_CAPS_MS = [1_000, 2_000, 4_000, 8_000, 16_000, 30_000]; + +export class SessionBrokerClient { + private ws: WebSocket | null = null; + private _status: SessionBrokerStatus = "closed"; + private closed = false; + private reconnectAttempt = 0; + private reconnectTimer: NodeJS.Timeout | null = null; + private helloTimer: NodeJS.Timeout | null = null; + + constructor(private opts: SessionBrokerOptions) {} + + get status(): SessionBrokerStatus { return this._status; } + get meshSlug(): string { return this.opts.mesh.slug; } + get sessionPubkey(): string { return this.opts.sessionPubkey; } + + private log = (level: "info" | "warn" | "error", msg: string, meta?: Record) => { + (this.opts.log ?? defaultLog)(level, msg, { + mesh: this.opts.mesh.slug, + session_pubkey: this.opts.sessionPubkey.slice(0, 12), + ...meta, + }); + }; + + private setStatus(s: SessionBrokerStatus) { + if (this._status === s) return; + this._status = s; + this.opts.onStatusChange?.(s); + } + + /** Open the WS, run session_hello, resolve once the broker accepts. */ + async connect(): Promise { + if (this.closed) throw new Error("client_closed"); + if (this._status === "connecting" || this._status === "open") return; + this.setStatus("connecting"); + + const ws = new WebSocket(this.opts.mesh.brokerUrl); + this.ws = ws; + + return new Promise((resolve, reject) => { + ws.on("open", async () => { + try { + const { timestamp, signature } = await signSessionHello({ + meshId: this.opts.mesh.meshId, + parentMemberPubkey: this.opts.mesh.pubkey, + sessionPubkey: this.opts.sessionPubkey, + sessionSecretKey: this.opts.sessionSecretKey, + }); + ws.send(JSON.stringify({ + type: "session_hello", + meshId: this.opts.mesh.meshId, + parentMemberId: this.opts.mesh.memberId, + parentMemberPubkey: this.opts.mesh.pubkey, + sessionPubkey: this.opts.sessionPubkey, + parentAttestation: this.opts.parentAttestation, + displayName: this.opts.displayName, + sessionId: this.opts.sessionId, + pid: this.opts.pid, + cwd: this.opts.cwd ?? process.cwd(), + hostname: osHostname(), + peerType: "ai" as const, + channel: "claudemesh-session", + ...(this.opts.groups && this.opts.groups.length > 0 ? { groups: this.opts.groups } : {}), + ...(this.opts.role ? { role: this.opts.role } : {}), + timestamp, + signature, + })); + this.helloTimer = setTimeout(() => { + this.log("warn", "session_hello_ack_timeout"); + try { ws.close(); } catch { /* ignore */ } + reject(new Error("session_hello_ack_timeout")); + }, HELLO_ACK_TIMEOUT_MS); + } catch (e) { + reject(e instanceof Error ? e : new Error(String(e))); + } + }); + + ws.on("message", (raw) => { + let msg: Record; + try { msg = JSON.parse(raw.toString()) as Record; } + catch { return; } + + if (msg.type === "hello_ack") { + if (this.helloTimer) clearTimeout(this.helloTimer); + this.helloTimer = null; + this.setStatus("open"); + this.reconnectAttempt = 0; + resolve(); + return; + } + + if (msg.type === "error") { + // Older brokers respond with `unknown_message_type` to session_hello; + // surface that so the daemon can decide to skip per-session presence + // rather than churn through reconnects. + this.log("warn", "broker_error", { code: msg.code, message: msg.message }); + if (msg.code === "unknown_message_type") { + this.closed = true; + } + return; + } + // push / inbound — presence-only client ignores them; the daemon's + // member-keyed client handles all DM decryption. + }); + + ws.on("close", (code, reason) => { + if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } + if (this.closed) { this.setStatus("closed"); return; } + this.setStatus("reconnecting"); + const wait = BACKOFF_CAPS_MS[Math.min(this.reconnectAttempt, BACKOFF_CAPS_MS.length - 1)] ?? 30_000; + this.reconnectAttempt++; + this.log("info", "session_broker_reconnect_scheduled", { wait_ms: wait, code, reason: reason.toString("utf8") }); + this.reconnectTimer = setTimeout( + () => this.connect().catch((err) => this.log("warn", "session_broker_reconnect_failed", { err: String(err) })), + wait, + ); + if (this._status === "connecting") reject(new Error(`closed_before_hello_${code}`)); + }); + + ws.on("error", (err) => this.log("warn", "session_broker_ws_error", { err: err.message })); + }); + } + + async close(): Promise { + this.closed = true; + if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } + if (this.helloTimer) { clearTimeout(this.helloTimer); this.helloTimer = null; } + try { this.ws?.close(); } catch { /* ignore */ } + this.setStatus("closed"); + } +} + +function defaultLog(level: "info" | "warn" | "error", msg: string, meta?: Record) { + const line = JSON.stringify({ level, msg, ...meta, ts: new Date().toISOString() }); + if (level === "info") process.stdout.write(line + "\n"); + else process.stderr.write(line + "\n"); +} diff --git a/apps/cli/src/daemon/session-registry.ts b/apps/cli/src/daemon/session-registry.ts index 948ab5f..cb4a58e 100644 --- a/apps/cli/src/daemon/session-registry.ts +++ b/apps/cli/src/daemon/session-registry.ts @@ -20,6 +20,26 @@ * session have no token to begin with. */ +/** + * Optional per-launch presence material. Carried opaquely through the + * registry; the daemon's session-broker subsystem (1.30.0+) reads it to + * open a long-lived broker WebSocket per session. Absent on older CLIs + * — register accepts payloads without it for backward compat. + */ +export interface SessionPresence { + /** Hex ed25519 pubkey, 64 chars. */ + sessionPubkey: string; + /** Hex ed25519 secret key (held in-memory only; never disk). */ + sessionSecretKey: string; + /** Parent-member-signed attestation; see signParentAttestation. */ + parentAttestation: { + sessionPubkey: string; + parentMemberPubkey: string; + expiresAt: number; + signature: string; + }; +} + export interface SessionInfo { token: string; sessionId: string; @@ -29,14 +49,23 @@ export interface SessionInfo { cwd?: string; role?: string; groups?: string[]; + /** 1.30.0+: per-launch presence material. */ + presence?: SessionPresence; registeredAt: number; } +/** Lifecycle callbacks invoked synchronously after registry mutation. */ +export interface RegistryHooks { + onRegister?: (info: SessionInfo) => void; + onDeregister?: (info: SessionInfo) => void; +} + const TTL_MS = 24 * 60 * 60 * 1000; const REAPER_INTERVAL_MS = 30 * 1000; const byToken = new Map(); const bySessionId = new Map(); +const hooks: RegistryHooks = {}; let reaperHandle: NodeJS.Timeout | null = null; @@ -49,14 +78,30 @@ export function stopReaper(): void { if (reaperHandle) { clearInterval(reaperHandle); reaperHandle = null; } } +/** + * Wire daemon-level lifecycle hooks. Called once at daemon boot — passing + * `{}` clears them. Idempotent across calls so tests can re-bind. + */ +export function setRegistryHooks(next: RegistryHooks): void { + hooks.onRegister = next.onRegister; + hooks.onDeregister = next.onDeregister; +} + export function registerSession(info: Omit): SessionInfo { // Replace any prior entry under the same sessionId. const priorToken = bySessionId.get(info.sessionId); - if (priorToken && priorToken !== info.token) byToken.delete(priorToken); + if (priorToken && priorToken !== info.token) { + const prior = byToken.get(priorToken); + if (prior) { + byToken.delete(priorToken); + try { hooks.onDeregister?.(prior); } catch { /* hook errors must never throttle the registry */ } + } + } const stored: SessionInfo = { ...info, registeredAt: Date.now() }; byToken.set(info.token, stored); bySessionId.set(info.sessionId, info.token); + try { hooks.onRegister?.(stored); } catch { /* see above */ } return stored; } @@ -65,6 +110,7 @@ export function deregisterByToken(token: string): boolean { if (!entry) return false; byToken.delete(token); if (bySessionId.get(entry.sessionId) === token) bySessionId.delete(entry.sessionId); + try { hooks.onDeregister?.(entry); } catch { /* see above */ } return true; } @@ -95,4 +141,6 @@ function reapDead(): void { export function _resetRegistry(): void { byToken.clear(); bySessionId.clear(); + hooks.onRegister = undefined; + hooks.onDeregister = undefined; } diff --git a/apps/cli/src/services/broker/session-hello-sig.ts b/apps/cli/src/services/broker/session-hello-sig.ts new file mode 100644 index 0000000..e223fb2 --- /dev/null +++ b/apps/cli/src/services/broker/session-hello-sig.ts @@ -0,0 +1,72 @@ +/** + * CLI-side helpers for the per-session attestation flow. + * + * Two pieces: + * 1. `signParentAttestation` — `claudemesh launch` calls this with the + * member's stable secret key to mint a long-lived (≤24h) token that + * vouches for an ephemeral session pubkey. The attestation travels + * with the session-token registration to the daemon. + * 2. `signSessionHello` — the daemon's `SessionBrokerClient` calls this + * on every WS-connect to sign the canonical session-hello bytes with + * the session secret key (proves liveness + possession). + * + * Both formats mirror the broker's `canonicalSessionAttestation` / + * `canonicalSessionHello`. Drift will surface as `bad_signature` from + * the broker, never silent breakage. + */ + +import { ensureSodium } from "~/services/crypto/keypair.js"; + +/** Default attestation lifetime — 12h leaves headroom under broker's 24h cap. */ +export const DEFAULT_ATTESTATION_TTL_MS = 12 * 60 * 60 * 1000; + +export interface ParentAttestation { + sessionPubkey: string; + parentMemberPubkey: string; + expiresAt: number; + signature: string; +} + +/** Sign the parent-vouches-session attestation. */ +export async function signParentAttestation(args: { + parentMemberPubkey: string; + parentSecretKey: string; + sessionPubkey: string; + /** Override the lifetime; default 12h. */ + ttlMs?: number; + /** Override clock for tests. */ + now?: number; +}): Promise { + const s = await ensureSodium(); + const expiresAt = (args.now ?? Date.now()) + (args.ttlMs ?? DEFAULT_ATTESTATION_TTL_MS); + const canonical = `claudemesh-session-attest|${args.parentMemberPubkey}|${args.sessionPubkey}|${expiresAt}`; + const sig = s.crypto_sign_detached( + s.from_string(canonical), + s.from_hex(args.parentSecretKey), + ); + return { + sessionPubkey: args.sessionPubkey, + parentMemberPubkey: args.parentMemberPubkey, + expiresAt, + signature: s.to_hex(sig), + }; +} + +/** Sign the per-WS-connect session-hello bytes. */ +export async function signSessionHello(args: { + meshId: string; + parentMemberPubkey: string; + sessionPubkey: string; + sessionSecretKey: string; + now?: number; +}): Promise<{ timestamp: number; signature: string }> { + const s = await ensureSodium(); + const timestamp = args.now ?? Date.now(); + const canonical = + `claudemesh-session-hello|${args.meshId}|${args.parentMemberPubkey}|${args.sessionPubkey}|${timestamp}`; + const sig = s.crypto_sign_detached( + s.from_string(canonical), + s.from_hex(args.sessionSecretKey), + ); + return { timestamp, signature: s.to_hex(sig) }; +} diff --git a/apps/cli/tests/unit/session-hello-sig.test.ts b/apps/cli/tests/unit/session-hello-sig.test.ts new file mode 100644 index 0000000..1cf6397 --- /dev/null +++ b/apps/cli/tests/unit/session-hello-sig.test.ts @@ -0,0 +1,88 @@ +/** + * CLI-side session-hello signing. + * + * Roundtrip: the signatures we mint with the CLI helpers must match the + * canonical bytes the broker recomputes from the same fields. Drift here + * shows up as `bad_signature` on the broker — easier to catch in unit + * tests than in end-to-end flow. + */ + +import { describe, expect, test } from "vitest"; +import sodium from "libsodium-wrappers"; +import { + signParentAttestation, + signSessionHello, + DEFAULT_ATTESTATION_TTL_MS, +} from "../../src/services/broker/session-hello-sig.js"; + +async function makeKeypair(): Promise<{ publicKey: string; secretKey: string }> { + await sodium.ready; + const kp = sodium.crypto_sign_keypair(); + return { + publicKey: sodium.to_hex(kp.publicKey), + secretKey: sodium.to_hex(kp.privateKey), + }; +} + +describe("signParentAttestation", () => { + test("produces canonical bytes that verify against parent pubkey", async () => { + await sodium.ready; + const parent = await makeKeypair(); + const session = await makeKeypair(); + + const att = await signParentAttestation({ + parentMemberPubkey: parent.publicKey, + parentSecretKey: parent.secretKey, + sessionPubkey: session.publicKey, + }); + expect(att.parentMemberPubkey).toBe(parent.publicKey); + expect(att.sessionPubkey).toBe(session.publicKey); + expect(att.signature).toMatch(/^[0-9a-f]{128}$/); + + const canonical = + `claudemesh-session-attest|${parent.publicKey}|${session.publicKey}|${att.expiresAt}`; + const ok = sodium.crypto_sign_verify_detached( + sodium.from_hex(att.signature), + sodium.from_string(canonical), + sodium.from_hex(parent.publicKey), + ); + expect(ok).toBe(true); + }); + + test("default TTL ≤24h cap", async () => { + const parent = await makeKeypair(); + const session = await makeKeypair(); + const now = 1_700_000_000_000; + const att = await signParentAttestation({ + parentMemberPubkey: parent.publicKey, + parentSecretKey: parent.secretKey, + sessionPubkey: session.publicKey, + now, + }); + expect(att.expiresAt).toBe(now + DEFAULT_ATTESTATION_TTL_MS); + expect(att.expiresAt - now).toBeLessThanOrEqual(24 * 60 * 60 * 1000); + }); +}); + +describe("signSessionHello", () => { + test("signature verifies against session pubkey", async () => { + await sodium.ready; + const session = await makeKeypair(); + const result = await signSessionHello({ + meshId: "mesh-x", + parentMemberPubkey: "c".repeat(64), + sessionPubkey: session.publicKey, + sessionSecretKey: session.secretKey, + }); + expect(result.signature).toMatch(/^[0-9a-f]{128}$/); + + const canonical = + `claudemesh-session-hello|mesh-x|${"c".repeat(64)}|${session.publicKey}|${result.timestamp}`; + const ok = sodium.crypto_sign_verify_detached( + sodium.from_hex(result.signature), + sodium.from_string(canonical), + sodium.from_hex(session.publicKey), + ); + expect(ok).toBe(true); + }); +}); diff --git a/apps/cli/tests/unit/session-registry-hooks.test.ts b/apps/cli/tests/unit/session-registry-hooks.test.ts new file mode 100644 index 0000000..972c21c --- /dev/null +++ b/apps/cli/tests/unit/session-registry-hooks.test.ts @@ -0,0 +1,135 @@ +/** + * Session-registry lifecycle hooks (1.30.0+). + * + * The daemon's session-broker subsystem subscribes to register/deregister + * events to open and close per-session WSes. Verifies: + * - hooks fire on register + deregister + * - replacing an entry under the same sessionId fires deregister(prior) + * followed by register(new) + * - reaper-triggered deregister fires the hook for dead pids + * - presence material round-trips through the registry + */ + +import { afterEach, describe, expect, test, vi } from "vitest"; +import { + _resetRegistry, + deregisterByToken, + registerSession, + resolveToken, + setRegistryHooks, + type SessionInfo, +} from "../../src/daemon/session-registry.js"; + +const PRESENCE = { + sessionPubkey: "a".repeat(64), + sessionSecretKey: "b".repeat(128), + parentAttestation: { + sessionPubkey: "a".repeat(64), + parentMemberPubkey: "c".repeat(64), + expiresAt: Date.now() + 60 * 60 * 1000, + signature: "d".repeat(128), + }, +}; + +afterEach(() => { + _resetRegistry(); +}); + +describe("session-registry hooks", () => { + test("onRegister fires on register", () => { + const onRegister = vi.fn(); + const onDeregister = vi.fn(); + setRegistryHooks({ onRegister, onDeregister }); + + registerSession({ + token: "t".repeat(64), + sessionId: "sess-1", + mesh: "alpha", + displayName: "Alex", + pid: 12345, + presence: PRESENCE, + }); + + expect(onRegister).toHaveBeenCalledTimes(1); + expect(onDeregister).not.toHaveBeenCalled(); + const arg = onRegister.mock.calls[0]![0] as SessionInfo; + expect(arg.sessionId).toBe("sess-1"); + expect(arg.presence).toEqual(PRESENCE); + }); + + test("onDeregister fires on explicit deregister", () => { + const onRegister = vi.fn(); + const onDeregister = vi.fn(); + setRegistryHooks({ onRegister, onDeregister }); + + const token = "e".repeat(64); + registerSession({ + token, sessionId: "sess-2", mesh: "alpha", displayName: "Alex", + pid: 12345, + }); + onRegister.mockClear(); + + const ok = deregisterByToken(token); + expect(ok).toBe(true); + expect(onDeregister).toHaveBeenCalledTimes(1); + const arg = onDeregister.mock.calls[0]![0] as SessionInfo; + expect(arg.sessionId).toBe("sess-2"); + }); + + test("re-registering same sessionId deregisters prior entry first", () => { + const onRegister = vi.fn(); + const onDeregister = vi.fn(); + setRegistryHooks({ onRegister, onDeregister }); + + const oldToken = "1".repeat(64); + const newToken = "2".repeat(64); + registerSession({ + token: oldToken, sessionId: "sess-3", mesh: "alpha", + displayName: "Alex", pid: 12345, + }); + expect(onRegister).toHaveBeenCalledTimes(1); + + // Replace under same sessionId — prior must be torn down before new one. + registerSession({ + token: newToken, sessionId: "sess-3", mesh: "alpha", + displayName: "Alex", pid: 12345, + }); + + expect(onDeregister).toHaveBeenCalledTimes(1); + expect(onRegister).toHaveBeenCalledTimes(2); + expect((onDeregister.mock.calls[0]![0] as SessionInfo).token).toBe(oldToken); + expect((onRegister.mock.calls[1]![0] as SessionInfo).token).toBe(newToken); + // Old token is unresolvable now. + expect(resolveToken(oldToken)).toBeNull(); + expect(resolveToken(newToken)).toBeTruthy(); + }); + + test("hooks tolerate throws (registry mutation still succeeds)", () => { + setRegistryHooks({ + onRegister: () => { throw new Error("boom"); }, + onDeregister: () => { throw new Error("boom"); }, + }); + const token = "f".repeat(64); + expect(() => + registerSession({ + token, sessionId: "sess-4", mesh: "alpha", + displayName: "Alex", pid: 12345, + }), + ).not.toThrow(); + expect(resolveToken(token)).toBeTruthy(); + expect(() => deregisterByToken(token)).not.toThrow(); + expect(resolveToken(token)).toBeNull(); + }); + + test("presence is preserved through resolveToken", () => { + setRegistryHooks({}); + const token = "9".repeat(64); + registerSession({ + token, sessionId: "sess-5", mesh: "alpha", + displayName: "Alex", pid: 12345, presence: PRESENCE, + }); + const got = resolveToken(token); + expect(got).not.toBeNull(); + expect(got!.presence).toEqual(PRESENCE); + }); +});