feat(broker): session_hello WS handler for per-launch presence

The 1.30.0 daemon-multiplexed presence flow needs a way for the daemon
to open a WS keyed on a per-launch ephemeral pubkey. This commit adds:

- WSSessionHelloMessage in types.ts (additive — older clients still use
  WSHelloMessage; older brokers reply with unknown_message_type so newer
  clients can fall back).
- handleSessionHello in index.ts: validates parentAttestation (TTL ≤24h,
  ed25519 by parent), session signature (skew + ed25519 by session),
  parent membership in mesh.member, and parentMemberId/pubkey coherence.
- Inserts a presence row keyed on sessionPubkey but member_id from the
  parent — member-targeted operations (revocation, send-by-member-pubkey)
  keep working unchanged.
- Broadcasts peer_joined to ALL siblings in the mesh, including the
  same-member ones (the regular hello path skips those to avoid self-
  spam, but session_hello explicitly wants sibling visibility).

Behavior parity tests will land alongside the daemon SessionBrokerClient.
The unit tests added in the previous commit cover the crypto layer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-04 13:00:11 +01:00
parent 033a2d37e1
commit e688f66791
2 changed files with 323 additions and 1 deletions

View File

@@ -115,7 +115,7 @@ import { metrics, metricsToText } from "./metrics";
import { TokenBucket } from "./rate-limit"; import { TokenBucket } from "./rate-limit";
import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health"; import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health";
import { buildInfo } from "./build-info"; import { buildInfo } from "./build-info";
import { canonicalInvite, canonicalInviteV2, claimInviteV2Core as _claimInviteV2Core, sealRootKeyToRecipient, verifyHelloSignature, verifyInviteV2 } from "./crypto"; import { canonicalInvite, canonicalInviteV2, claimInviteV2Core as _claimInviteV2Core, sealRootKeyToRecipient, verifyHelloSignature, verifyInviteV2, verifySessionAttestation, verifySessionHelloSignature } from "./crypto";
// Alias for in-module callers; the public re-export below surfaces the // Alias for in-module callers; the public re-export below surfaces the
// same symbol without colliding with tests that import from index.ts. // same symbol without colliding with tests that import from index.ts.
const claimInviteV2Core = _claimInviteV2Core; const claimInviteV2Core = _claimInviteV2Core;
@@ -1821,6 +1821,220 @@ async function handleHello(
}; };
} }
/**
* Authenticate + presence-register a per-launch session WebSocket.
*
* Two-stage proof: parent member's pre-signed attestation vouches the
* session pubkey, and the session keypair signs the hello timestamp to
* prove possession. The presence row is keyed on `sessionPubkey` but
* `member_id` points at the parent member, so member-targeted operations
* (revocation, send-by-member-pubkey) keep working unchanged.
*
* Spec: .artifacts/specs/2026-05-04-per-session-presence.md.
*/
async function handleSessionHello(
ws: WebSocket,
hello: Extract<WSClientMessage, { type: "session_hello" }>,
): Promise<{
presenceId: string;
memberDisplayName: string;
memberProfile?: unknown;
meshPolicy?: Record<string, unknown>;
} | null> {
// Shape checks. The crypto helpers also enforce these but bailing
// early gives a clearer error code on the wire.
if (!/^[0-9a-f]{64}$/.test(hello.sessionPubkey ?? "")) {
metrics.connectionsRejected.inc({ reason: "bad_session_pubkey" });
sendError(ws, "bad_session_pubkey", "sessionPubkey must be 64 lowercase hex chars");
ws.close(1008, "bad_session_pubkey");
return null;
}
if (!/^[0-9a-f]{64}$/.test(hello.parentMemberPubkey ?? "")) {
metrics.connectionsRejected.inc({ reason: "bad_parent_pubkey" });
sendError(ws, "bad_parent_pubkey", "parentMemberPubkey must be 64 lowercase hex chars");
ws.close(1008, "bad_parent_pubkey");
return null;
}
const att = hello.parentAttestation;
if (
!att ||
typeof att !== "object" ||
att.sessionPubkey !== hello.sessionPubkey ||
att.parentMemberPubkey !== hello.parentMemberPubkey
) {
metrics.connectionsRejected.inc({ reason: "attestation_mismatch" });
sendError(ws, "attestation_mismatch", "parentAttestation does not bind the claimed session+parent pubkeys");
ws.close(1008, "attestation_mismatch");
return null;
}
// Capacity check BEFORE touching DB.
const existing = connectionsPerMesh.get(hello.meshId) ?? 0;
if (existing >= env.MAX_CONNECTIONS_PER_MESH) {
metrics.connectionsRejected.inc({ reason: "capacity" });
log.warn("mesh at capacity (session_hello)", {
mesh_id: hello.meshId,
existing,
cap: env.MAX_CONNECTIONS_PER_MESH,
});
sendError(ws, "capacity", "mesh at connection capacity");
ws.close(1008, "capacity");
return null;
}
// 1. Parent attestation: TTL bounds + signature against parent pubkey.
const attCheck = await verifySessionAttestation({
parentMemberPubkey: hello.parentMemberPubkey,
sessionPubkey: hello.sessionPubkey,
expiresAt: att.expiresAt,
signature: att.signature,
});
if (!attCheck.ok) {
metrics.connectionsRejected.inc({ reason: `attestation_${attCheck.reason}` });
log.warn("session_hello attestation rejected", {
reason: attCheck.reason,
mesh_id: hello.meshId,
parent_pubkey: hello.parentMemberPubkey.slice(0, 12),
session_pubkey: hello.sessionPubkey.slice(0, 12),
});
sendError(ws, attCheck.reason, `attestation rejected: ${attCheck.reason}`);
ws.close(1008, attCheck.reason);
return null;
}
// 2. Session signature: timestamp skew + ed25519 against sessionPubkey.
const sigCheck = await verifySessionHelloSignature({
meshId: hello.meshId,
parentMemberPubkey: hello.parentMemberPubkey,
sessionPubkey: hello.sessionPubkey,
timestamp: hello.timestamp,
signature: hello.signature,
});
if (!sigCheck.ok) {
metrics.connectionsRejected.inc({ reason: `session_${sigCheck.reason}` });
log.warn("session_hello sig rejected", {
reason: sigCheck.reason,
mesh_id: hello.meshId,
session_pubkey: hello.sessionPubkey.slice(0, 12),
});
sendError(ws, sigCheck.reason, `session_hello rejected: ${sigCheck.reason}`);
ws.close(1008, sigCheck.reason);
return null;
}
// 3. Parent member must exist + be active in the claimed mesh.
const member = await findMemberByPubkey(hello.meshId, hello.parentMemberPubkey);
if (!member) {
const [revokedRow] = await db
.select({ displayName: meshMember.displayName, revokedAt: meshMember.revokedAt })
.from(meshMember)
.where(and(eq(meshMember.meshId, hello.meshId), eq(meshMember.peerPubkey, hello.parentMemberPubkey)))
.limit(1);
if (revokedRow?.revokedAt) {
metrics.connectionsRejected.inc({ reason: "revoked" });
const [m] = await db.select({ slug: mesh.slug, name: mesh.name }).from(mesh).where(eq(mesh.id, hello.meshId)).limit(1);
const meshLabel = m?.name || m?.slug || hello.meshId;
sendError(
ws,
"revoked",
`You've been removed from "${meshLabel}". Contact the mesh owner to rejoin.`,
);
ws.close(4002, "banned");
log.info("session_hello rejected: revoked parent", { mesh_id: hello.meshId, display_name: revokedRow.displayName });
return null;
}
metrics.connectionsRejected.inc({ reason: "unauthorized" });
sendError(ws, "unauthorized", "parent pubkey not found in mesh");
ws.close(1008, "unauthorized");
return null;
}
// The parentMemberId in the hello must match the member we resolved by
// pubkey — otherwise the daemon claims membership it doesn't have.
if (hello.parentMemberId && hello.parentMemberId !== member.id) {
metrics.connectionsRejected.inc({ reason: "parent_member_id_mismatch" });
sendError(ws, "parent_member_id_mismatch", "parentMemberId does not match parentMemberPubkey");
ws.close(1008, "parent_member_id_mismatch");
return null;
}
// Load mesh policy (best-effort; non-fatal).
let meshPolicy: Record<string, unknown> | undefined;
try {
const [m] = await db
.select({ selfEditable: mesh.selfEditable })
.from(mesh)
.where(eq(mesh.id, hello.meshId));
if (m?.selfEditable) meshPolicy = { selfEditable: m.selfEditable };
} catch { /* non-fatal */ }
const initialGroups = hello.groups ?? member.defaultGroups ?? [];
// Session-id dedup: if the same session_id is already connected, kick
// the ghost. Reconnect after a network blip lands here cleanly.
for (const [oldPid, oldConn] of connections) {
if (oldConn.meshId === hello.meshId && oldConn.sessionId === hello.sessionId) {
log.info("session_hello dedup", { old_presence: oldPid, session_id: hello.sessionId });
try { oldConn.ws.close(1000, "session_replaced"); } catch { /* already dead */ }
connections.delete(oldPid);
void disconnectPresence(oldPid);
}
}
const presenceId = await connectPresence({
memberId: member.id,
sessionId: hello.sessionId,
sessionPubkey: hello.sessionPubkey,
displayName: hello.displayName,
pid: hello.pid,
cwd: hello.cwd,
groups: initialGroups,
});
const effectiveDisplayName = hello.displayName || member.displayName;
connections.set(presenceId, {
ws,
meshId: hello.meshId,
memberId: member.id,
memberPubkey: hello.parentMemberPubkey,
sessionId: hello.sessionId,
sessionPubkey: hello.sessionPubkey,
displayName: effectiveDisplayName,
cwd: hello.cwd,
hostname: hello.hostname,
peerType: hello.peerType,
channel: hello.channel,
model: hello.model,
groups: initialGroups,
visible: true,
profile: {},
});
incMeshCount(hello.meshId);
void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, {
pubkey: hello.parentMemberPubkey,
session_pubkey: hello.sessionPubkey,
groups: initialGroups,
via: "session_hello",
});
log.info("ws session_hello", {
mesh_id: hello.meshId,
member: effectiveDisplayName,
presence_id: presenceId,
session_id: hello.sessionId,
session_pubkey: hello.sessionPubkey.slice(0, 12),
});
// Drain any DMs queued for this session pubkey (or the parent member).
void maybePushQueuedMessages(presenceId);
return {
presenceId,
memberDisplayName: effectiveDisplayName,
memberProfile: {
roleTag: member.roleTag,
groups: member.defaultGroups ?? [],
messageMode: member.messageMode ?? "push",
},
meshPolicy,
};
}
async function handleSend( async function handleSend(
conn: PeerConn, conn: PeerConn,
msg: Extract<WSClientMessage, { type: "send" }>, msg: Extract<WSClientMessage, { type: "send" }>,
@@ -2171,6 +2385,53 @@ function handleConnection(ws: WebSocket): void {
try { try {
const msg = JSON.parse(raw.toString()) as WSClientMessage; const msg = JSON.parse(raw.toString()) as WSClientMessage;
const _reqId = (msg as any)._reqId as string | undefined; const _reqId = (msg as any)._reqId as string | undefined;
if (msg.type === "session_hello") {
const result = await handleSessionHello(ws, msg);
if (!result) return;
presenceId = result.presenceId;
try {
const ackPayload: Record<string, unknown> = {
type: "hello_ack",
presenceId: result.presenceId,
memberDisplayName: result.memberDisplayName,
memberProfile: result.memberProfile,
...(result.meshPolicy ? { meshPolicy: result.meshPolicy } : {}),
};
ws.send(JSON.stringify(ackPayload));
} catch {
/* ws closed during hello */
}
// Broadcast peer_joined to siblings — same shape as the regular
// hello path, so list_peers consumers don't need to special-case.
const joinedConn = connections.get(presenceId);
if (joinedConn) {
const joinMsg: WSPushMessage = {
type: "push",
subtype: "system",
event: "peer_joined",
eventData: {
name: result.memberDisplayName,
pubkey: joinedConn.sessionPubkey ?? joinedConn.memberPubkey,
groups: joinedConn.groups,
},
messageId: crypto.randomUUID(),
meshId: joinedConn.meshId,
senderPubkey: "system",
priority: "low",
nonce: "",
ciphertext: "",
createdAt: new Date().toISOString(),
};
for (const [pid, peer] of connections) {
if (pid === presenceId) continue;
if (peer.meshId !== joinedConn.meshId) continue;
// Same-member sibling sessions get the join — a per-launch
// session is meant to be visible to the user's other launches.
sendToPeer(pid, joinMsg);
}
}
return;
}
if (msg.type === "hello") { if (msg.type === "hello") {
const result = await handleHello(ws, msg); const result = await handleHello(ws, msg);
if (!result) return; if (!result) return;

View File

@@ -90,6 +90,66 @@ export interface WSHelloMessage {
signature: string; signature: string;
} }
/**
* Client → broker: per-launch session hello, vouched by the parent member.
*
* Used by the daemon's per-session WebSocket connections (1.30.0+) so that
* each `claudemesh launch`-spawned session has its own long-lived presence
* row owned by an ephemeral session keypair. The parent member key vouches
* (out-of-band) that the session pubkey is theirs; the session keypair
* proves liveness on every connect.
*
* Two-stage proof:
* 1. `parentAttestation.signature` — ed25519 over
* `claudemesh-session-attest|<parent_pubkey>|<session_pubkey>|<expires_at_ms>`
* signed by the parent member's stable secret key. TTL ≤ 24h.
* 2. `signature` — ed25519 over
* `claudemesh-session-hello|<mesh_id>|<parent_pubkey>|<session_pubkey>|<timestamp>`
* signed by the session secret key (held by the daemon for the
* lifetime of the session registration).
*
* Older brokers don't recognize this message type and reply with
* `unknown_message_type`; clients fall back to the legacy `hello` flow.
*/
export interface WSSessionHelloMessage {
type: "session_hello";
/** Highest WS protocol version the client understands. */
protocolVersion?: number;
/** Optional feature strings the client supports. */
capabilities?: string[];
meshId: string;
/** Parent member's id (mesh.member.id) — used for revocation lookup. */
parentMemberId: string;
/** Parent member's stable ed25519 pubkey (hex), as found in mesh.member. */
parentMemberPubkey: string;
/** Per-launch ephemeral ed25519 pubkey (hex). Routes presence + DMs. */
sessionPubkey: string;
/** Pre-signed attestation by the parent member, presented per session. */
parentAttestation: {
sessionPubkey: string;
parentMemberPubkey: string;
/** Unix ms; broker rejects past or > now+24h. */
expiresAt: number;
signature: string;
};
/** Display name override for this session (optional, falls back to member). */
displayName?: string;
sessionId: string;
pid: number;
cwd: string;
hostname?: string;
peerType?: "ai" | "human" | "connector";
channel?: string;
model?: string;
groups?: Array<{ name: string; role?: string }>;
/** Initial role tag for the session. */
role?: string;
/** ms epoch; broker rejects if outside ±60s of its own clock. */
timestamp: number;
/** ed25519 signature (hex) by the SESSION secret key over canonical bytes. */
signature: string;
}
/** Client → broker: send an E2E-encrypted envelope to a target. */ /** Client → broker: send an E2E-encrypted envelope to a target. */
export interface WSSendMessage { export interface WSSendMessage {
type: "send"; type: "send";
@@ -1341,6 +1401,7 @@ export interface WSWatchTriggeredMessage { type: "watch_triggered"; watchId: str
export type WSClientMessage = export type WSClientMessage =
| WSHelloMessage | WSHelloMessage
| WSSessionHelloMessage
| WSSendMessage | WSSendMessage
| WSSetStatusMessage | WSSetStatusMessage
| WSListPeersMessage | WSListPeersMessage