feat(broker,cli): liveness watchdogs — 75s stale-pong terminate
Both sides now actively detect half-dead WS connections instead of
waiting for kernel TCP keepalive (~2hrs default on Linux). User-reported
bug: "claudemesh peer list" shows zero peers despite running
sessions, because NAT/CGNAT silently dropped the WS flow but neither
side noticed.
Broker (apps/broker/src/index.ts):
- Add lastPongAt to PeerConn, populate at connections.set sites,
bump in ws.on("pong").
- 30s ping loop now also terminates conns whose pong is >75s stale.
ws.terminate() fires the close handler → existing peer_left path.
Daemon (apps/cli/src/daemon/ws-lifecycle.ts):
- Add idle watchdog at 30s cadence, started after hello-ack.
- Bumps lastActivity on incoming message, ping, and pong frames.
- Each tick: terminates the socket if idle >75s, otherwise sends sock.ping().
- Watchdog is cleared in the close handler and on explicit close().
CLI 1.34.15 → 1.34.16. Broker stays 0.1.0 (deploys from main).
Spec: .artifacts/specs/2026-05-05-continuous-presence.md (full lease
model + resume token; this commit ships only the watchdogs — the first
of four progressive layers).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -161,6 +161,11 @@ interface PeerConn {
|
||||
* on long-lived control-plane connections (daemon, dashboard) that
|
||||
* would just auto-reconnect. */
|
||||
peerRole: "control-plane" | "session" | "service";
|
||||
/** Last time this connection's WS replied to a broker ping. Bumped
|
||||
* in the `pong` handler. Used by the staleness watchdog to detect
|
||||
* half-dead TCP/NAT-dropped connections that the kernel hasn't yet
|
||||
* RST'd (Linux default keepalive ≈ 2hrs). */
|
||||
lastPongAt: number;
|
||||
}
|
||||
|
||||
const connections = new Map<string, PeerConn>();
|
||||
@@ -1803,6 +1808,7 @@ async function handleHello(
|
||||
visible: saved?.visible ?? true,
|
||||
profile: saved?.profile ?? {},
|
||||
peerRole: "control-plane",
|
||||
lastPongAt: Date.now(),
|
||||
});
|
||||
incMeshCount(hello.meshId);
|
||||
void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, {
|
||||
@@ -2029,6 +2035,7 @@ async function handleSessionHello(
|
||||
visible: true,
|
||||
profile: {},
|
||||
peerRole: "session",
|
||||
lastPongAt: Date.now(),
|
||||
});
|
||||
incMeshCount(hello.meshId);
|
||||
void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, {
|
||||
@@ -5235,7 +5242,11 @@ function handleConnection(ws: WebSocket): void {
|
||||
log.warn("ws error", { error: err.message });
|
||||
});
|
||||
ws.on("pong", () => {
|
||||
if (presenceId) void heartbeat(presenceId);
|
||||
if (presenceId) {
|
||||
const conn = connections.get(presenceId);
|
||||
if (conn) conn.lastPongAt = Date.now();
|
||||
void heartbeat(presenceId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -5427,10 +5438,26 @@ async function main(): Promise<void> {
|
||||
});
|
||||
});
|
||||
|
||||
// WS heartbeat ping every 30s; clients reply with pong → bumps lastPingAt.
|
||||
// WS heartbeat ping every 30s; clients reply with pong → bumps
|
||||
// lastPongAt. Connections whose pong is older than 75s (2.5x the
|
||||
// ping interval) are considered half-dead — kernel hasn't yet RST'd
|
||||
// the socket but no application traffic is flowing. Force-terminate
|
||||
// them to fire the close handler and free the connection slot.
|
||||
const STALE_PONG_THRESHOLD_MS = 75_000;
|
||||
const pingInterval = setInterval(() => {
|
||||
for (const { ws } of connections.values()) {
|
||||
if (ws.readyState === ws.OPEN) ws.ping();
|
||||
const now = Date.now();
|
||||
for (const [pid, conn] of connections) {
|
||||
const { ws } = conn;
|
||||
if (ws.readyState !== ws.OPEN) continue;
|
||||
if (now - conn.lastPongAt > STALE_PONG_THRESHOLD_MS) {
|
||||
log.warn("ws stale terminate", {
|
||||
presence_id: pid,
|
||||
last_pong_ago_ms: now - conn.lastPongAt,
|
||||
});
|
||||
try { ws.terminate(); } catch { /* socket already gone */ }
|
||||
continue;
|
||||
}
|
||||
ws.ping();
|
||||
}
|
||||
}, 30_000);
|
||||
pingInterval.unref();
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "claudemesh-cli",
|
||||
"version": "1.34.15",
|
||||
"version": "1.34.16",
|
||||
"description": "Peer mesh for Claude Code sessions — CLI + MCP server.",
|
||||
"keywords": [
|
||||
"claude-code",
|
||||
|
||||
@@ -139,6 +139,25 @@ export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecy
|
||||
* but ignores the rejection — by then the close handler has already
|
||||
* scheduled its own reconnect).
|
||||
*/
|
||||
// Liveness watchdog: same cadence (30s) as the broker's outbound
|
||||
// ping. Two jobs per tick:
|
||||
// 1. If we haven't heard from the broker in >75s (2.5x the ping
|
||||
// cadence — covers one missed ping plus some slack), terminate
|
||||
// the socket. Fires the close handler → backoff reconnect runs
|
||||
// its normal path. This is what catches NAT-dropped half-dead
|
||||
// connections that the kernel won't RST for ~2 hours.
|
||||
// 2. Otherwise, send our own ping. The broker's `ws` library
|
||||
// auto-replies with a pong, which bumps lastActivity. This
|
||||
// keeps the broker's stale-pong watchdog seeing us as alive.
|
||||
//
|
||||
// Bare `ping` and `pong` events both bump lastActivity, as does
|
||||
// any inbound application message — any sign of life resets the
|
||||
// dead-man's-switch.
|
||||
const PING_INTERVAL_MS = 30_000;
|
||||
const STALE_THRESHOLD_MS = 75_000;
|
||||
let lastActivity = Date.now();
|
||||
let watchdogTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
const openOnce = (): Promise<void> => {
|
||||
if (closed) return Promise.reject(new Error("client_closed"));
|
||||
setStatus("connecting");
|
||||
@@ -146,6 +165,7 @@ export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecy
|
||||
log("info", "ws_open_attempt", { url: opts.url });
|
||||
const sock = new WebSocket(opts.url);
|
||||
ws = sock;
|
||||
lastActivity = Date.now();
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
sock.on("open", () => {
|
||||
@@ -170,6 +190,7 @@ export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecy
|
||||
});
|
||||
|
||||
sock.on("message", (raw) => {
|
||||
lastActivity = Date.now();
|
||||
let msg: Record<string, unknown>;
|
||||
try { msg = JSON.parse(raw.toString()) as Record<string, unknown>; }
|
||||
catch { return; }
|
||||
@@ -179,6 +200,18 @@ export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecy
|
||||
setStatus("open");
|
||||
reconnectAttempt = 0;
|
||||
log("info", "ws_hello_acked", { url: opts.url });
|
||||
// Start liveness watchdog only after a successful handshake.
|
||||
if (watchdogTimer) clearInterval(watchdogTimer);
|
||||
watchdogTimer = setInterval(() => {
|
||||
if (sock.readyState !== sock.OPEN) return;
|
||||
const idle = Date.now() - lastActivity;
|
||||
if (idle > STALE_THRESHOLD_MS) {
|
||||
log("warn", "ws_stale_terminate", { url: opts.url, idle_ms: idle });
|
||||
try { sock.terminate(); } catch { /* socket already gone */ }
|
||||
return;
|
||||
}
|
||||
try { sock.ping(); } catch { /* ignore */ }
|
||||
}, PING_INTERVAL_MS);
|
||||
resolve();
|
||||
return;
|
||||
}
|
||||
@@ -186,8 +219,12 @@ export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecy
|
||||
opts.onMessage(msg);
|
||||
});
|
||||
|
||||
sock.on("ping", () => { lastActivity = Date.now(); });
|
||||
sock.on("pong", () => { lastActivity = Date.now(); });
|
||||
|
||||
sock.on("close", (code, reason) => {
|
||||
if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
|
||||
if (watchdogTimer) { clearInterval(watchdogTimer); watchdogTimer = null; }
|
||||
const reasonStr = reason.toString("utf8");
|
||||
log("warn", "ws_closed", { url: opts.url, code, reason: reasonStr, status });
|
||||
opts.onBeforeReconnect?.(code, reasonStr);
|
||||
@@ -227,6 +264,7 @@ export function connectWsWithBackoff(opts: WsLifecycleOptions): Promise<WsLifecy
|
||||
closed = true;
|
||||
if (reconnectTimer) { clearTimeout(reconnectTimer); reconnectTimer = null; }
|
||||
if (helloTimer) { clearTimeout(helloTimer); helloTimer = null; }
|
||||
if (watchdogTimer) { clearInterval(watchdogTimer); watchdogTimer = null; }
|
||||
try { ws?.close(); } catch { /* ignore */ }
|
||||
setStatus("closed");
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user