From 9d1b4f3d4caec8be170f17677c4b3dbfd1c2b540 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Tue, 5 May 2026 11:31:55 +0100 Subject: [PATCH] =?UTF-8?q?feat(broker):=20lease=20model=20=E2=80=94=2090s?= =?UTF-8?q?=20grace=20window=20across=20WS=20reconnects?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Continuous presence: peers no longer see peer_left/peer_joined for transient WS reconnects. After a WS close, the connection enters a 90s grace window in offline-leased state. If the same session reconnects (matched by sessionPubkey, or sessionId+memberPubkey for member-WS) within grace, it silently swaps the WS reference, restores online state, drains queued DMs, and resets the DB row. No peer ever sees the session leave. Mechanics: - PeerConn gains leaseState ("online"|"offline"), leaseUntil, evictionTimer - ws.on("close") starts grace instead of immediate cleanup; old socket close after a reattach is detected (conn.ws !== ws) and ignored, since the lease is already healthy on the new socket - handleHello / handleSessionHello check for offline-leased entry matching the stable identity BEFORE running session-id dedup; reattach swaps ws, resets state, returns silent: true - The hello dispatcher skips peer_joined broadcast when result.silent - evictPresenceFully extracted from the close handler — runs the peer_left broadcast + cleanup (URL watches, streams, MCP registry, clock auto-pause). Called by evictionTimer after 90s, or directly if lease wasn't online (defensive) - Stale-pong watchdog skips offline-leased entries (their WS is intentionally dead during grace) - broker.ts exports restorePresence(presenceId) — clears disconnectedAt + bumps lastPingAt, called on reattach to undo any damage the DB-level stale-presence sweeper may have done during grace DMs sent to a session in grace fall through to today's existing queueing path (sendToPeer no-ops on dead WS, the message_queue row sits with deliveredAt=NULL, drained on reattach via the existing maybePushQueuedMessages call). No protocol change. No DB schema change. Backward compatible — old daemons against this broker get silent reconnects within 90s, full peer_joined cycle beyond. Layer 2 of the continuous-presence work; spec at .artifacts/specs/2026-05-05-continuous-presence.md. Layer 3 (daemon-side resume token storage + send) is optional polish, not needed for the user-visible behavior. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/broker/src/broker.ts | 15 ++ apps/broker/src/index.ts | 352 +++++++++++++++++++++++++++++--------- 2 files changed, 289 insertions(+), 78 deletions(-) diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index e7fcc8e..ae2a3c5 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -427,6 +427,21 @@ export async function heartbeat(presenceId: string): Promise { .where(eq(presence.id, presenceId)); } +/** + * Restore a presence row to online state on lease reattach: clear + * `disconnectedAt` and bump `lastPingAt`. Needed because the DB-level + * stale-presence sweeper may have flipped the row to disconnected + * during the grace window — the lease is in-memory truth, but other + * code paths read presence.disconnectedAt directly. + */ +export async function restorePresence(presenceId: string): Promise { + const now = new Date(); + await db + .update(presence) + .set({ disconnectedAt: null, lastPingAt: now }) + .where(eq(presence.id, presenceId)); +} + // --- Peer discovery --- /** Return all active (connected) presences in a mesh, joined with member info. */ diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index bc1108b..8c6f4eb 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -41,6 +41,7 @@ import { grantFileKey, handleHookSetStatus, heartbeat, + restorePresence, insertFileKeys, joinGroup, joinMesh, @@ -166,11 +167,43 @@ interface PeerConn { * half-dead TCP/NAT-dropped connections that the kernel hasn't yet * RST'd (Linux default keepalive ≈ 2hrs). */ lastPongAt: number; + /** Lease state: "online" while the WS is healthy, "offline" during + * the GRACE window after a WS close. While offline, the entry stays + * in `connections` so peer_list / sendToPeer still see it; DMs land + * in the message_queue (sendToPeer no-ops on dead WS, but the queue + * row stays with deliveredAt=NULL and drains on reattach). After + * GRACE_MS without a reattach, evictionTimer fires the full peer_left + * + cleanup. Reattach (same sessionPubkey hello arriving on a fresh + * WS) cancels the timer, swaps in the new ws, restores online. */ + leaseState: "online" | "offline"; + /** When the lease will be evicted if no reattach happens. 0 when online. */ + leaseUntil: number; + /** Timer that fires evictPresenceFully(presenceId) at leaseUntil. null when online. */ + evictionTimer: NodeJS.Timeout | null; } const connections = new Map(); const connectionsPerMesh = new Map(); +/** + * Lease grace window — how long after a WS close the broker will hold + * the presence row open before evicting and broadcasting peer_left. + * + * 90s: long enough to absorb a sleep/resume cycle, NAT timeout, ISP + * route flap, mobile→wifi handover, broker restart of the daemon's + * machine. Short enough that a true crash (machine off, daemon killed) + * clears the session within 90s — peers don't see ghost online status + * forever. + * + * During grace: lease stays in `connections`, peer_list keeps showing + * the session as online to other peers, DMs route through message_queue + * (sendToPeer no-ops on dead WS, drain happens on reattach). On + * reattach (same sessionPubkey hello on a new WS): silent swap, no + * peer_joined / peer_left visible to anyone. After grace expires: + * full eviction (peer_left + cleanup) fires exactly once. + */ +const GRACE_MS = 90_000; + // Rate limiter for /tg/token endpoint (IP → count, cleared hourly) const tgTokenRateLimit = new Map(); setInterval(() => tgTokenRateLimit.clear(), 60 * 60_000).unref(); @@ -535,6 +568,97 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void { } } +/** + * Run the full presence-cleanup path: broadcast peer_left, decMeshCount, + * disconnectPresence in DB, audit, clean up URL watches / streams / + * MCP entries / clock. Removes the entry from `connections`. + * + * Called from two places: + * 1. `ws.on("close")` when the closing WS belongs to a connection + * with no active lease (no grace) — i.e. the lease had already + * been evicted, or the close fires before lease is established. + * 2. The grace-window evictionTimer when no reattach happened in + * GRACE_MS. This is the "presence is really gone" path. + * + * Idempotent: re-entering when the connections entry is already gone + * is a no-op. + */ +async function evictPresenceFully(presenceId: string): Promise { + const conn = connections.get(presenceId); + if (!conn) return; // already evicted + if (conn.evictionTimer) { + clearTimeout(conn.evictionTimer); + conn.evictionTimer = null; + } + connections.delete(presenceId); + decMeshCount(conn.meshId); + + const leaveMsg: WSPushMessage = { + type: "push", + subtype: "system", + event: "peer_left", + eventData: { + name: conn.displayName, + pubkey: conn.sessionPubkey ?? conn.memberPubkey, + }, + messageId: crypto.randomUUID(), + meshId: conn.meshId, + senderPubkey: "system", + priority: "low", + nonce: "", + ciphertext: "", + createdAt: new Date().toISOString(), + }; + for (const [pid, peer] of connections) { + if (peer.meshId !== conn.meshId) continue; + // Don't tell the user's own other sessions they "left" when one + // of their Claude Code instances closes. Same pubkey = same user. + if (peer.memberPubkey === conn.memberPubkey) continue; + sendToPeer(pid, leaveMsg); + } + + await disconnectPresence(presenceId); + void audit(conn.meshId, "peer_left", conn.memberId, conn.displayName, {}); + + // URL watches owned by this presence — interval would otherwise + // happily fetch forever after the peer is gone. + for (const [watchId, watch] of urlWatches) { + if (watch.presenceId === presenceId) { + clearInterval(watch.timer); + urlWatches.delete(watchId); + } + } + // Stream subscriptions for this presence. + for (const [key, subs] of streamSubscriptions) { + subs.delete(presenceId); + if (subs.size === 0) streamSubscriptions.delete(key); + } + // MCP servers registered by this presence. + for (const [key, entry] of mcpRegistry) { + if (entry.presenceId === presenceId) { + if (entry.persistent) { + // Keep persistent entries but mark offline + entry.online = false; + entry.offlineSince = new Date().toISOString(); + entry.presenceId = ""; + } else { + mcpRegistry.delete(key); + } + } + } + // Auto-pause clock when mesh becomes empty. + if (!connectionsPerMesh.has(conn.meshId)) { + const clock = meshClocks.get(conn.meshId); + if (clock && clock.timer) { + clearInterval(clock.timer); + clock.timer = null; + clock.paused = true; + log.info("clock auto-paused (mesh empty)", { mesh_id: conn.meshId }); + } + } + log.info("ws evict full", { presence_id: presenceId }); +} + async function maybePushQueuedMessages( presenceId: string, excludeSenderSessionPubkey?: string, @@ -1671,6 +1795,10 @@ async function handleHello( lastSeenAt?: string; restoredGroups?: Array<{ name: string; role?: string }>; restoredStats?: unknown; + /** True when this hello reattached an existing offline lease — caller + * must skip the peer_joined broadcast and the services-list ack + * augmentation. The session was never visibly absent from peers. */ + silent?: boolean; } | null> { // Validate sessionPubkey shape — it becomes a routable identity in // listPeers/drainForMember, so arbitrary strings let a client claim @@ -1763,6 +1891,61 @@ async function handleHello( const initialGroups = helloHasGroups ? hello.groups! : (saved?.groups?.length ? saved.groups : (member.defaultGroups ?? [])); + // Reattach check: if an offline-leased lease exists for the same + // stable identity (sessionPubkey when present, otherwise sessionId + // for member-WS), this hello is a transient reconnect within the + // grace window — swap the WS reference, clear the eviction timer, + // restore online state. No peer_joined broadcast — peers never saw + // this session leave. + for (const [pid, oldConn] of connections) { + if (oldConn.meshId !== hello.meshId) continue; + if (oldConn.leaseState !== "offline") continue; + const matchByPubkey = + !!hello.sessionPubkey + && oldConn.sessionPubkey === hello.sessionPubkey; + const matchBySessionId = + !hello.sessionPubkey + && !oldConn.sessionPubkey + && oldConn.sessionId === hello.sessionId + && oldConn.memberPubkey === hello.pubkey; + if (!matchByPubkey && !matchBySessionId) continue; + + if (oldConn.evictionTimer) { + clearTimeout(oldConn.evictionTimer); + oldConn.evictionTimer = null; + } + oldConn.ws = ws; + oldConn.leaseState = "online"; + oldConn.leaseUntil = 0; + oldConn.lastPongAt = Date.now(); + // Refresh mutable fields from the new hello — the same session may + // have moved cwd / changed display name across the blip. + oldConn.cwd = hello.cwd; + if (hello.displayName) oldConn.displayName = hello.displayName; + log.info("ws hello reattach (lease)", { + presence_id: pid, + session_pubkey: hello.sessionPubkey?.slice(0, 12) ?? "(member-WS)", + session_id: hello.sessionId, + }); + // Reset DB row to online: the stale-presence sweeper may have set + // disconnectedAt during the grace window. Lease is in-memory truth + // but downstream code paths read presence.disconnectedAt directly. + void restorePresence(pid); + // Drain any queued DMs that landed during the offline window. + void maybePushQueuedMessages(pid); + return { + presenceId: pid, + memberDisplayName: oldConn.displayName, + memberProfile: { + roleTag: member.roleTag, + groups: member.defaultGroups ?? [], + messageMode: member.messageMode ?? "push", + }, + meshPolicy, + silent: true, + }; + } + // Session-id dedup: if this session_id already has an active presence, // disconnect the ghost. Happens when a client reconnects after a // network blip or broker restart before the 90s stale sweeper runs. @@ -1809,6 +1992,9 @@ async function handleHello( profile: saved?.profile ?? {}, peerRole: "control-plane", lastPongAt: Date.now(), + leaseState: "online", + leaseUntil: 0, + evictionTimer: null, }); incMeshCount(hello.meshId); void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, { @@ -1865,6 +2051,10 @@ async function handleSessionHello( memberDisplayName: string; memberProfile?: unknown; meshPolicy?: Record; + /** True when this hello reattached an existing offline lease — caller + * must skip the peer_joined broadcast. The session was never visibly + * absent from peers. */ + silent?: boolean; } | null> { // Shape checks. The crypto helpers also enforce these but bailing // early gives a clearer error code on the wire. @@ -1994,6 +2184,42 @@ async function handleSessionHello( const initialGroups = hello.groups ?? member.defaultGroups ?? []; + // Reattach check: an offline-leased connection with the same + // sessionPubkey is the same launched session resuming inside the + // grace window. Cancel the eviction timer, swap the WS, restore + // online state. No peer_joined broadcast — peers never saw the + // session leave. + for (const [pid, oldConn] of connections) { + if (oldConn.meshId !== hello.meshId) continue; + if (oldConn.leaseState !== "offline") continue; + if (oldConn.sessionPubkey !== hello.sessionPubkey) continue; + + if (oldConn.evictionTimer) { + clearTimeout(oldConn.evictionTimer); + oldConn.evictionTimer = null; + } + oldConn.ws = ws; + oldConn.leaseState = "online"; + oldConn.leaseUntil = 0; + oldConn.lastPongAt = Date.now(); + // Refresh mutable fields from the new hello. + oldConn.cwd = hello.cwd; + if (hello.displayName) oldConn.displayName = hello.displayName; + log.info("session_hello reattach (lease)", { + presence_id: pid, + session_pubkey: hello.sessionPubkey.slice(0, 12), + }); + void restorePresence(pid); + void maybePushQueuedMessages(pid); + return { + presenceId: pid, + memberDisplayName: oldConn.displayName, + memberProfile: undefined, + meshPolicy, + silent: true, + }; + } + // Session-id dedup: if the same session_id is already connected, kick // the ghost. Reconnect after a network blip lands here cleanly. for (const [oldPid, oldConn] of connections) { @@ -2036,6 +2262,9 @@ async function handleSessionHello( profile: {}, peerRole: "session", lastPongAt: Date.now(), + leaseState: "online", + leaseUntil: 0, + evictionTimer: null, }); incMeshCount(hello.meshId); void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, { @@ -2434,8 +2663,10 @@ function handleConnection(ws: WebSocket): void { } // Broadcast peer_joined to siblings — same shape as the regular // hello path, so list_peers consumers don't need to special-case. + // Skipped on lease reattach: the session was never visibly absent, + // so no synthetic join event should fire. const joinedConn = connections.get(presenceId); - if (joinedConn) { + if (joinedConn && !result.silent) { const joinMsg: WSPushMessage = { type: "push", subtype: "system", @@ -2518,9 +2749,11 @@ function handleConnection(ws: WebSocket): void { } catch { /* ws closed during hello */ } - // Broadcast peer_joined or peer_returned to all other peers in the same mesh. + // Broadcast peer_joined or peer_returned to all other peers in the + // same mesh. Skipped on lease reattach: the session never appeared + // offline so no synthetic join event should fire. const joinedConn = connections.get(presenceId); - if (joinedConn) { + if (joinedConn && !result.silent) { const isReturning = !!result.restored; const joinMsg: WSPushMessage = { type: "push", @@ -5161,82 +5394,42 @@ function handleConnection(ws: WebSocket): void { } }); ws.on("close", async () => { - if (presenceId) { - const conn = connections.get(presenceId); - // Persist peer state BEFORE removing from connections. - if (conn) { - await savePeerState(conn, conn.memberId, conn.meshId); - } - connections.delete(presenceId); - if (conn) { - decMeshCount(conn.meshId); - // Broadcast peer_left to remaining peers in the same mesh. - const leaveMsg: WSPushMessage = { - type: "push", - subtype: "system", - event: "peer_left", - eventData: { - name: conn.displayName, - pubkey: conn.sessionPubkey ?? conn.memberPubkey, - }, - messageId: crypto.randomUUID(), - meshId: conn.meshId, - senderPubkey: "system", - priority: "low", - nonce: "", - ciphertext: "", - createdAt: new Date().toISOString(), - }; - for (const [pid, peer] of connections) { - if (peer.meshId !== conn.meshId) continue; - // Don't tell the user's own other sessions they "left" when one - // of their Claude Code instances closes. Same pubkey = same user. - if (peer.memberPubkey === conn.memberPubkey) continue; - sendToPeer(pid, leaveMsg); - } - } - await disconnectPresence(presenceId); - if (conn) { - void audit(conn.meshId, "peer_left", conn.memberId, conn.displayName, {}); - } - // Clean up URL watches owned by this peer — the interval was - // happily fetching forever after the peer disconnected. - for (const [watchId, watch] of urlWatches) { - if (watch.presenceId === presenceId) { - clearInterval(watch.timer); - urlWatches.delete(watchId); - } - } - // Clean up stream subscriptions for this peer - for (const [key, subs] of streamSubscriptions) { - subs.delete(presenceId); - if (subs.size === 0) streamSubscriptions.delete(key); - } - // Clean up MCP servers registered by this peer - for (const [key, entry] of mcpRegistry) { - if (entry.presenceId === presenceId) { - if (entry.persistent) { - // Keep persistent entries but mark offline - entry.online = false; - entry.offlineSince = new Date().toISOString(); - entry.presenceId = ""; - } else { - mcpRegistry.delete(key); - } - } - } - // Auto-pause clock when mesh becomes empty - if (conn && !connectionsPerMesh.has(conn.meshId)) { - const clock = meshClocks.get(conn.meshId); - if (clock && clock.timer) { - clearInterval(clock.timer); - clock.timer = null; - clock.paused = true; - log.info("clock auto-paused (mesh empty)", { mesh_id: conn.meshId }); - } - } - log.info("ws close", { presence_id: presenceId }); + if (!presenceId) return; + const conn = connections.get(presenceId); + if (!conn) return; // already evicted + + // If the conn's `ws` is no longer THIS ws, the close belongs to an + // older socket that was already replaced by a reattach. Ignore — the + // lease is healthy with the new WS, no eviction needed. + if (conn.ws !== ws) { + log.debug("ws close on replaced socket — ignoring", { presence_id: presenceId }); + return; } + + await savePeerState(conn, conn.memberId, conn.meshId); + + // If lease is currently online, enter grace. Other peers see the + // session as still online; DMs queue (sendToPeer no-ops on dead + // WS, drain on reattach). After GRACE_MS without a reattach, the + // timer fires evictPresenceFully and cleanup runs as before. + const pid = presenceId; + if (conn.leaseState === "online") { + conn.leaseState = "offline"; + conn.leaseUntil = Date.now() + GRACE_MS; + conn.evictionTimer = setTimeout(() => { + log.info("lease grace expired — evicting", { presence_id: pid }); + void evictPresenceFully(pid); + }, GRACE_MS); + log.info("ws close — lease grace started", { + presence_id: pid, + grace_ms: GRACE_MS, + }); + return; + } + + // Not online (already in grace from an earlier close, or odd state). + // Run full eviction immediately. + await evictPresenceFully(pid); }); ws.on("error", (err) => { log.warn("ws error", { error: err.message }); @@ -5447,6 +5640,9 @@ async function main(): Promise { const pingInterval = setInterval(() => { const now = Date.now(); for (const [pid, conn] of connections) { + // Skip offline-leased entries: their WS is intentionally dead + // during grace; the eviction timer handles their lifecycle. + if (conn.leaseState === "offline") continue; const { ws } = conn; if (ws.readyState !== ws.OPEN) continue; if (now - conn.lastPongAt > STALE_PONG_THRESHOLD_MS) {