From e688f66791637ce20ab27b1d9f9d02c710c4fb2f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com>
Date: Mon, 4 May 2026 13:00:11 +0100
Subject: [PATCH] feat(broker): session_hello WS handler for per-launch presence
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The 1.30.0 daemon-multiplexed presence flow needs a way for the daemon
to open a WS keyed on a per-launch ephemeral pubkey. This commit adds:

- WSSessionHelloMessage in types.ts (additive — older clients still use
  WSHelloMessage; older brokers reply with unknown_message_type so
  newer clients can fall back).
- handleSessionHello in index.ts: validates parentAttestation (TTL ≤24h,
  ed25519 by parent), session signature (skew + ed25519 by session),
  parent membership in mesh.member, and parentMemberId/pubkey coherence.
- Inserts a presence row keyed on sessionPubkey but member_id from the
  parent — member-targeted operations (revocation,
  send-by-member-pubkey) keep working unchanged.
- Broadcasts peer_joined to ALL siblings in the mesh, including the
  same-member ones (the regular hello path skips those to avoid
  self-spam, but session_hello explicitly wants sibling visibility).

Behavior parity tests will land alongside the daemon
SessionBrokerClient. The unit tests added in the previous commit cover
the crypto layer.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 apps/broker/src/index.ts | 263 ++++++++++++++++++++++++++++++++++++++-
 apps/broker/src/types.ts |  61 +++++++++
 2 files changed, 323 insertions(+), 1 deletion(-)

diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts
index 75dc963..ba5e43b 100644
--- a/apps/broker/src/index.ts
+++ b/apps/broker/src/index.ts
@@ -115,7 +115,7 @@ import { metrics, metricsToText } from "./metrics";
 import { TokenBucket } from "./rate-limit";
 import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health";
 import { buildInfo } from "./build-info";
-import { canonicalInvite, canonicalInviteV2, claimInviteV2Core as _claimInviteV2Core, sealRootKeyToRecipient, verifyHelloSignature, verifyInviteV2 } from "./crypto";
+import { canonicalInvite, canonicalInviteV2, claimInviteV2Core as _claimInviteV2Core, sealRootKeyToRecipient, verifyHelloSignature, verifyInviteV2, verifySessionAttestation, verifySessionHelloSignature } from "./crypto";
 // Alias for in-module callers; the public re-export below surfaces the
 // same symbol without colliding with tests that import from index.ts.
 const claimInviteV2Core = _claimInviteV2Core;
@@ -1821,6 +1821,220 @@ async function handleHello(
   };
 }
 
+/**
+ * Authenticate + presence-register a per-launch session WebSocket.
+ *
+ * Two-stage proof: parent member's pre-signed attestation vouches for the
+ * session pubkey, and the session keypair signs the hello timestamp to
+ * prove possession. The presence row is keyed on `sessionPubkey` but
+ * `member_id` points at the parent member, so member-targeted operations
+ * (revocation, send-by-member-pubkey) keep working unchanged.
+ * Returns the hello_ack fields, or null after an error was sent and `ws` closed.
+ * Spec: .artifacts/specs/2026-05-04-per-session-presence.md.
+ */
+async function handleSessionHello(
+  ws: WebSocket,
+  hello: Extract<WSClientMessage, { type: "session_hello" }>,
+): Promise<{
+  presenceId: string;
+  memberDisplayName: string;
+  memberProfile?: unknown;
+  meshPolicy?: Record<string, unknown>;
+} | null> {
+  // Shape checks. The crypto helpers also enforce these but bailing
+  // early gives a clearer error code on the wire.
+  if (!/^[0-9a-f]{64}$/.test(hello.sessionPubkey ?? "")) {
+    metrics.connectionsRejected.inc({ reason: "bad_session_pubkey" });
+    sendError(ws, "bad_session_pubkey", "sessionPubkey must be 64 lowercase hex chars");
+    ws.close(1008, "bad_session_pubkey");
+    return null;
+  }
+  if (!/^[0-9a-f]{64}$/.test(hello.parentMemberPubkey ?? "")) {
+    metrics.connectionsRejected.inc({ reason: "bad_parent_pubkey" });
+    sendError(ws, "bad_parent_pubkey", "parentMemberPubkey must be 64 lowercase hex chars");
+    ws.close(1008, "bad_parent_pubkey");
+    return null;
+  }
+  const att = hello.parentAttestation;
+  if (
+    !att ||
+    typeof att !== "object" ||
+    att.sessionPubkey !== hello.sessionPubkey ||
+    att.parentMemberPubkey !== hello.parentMemberPubkey
+  ) {
+    metrics.connectionsRejected.inc({ reason: "attestation_mismatch" });
+    sendError(ws, "attestation_mismatch", "parentAttestation does not bind the claimed session+parent pubkeys");
+    ws.close(1008, "attestation_mismatch");
+    return null;
+  }
+
+  // Capacity check BEFORE touching DB.
+  const existing = connectionsPerMesh.get(hello.meshId) ?? 0;
+  if (existing >= env.MAX_CONNECTIONS_PER_MESH) {
+    metrics.connectionsRejected.inc({ reason: "capacity" });
+    log.warn("mesh at capacity (session_hello)", {
+      mesh_id: hello.meshId,
+      existing,
+      cap: env.MAX_CONNECTIONS_PER_MESH,
+    });
+    sendError(ws, "capacity", "mesh at connection capacity");
+    ws.close(1008, "capacity");
+    return null;
+  }
+
+  // 1. Parent attestation: TTL bounds + signature against parent pubkey.
+  const attCheck = await verifySessionAttestation({
+    parentMemberPubkey: hello.parentMemberPubkey,
+    sessionPubkey: hello.sessionPubkey,
+    expiresAt: att.expiresAt,
+    signature: att.signature,
+  });
+  if (!attCheck.ok) {
+    metrics.connectionsRejected.inc({ reason: `attestation_${attCheck.reason}` });
+    log.warn("session_hello attestation rejected", {
+      reason: attCheck.reason,
+      mesh_id: hello.meshId,
+      parent_pubkey: hello.parentMemberPubkey.slice(0, 12),
+      session_pubkey: hello.sessionPubkey.slice(0, 12),
+    });
+    sendError(ws, attCheck.reason, `attestation rejected: ${attCheck.reason}`);
+    ws.close(1008, attCheck.reason);
+    return null;
+  }
+
+  // 2. Session signature: timestamp skew + ed25519 against sessionPubkey.
+  const sigCheck = await verifySessionHelloSignature({
+    meshId: hello.meshId,
+    parentMemberPubkey: hello.parentMemberPubkey,
+    sessionPubkey: hello.sessionPubkey,
+    timestamp: hello.timestamp,
+    signature: hello.signature,
+  });
+  if (!sigCheck.ok) {
+    metrics.connectionsRejected.inc({ reason: `session_${sigCheck.reason}` });
+    log.warn("session_hello sig rejected", {
+      reason: sigCheck.reason,
+      mesh_id: hello.meshId,
+      session_pubkey: hello.sessionPubkey.slice(0, 12),
+    });
+    sendError(ws, sigCheck.reason, `session_hello rejected: ${sigCheck.reason}`);
+    ws.close(1008, sigCheck.reason);
+    return null;
+  }
+
+  // 3. Parent member must exist + be active in the claimed mesh.
+  const member = await findMemberByPubkey(hello.meshId, hello.parentMemberPubkey);
+  if (!member) {
+    const [revokedRow] = await db
+      .select({ displayName: meshMember.displayName, revokedAt: meshMember.revokedAt })
+      .from(meshMember)
+      .where(and(eq(meshMember.meshId, hello.meshId), eq(meshMember.peerPubkey, hello.parentMemberPubkey)))
+      .limit(1);
+    if (revokedRow?.revokedAt) {
+      metrics.connectionsRejected.inc({ reason: "revoked" });
+      const [m] = await db.select({ slug: mesh.slug, name: mesh.name }).from(mesh).where(eq(mesh.id, hello.meshId)).limit(1);
+      const meshLabel = m?.name || m?.slug || hello.meshId;
+      sendError(
+        ws,
+        "revoked",
+        `You've been removed from "${meshLabel}". Contact the mesh owner to rejoin.`,
+      );
+      ws.close(4002, "banned");
+      log.info("session_hello rejected: revoked parent", { mesh_id: hello.meshId, display_name: revokedRow.displayName });
+      return null;
+    }
+    metrics.connectionsRejected.inc({ reason: "unauthorized" });
+    sendError(ws, "unauthorized", "parent pubkey not found in mesh");
+    ws.close(1008, "unauthorized");
+    return null;
+  }
+  // The parentMemberId in the hello must match the member we resolved by
+  // pubkey — otherwise the daemon claims membership it doesn't have.
+  if (hello.parentMemberId && hello.parentMemberId !== member.id) {
+    metrics.connectionsRejected.inc({ reason: "parent_member_id_mismatch" });
+    sendError(ws, "parent_member_id_mismatch", "parentMemberId does not match parentMemberPubkey");
+    ws.close(1008, "parent_member_id_mismatch");
+    return null;
+  }
+
+  // Load mesh policy (best-effort; non-fatal).
+  let meshPolicy: Record<string, unknown> | undefined;
+  try {
+    const [m] = await db
+      .select({ selfEditable: mesh.selfEditable })
+      .from(mesh)
+      .where(eq(mesh.id, hello.meshId));
+    if (m?.selfEditable) meshPolicy = { selfEditable: m.selfEditable };
+  } catch { /* non-fatal */ }
+
+  const initialGroups = hello.groups ?? member.defaultGroups ?? [];
+
+  // Session-id dedup: if THIS member already has a live connection with the same
+  // session_id, kick the ghost (reconnect after a blip). Scoped by memberId so a peer cannot evict another member's session.
+  for (const [oldPid, oldConn] of connections) {
+    if (oldConn.meshId === hello.meshId && oldConn.memberId === member.id && oldConn.sessionId === hello.sessionId) {
+      log.info("session_hello dedup", { old_presence: oldPid, session_id: hello.sessionId });
+      try { oldConn.ws.close(1000, "session_replaced"); } catch { /* already dead */ }
+      connections.delete(oldPid);
+      void disconnectPresence(oldPid);
+    }
+  }
+
+  const presenceId = await connectPresence({
+    memberId: member.id,
+    sessionId: hello.sessionId,
+    sessionPubkey: hello.sessionPubkey,
+    displayName: hello.displayName,
+    pid: hello.pid,
+    cwd: hello.cwd,
+    groups: initialGroups,
+  });
+  const effectiveDisplayName = hello.displayName || member.displayName;
+  connections.set(presenceId, {
+    ws,
+    meshId: hello.meshId,
+    memberId: member.id,
+    memberPubkey: hello.parentMemberPubkey,
+    sessionId: hello.sessionId,
+    sessionPubkey: hello.sessionPubkey,
+    displayName: effectiveDisplayName,
+    cwd: hello.cwd,
+    hostname: hello.hostname,
+    peerType: hello.peerType,
+    channel: hello.channel,
+    model: hello.model,
+    groups: initialGroups,
+    visible: true,
+    profile: {},
+  });
+  incMeshCount(hello.meshId);
+  void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, {
+    pubkey: hello.parentMemberPubkey,
+    session_pubkey: hello.sessionPubkey,
+    groups: initialGroups,
+    via: "session_hello",
+  });
+  log.info("ws session_hello", {
+    mesh_id: hello.meshId,
+    member: effectiveDisplayName,
+    presence_id: presenceId,
+    session_id: hello.sessionId,
+    session_pubkey: hello.sessionPubkey.slice(0, 12),
+  });
+  // Drain any DMs queued for this session pubkey (or the parent member).
+  void maybePushQueuedMessages(presenceId);
+  return {
+    presenceId,
+    memberDisplayName: effectiveDisplayName,
+    memberProfile: {
+      roleTag: member.roleTag,
+      groups: member.defaultGroups ?? [],
+      messageMode: member.messageMode ?? "push",
+    },
+    meshPolicy,
+  };
+}
+
 async function handleSend(
   conn: PeerConn,
   msg: Extract,
@@ -2171,6 +2385,53 @@ function handleConnection(ws: WebSocket): void {
     try {
       const msg = JSON.parse(raw.toString()) as WSClientMessage;
       const _reqId = (msg as any)._reqId as string | undefined;
+      if (msg.type === "session_hello") {
+        const result = await handleSessionHello(ws, msg);
+        if (!result) return;
+        presenceId = result.presenceId;
+        try {
+          const ackPayload: Record = {
+            type: "hello_ack",
+            presenceId: result.presenceId,
+            memberDisplayName: result.memberDisplayName,
+            memberProfile: result.memberProfile,
+            ...(result.meshPolicy ? { meshPolicy: result.meshPolicy } : {}),
+          };
+          ws.send(JSON.stringify(ackPayload));
+        } catch {
+          /* ws closed during hello */
+        }
+        // Broadcast peer_joined to siblings — same shape as the regular
+        // hello path, so list_peers consumers don't need to special-case.
+        const joinedConn = connections.get(presenceId);
+        if (joinedConn) {
+          const joinMsg: WSPushMessage = {
+            type: "push",
+            subtype: "system",
+            event: "peer_joined",
+            eventData: {
+              name: result.memberDisplayName,
+              pubkey: joinedConn.sessionPubkey ?? joinedConn.memberPubkey,
+              groups: joinedConn.groups,
+            },
+            messageId: crypto.randomUUID(),
+            meshId: joinedConn.meshId,
+            senderPubkey: "system",
+            priority: "low",
+            nonce: "",
+            ciphertext: "",
+            createdAt: new Date().toISOString(),
+          };
+          for (const [pid, peer] of connections) {
+            if (pid === presenceId) continue;
+            if (peer.meshId !== joinedConn.meshId) continue;
+            // Same-member sibling sessions get the join — a per-launch
+            // session is meant to be visible to the user's other launches.
+            sendToPeer(pid, joinMsg);
+          }
+        }
+        return;
+      }
       if (msg.type === "hello") {
         const result = await handleHello(ws, msg);
         if (!result) return;
diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts
index ca43a64..553beba 100644
--- a/apps/broker/src/types.ts
+++ b/apps/broker/src/types.ts
@@ -90,6 +90,66 @@ export interface WSHelloMessage {
   signature: string;
 }
 
+/**
+ * Client → broker: per-launch session hello, vouched by the parent member.
+ *
+ * Used by the daemon's per-session WebSocket connections (1.30.0+) so that
+ * each `claudemesh launch`-spawned session has its own long-lived presence
+ * row owned by an ephemeral session keypair. The parent member key vouches
+ * (out-of-band) that the session pubkey is theirs; the session keypair
+ * proves liveness on every connect.
+ *
+ * Two-stage proof:
+ *   1. `parentAttestation.signature` — ed25519 over
+ *      `claudemesh-session-attest|<parentMemberPubkey>|<sessionPubkey>|<expiresAt>`
+ *      (field order presumed — confirm in verifySessionAttestation), signed by the parent member's stable secret key. TTL ≤ 24h.
+ *   2. `signature` — ed25519 over
+ *      `claudemesh-session-hello|<meshId>|<parentMemberPubkey>|<sessionPubkey>|<timestamp>`
+ *      (field order presumed — confirm in verifySessionHelloSignature), signed by the session secret key (held by the daemon for the
+ *      lifetime of the session registration).
+ *
+ * Older brokers don't recognize this message type and reply with
+ * `unknown_message_type`; clients fall back to the legacy `hello` flow.
+ */
+export interface WSSessionHelloMessage {
+  type: "session_hello";
+  /** Highest WS protocol version the client understands. */
+  protocolVersion?: number;
+  /** Optional feature strings the client supports. */
+  capabilities?: string[];
+  meshId: string;
+  /** Parent member's id (mesh.member.id); when set, the broker rejects the hello if it differs from the member resolved via parentMemberPubkey. */
+  parentMemberId: string;
+  /** Parent member's stable ed25519 pubkey (hex), as found in mesh.member. */
+  parentMemberPubkey: string;
+  /** Per-launch ephemeral ed25519 pubkey (hex). Routes presence + DMs. */
+  sessionPubkey: string;
+  /** Pre-signed attestation by the parent member, presented per session. */
+  parentAttestation: {
+    sessionPubkey: string;
+    parentMemberPubkey: string;
+    /** Unix ms; broker rejects past or > now+24h. */
+    expiresAt: number;
+    signature: string;
+  };
+  /** Display name override for this session (optional, falls back to member). */
+  displayName?: string;
+  sessionId: string;
+  pid: number;
+  cwd: string;
+  hostname?: string;
+  peerType?: "ai" | "human" | "connector";
+  channel?: string;
+  model?: string;
+  groups?: Array<{ name: string; role?: string }>;
+  /** Initial role tag for the session. NOTE(review): handleSessionHello in this patch does not read this field — confirm where it is consumed. */
+  role?: string;
+  /** ms epoch; broker rejects if outside ±60s of its own clock. */
+  timestamp: number;
+  /** ed25519 signature (hex) by the SESSION secret key over canonical bytes. */
+  signature: string;
+}
+
 /** Client → broker: send an E2E-encrypted envelope to a target. */
 export interface WSSendMessage {
   type: "send";
@@ -1341,6 +1401,7 @@ export interface WSWatchTriggeredMessage { type: "watch_triggered"; watchId: str
 
 export type WSClientMessage =
   | WSHelloMessage
+  | WSSessionHelloMessage
   | WSSendMessage
   | WSSetStatusMessage
   | WSListPeersMessage