fix(broker): balance per-mesh connection counter — stop capacity-leak that bricks meshes
Some checks are pending
CI / Lint (push) Waiting to run
CI / Typecheck (push) Waiting to run
CI / Broker tests (Postgres) (push) Waiting to run
CI / Docker build (linux/amd64) (push) Waiting to run

`connectionsPerMesh` (the in-memory counter enforcing MAX_CONNECTIONS_PER_MESH=100)
was incremented on every successful hello (incMeshCount at member + session paths)
but decremented ONLY inside evictPresenceFully. Every other removal path —
session-id dedup on reconnect (the common one), kick, and ban — deleted the entry
from `connections` without decrementing, leaking +1 each time. Because the counter
is in-memory and only resets on broker restart, it crept up to 100 over hours/days
of normal reconnect churn (network blips, sleep/wake, relaunches) until the mesh
hit capacity and rejected ALL new connections with `1008 "capacity"` — bricking it
until the broker process was restarted. A user with <10 sessions saw "mesh at
connection capacity" because the 100 were leaked phantoms, not live connections.

Fix: route every non-evict removal through a new dropConnection() helper that
deletes from `connections` AND decMeshCount()s, so the counter tracks map
membership exactly. The replaced socket's own close handler then no-ops (entry
already gone, guarded by `if (!conn) return`), so the decrement happens exactly
once — no double-count. evictPresenceFully keeps its existing balanced delete+dec.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-06-14 14:32:33 +01:00
parent 213a6b37cc
commit a9b735a183

View File

@@ -1635,6 +1635,30 @@ function decMeshCount(meshId: string): void {
metrics.connectionsActive.set(connections.size);
}
/**
* Remove a connection from the in-memory map AND keep the per-mesh
* connection counter balanced.
*
* THE counter (`connectionsPerMesh`) is incremented once per successful
* hello (incMeshCount). Before this helper, it was decremented ONLY in
* evictPresenceFully — so every other removal path (session-id dedup on
* reconnect, kick, ban) deleted the entry from `connections` without
* decrementing, leaking +1 each time. Over many reconnects the counter
* crept up to MAX_CONNECTIONS_PER_MESH and the mesh got bricked with
* `capacity` rejections until the broker process restarted (the counter
* is in-memory). Routing every non-evict removal through here makes the
* counter track `connections` membership exactly. The replaced socket's
* own close handler then no-ops (entry already gone), so the decrement
* happens exactly once. Returns true if an entry was actually removed.
*/
function dropConnection(presenceId: string): boolean {
const c = connections.get(presenceId);
if (!c) return false; // already removed (e.g. evicted) — don't double-decrement
connections.delete(presenceId);
decMeshCount(c.meshId);
return true;
}
function sendError(
ws: WebSocket,
code: string,
@@ -1954,7 +1978,7 @@ async function handleHello(
if (oldConn.meshId === hello.meshId && oldConn.sessionId === hello.sessionId) {
log.info("hello dedup", { old_presence: oldPid, session_id: hello.sessionId });
try { oldConn.ws.close(1000, "session_replaced"); } catch { /* already dead */ }
connections.delete(oldPid);
dropConnection(oldPid);
void disconnectPresence(oldPid);
}
}
@@ -2241,7 +2265,7 @@ async function handleSessionHello(
if (oldConn.meshId === hello.meshId && oldConn.sessionId === hello.sessionId) {
log.info("session_hello dedup", { old_presence: oldPid, session_id: hello.sessionId });
try { oldConn.ws.close(1000, "session_replaced"); } catch { /* already dead */ }
connections.delete(oldPid);
dropConnection(oldPid);
void disconnectPresence(oldPid);
}
}
@@ -4932,7 +4956,7 @@ function handleConnection(ws: WebSocket): void {
continue;
}
try { peer.ws.close(closeCode, closeReason); } catch {}
connections.delete(pid);
dropConnection(pid);
void disconnectPresence(pid);
affected.push(peer.displayName || pid);
}
@@ -4947,7 +4971,7 @@ function handleConnection(ws: WebSocket): void {
continue;
}
try { peer.ws.close(closeCode, `${closeReason}_stale`); } catch {}
connections.delete(pid);
dropConnection(pid);
void disconnectPresence(pid);
affected.push(peer.displayName || pid);
}
@@ -4961,7 +4985,7 @@ function handleConnection(ws: WebSocket): void {
continue;
}
try { peer.ws.close(closeCode, closeReason); } catch {}
connections.delete(pid);
dropConnection(pid);
void disconnectPresence(pid);
affected.push(peer.displayName || pid);
}
@@ -5017,7 +5041,7 @@ function handleConnection(ws: WebSocket): void {
for (const [pid, peer] of connections) {
if (peer.meshId === conn.meshId && peer.memberPubkey === targetMember.peerPubkey) {
try { peer.ws.close(4002, "banned"); } catch {}
connections.delete(pid);
dropConnection(pid);
void disconnectPresence(pid);
}
}