From a9b735a183605008b3dd7d133b5206e10c4a746b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sun, 14 Jun 2026 14:32:33 +0100 Subject: [PATCH] =?UTF-8?q?fix(broker):=20balance=20per-mesh=20connection?= =?UTF-8?q?=20counter=20=E2=80=94=20stop=20capacity-leak=20that=20bricks?= =?UTF-8?q?=20meshes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `connectionsPerMesh` (the in-memory counter enforcing MAX_CONNECTIONS_PER_MESH=100) was incremented on every successful hello (incMeshCount at member + session paths) but decremented ONLY inside evictPresenceFully. Every other removal path — session-id dedup on reconnect (the common one), kick, and ban — deleted the entry from `connections` without decrementing, leaking +1 each time. Because the counter is in-memory and only resets on broker restart, it crept up to 100 over hours/days of normal reconnect churn (network blips, sleep/wake, relaunches) until the mesh hit capacity and rejected ALL new connections with `1008 "capacity"` — bricking it until the broker process was restarted. A user with <10 sessions saw "mesh at connection capacity" because the 100 were leaked phantoms, not live connections. Fix: route every non-evict removal through a new dropConnection() helper that deletes from `connections` AND calls decMeshCount(), so the counter tracks map membership exactly. The replaced socket's own close handler then no-ops (entry already gone, guarded by `if (!conn) return`), so the decrement happens exactly once — no double-count. evictPresenceFully keeps its existing balanced delete+dec. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- apps/broker/src/index.ts | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index b697e71..67c91cb 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -1635,6 +1635,30 @@ function decMeshCount(meshId: string): void { metrics.connectionsActive.set(connections.size); } +/** + * Remove a connection from the in-memory map AND keep the per-mesh + * connection counter balanced. + * + * The counter (`connectionsPerMesh`) is incremented once per successful + * hello (incMeshCount). Before this helper, it was decremented ONLY in + * evictPresenceFully — so every other removal path (session-id dedup on + * reconnect, kick, ban) deleted the entry from `connections` without + * decrementing, leaking +1 each time. Over many reconnects the counter + * crept up to MAX_CONNECTIONS_PER_MESH and the mesh got bricked with + * `capacity` rejections until the broker process restarted (the counter + * is in-memory). Routing every non-evict removal through here makes the + * counter track `connections` membership exactly. The replaced socket's + * own close handler then no-ops (entry already gone), so the decrement + * happens exactly once. Returns true if an entry was removed, false if `presenceId` was not in `connections`. + */ +function dropConnection(presenceId: string): boolean { + const c = connections.get(presenceId); + if (!c) return false; // already removed (e.g. 
evicted) — don't double-decrement + connections.delete(presenceId); + decMeshCount(c.meshId); + return true; +} + function sendError( ws: WebSocket, code: string, @@ -1954,7 +1978,7 @@ async function handleHello( if (oldConn.meshId === hello.meshId && oldConn.sessionId === hello.sessionId) { log.info("hello dedup", { old_presence: oldPid, session_id: hello.sessionId }); try { oldConn.ws.close(1000, "session_replaced"); } catch { /* already dead */ } - connections.delete(oldPid); + dropConnection(oldPid); void disconnectPresence(oldPid); } } @@ -2241,7 +2265,7 @@ async function handleSessionHello( if (oldConn.meshId === hello.meshId && oldConn.sessionId === hello.sessionId) { log.info("session_hello dedup", { old_presence: oldPid, session_id: hello.sessionId }); try { oldConn.ws.close(1000, "session_replaced"); } catch { /* already dead */ } - connections.delete(oldPid); + dropConnection(oldPid); void disconnectPresence(oldPid); } } @@ -4932,7 +4956,7 @@ function handleConnection(ws: WebSocket): void { continue; } try { peer.ws.close(closeCode, closeReason); } catch {} - connections.delete(pid); + dropConnection(pid); void disconnectPresence(pid); affected.push(peer.displayName || pid); } @@ -4947,7 +4971,7 @@ function handleConnection(ws: WebSocket): void { continue; } try { peer.ws.close(closeCode, `${closeReason}_stale`); } catch {} - connections.delete(pid); + dropConnection(pid); void disconnectPresence(pid); affected.push(peer.displayName || pid); } @@ -4961,7 +4985,7 @@ function handleConnection(ws: WebSocket): void { continue; } try { peer.ws.close(closeCode, closeReason); } catch {} - connections.delete(pid); + dropConnection(pid); void disconnectPresence(pid); affected.push(peer.displayName || pid); } @@ -5017,7 +5041,7 @@ function handleConnection(ws: WebSocket): void { for (const [pid, peer] of connections) { if (peer.meshId === conn.meshId && peer.memberPubkey === targetMember.peerPubkey) { try { peer.ws.close(4002, "banned"); } catch {} - 
connections.delete(pid); + dropConnection(pid); void disconnectPresence(pid); } }