diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index b697e71..67c91cb 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -1635,6 +1635,30 @@ function decMeshCount(meshId: string): void { metrics.connectionsActive.set(connections.size); } +/** + * Remove a connection from the in-memory map AND keep the per-mesh + * connection counter balanced. + * + * The counter (`connectionsPerMesh`) is incremented once per successful + * hello (incMeshCount). Before this helper it was decremented ONLY in + * evictPresenceFully, so every other removal path (session-id dedup on + * reconnect, kick, ban) deleted from `connections` without decrementing, + * leaking +1 each time. Over many reconnects the in-memory counter crept + * up to MAX_CONNECTIONS_PER_MESH and the mesh was bricked with `capacity` + * rejections until the broker restarted. Routing every non-evict removal + * through here keeps the counter equal to `connections` membership; the + * replaced socket's close handler then no-ops (entry already gone), so + * the decrement happens exactly once. + * @returns true if an entry was removed, false if it was already gone. + */ +function dropConnection(presenceId: string): boolean { + const c = connections.get(presenceId); + if (!c) return false; // already removed (e.g. 
evicted) — don't double-decrement + connections.delete(presenceId); + decMeshCount(c.meshId); + return true; +} + function sendError( ws: WebSocket, code: string, @@ -1954,7 +1978,7 @@ async function handleHello( if (oldConn.meshId === hello.meshId && oldConn.sessionId === hello.sessionId) { log.info("hello dedup", { old_presence: oldPid, session_id: hello.sessionId }); try { oldConn.ws.close(1000, "session_replaced"); } catch { /* already dead */ } - connections.delete(oldPid); + dropConnection(oldPid); void disconnectPresence(oldPid); } } @@ -2241,7 +2265,7 @@ async function handleSessionHello( if (oldConn.meshId === hello.meshId && oldConn.sessionId === hello.sessionId) { log.info("session_hello dedup", { old_presence: oldPid, session_id: hello.sessionId }); try { oldConn.ws.close(1000, "session_replaced"); } catch { /* already dead */ } - connections.delete(oldPid); + dropConnection(oldPid); void disconnectPresence(oldPid); } } @@ -4932,7 +4956,7 @@ function handleConnection(ws: WebSocket): void { continue; } try { peer.ws.close(closeCode, closeReason); } catch {} - connections.delete(pid); + dropConnection(pid); void disconnectPresence(pid); affected.push(peer.displayName || pid); } @@ -4947,7 +4971,7 @@ function handleConnection(ws: WebSocket): void { continue; } try { peer.ws.close(closeCode, `${closeReason}_stale`); } catch {} - connections.delete(pid); + dropConnection(pid); void disconnectPresence(pid); affected.push(peer.displayName || pid); } @@ -4961,7 +4985,7 @@ function handleConnection(ws: WebSocket): void { continue; } try { peer.ws.close(closeCode, closeReason); } catch {} - connections.delete(pid); + dropConnection(pid); void disconnectPresence(pid); affected.push(peer.displayName || pid); } @@ -5017,7 +5041,7 @@ function handleConnection(ws: WebSocket): void { for (const [pid, peer] of connections) { if (peer.meshId === conn.meshId && peer.memberPubkey === targetMember.peerPubkey) { try { peer.ws.close(4002, "banned"); } catch {} - 
connections.delete(pid); + dropConnection(pid); void disconnectPresence(pid); } }