fix(cli): 1.31.1 — reaper no longer blocks the daemon event loop
1.31.0 introduced a session reaper that called execFileSync(ps) once per registered session every 5s. With many sessions registered, the daemon's event loop stalled for hundreds of ms — long enough that incoming /v1/version probes from the CLI timed out against a healthy daemon and the new service-managed warning fired. Fix: - getProcessStartTime is now async (execFile + promisify); never blocks the event loop - New getProcessStartTimes(pids) issues one batched ps for all survivors instead of N separate forks. Sweep cost is fixed regardless of session count. - registerSession stays sync; start-time capture is fire-and-forget - reapDead is now async; the setInterval wrapper voids it so a rejected sweep cannot crash the daemon Behavior is otherwise unchanged from 1.31.0: same 5s cadence, same PID-reuse guard semantics, same broker-WS teardown via the registry hook. 83/83 tests still green. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -22,7 +22,7 @@
|
||||
* session have no token to begin with.
|
||||
*/
|
||||
|
||||
import { getProcessStartTime, isPidAlive } from "./process-info.js";
|
||||
import { getProcessStartTime, getProcessStartTimes, isPidAlive } from "./process-info.js";
|
||||
|
||||
/**
|
||||
* Optional per-launch presence material. Carried opaquely through the
|
||||
@@ -85,7 +85,10 @@ let reaperHandle: NodeJS.Timeout | null = null;
|
||||
|
||||
export function startReaper(): void {
|
||||
if (reaperHandle) return;
|
||||
reaperHandle = setInterval(reapDead, REAPER_INTERVAL_MS).unref?.() ?? reaperHandle;
|
||||
// The sweep is async (batched ps) — wrap in `void` so setInterval
|
||||
// doesn't try to await us, and so an unexpected throw doesn't crash
|
||||
// the daemon. Errors are swallowed inside reapDead.
|
||||
reaperHandle = setInterval(() => { void reapDead(); }, REAPER_INTERVAL_MS).unref?.() ?? reaperHandle;
|
||||
}
|
||||
|
||||
export function stopReaper(): void {
|
||||
@@ -112,18 +115,31 @@ export function registerSession(info: Omit<SessionInfo, "registeredAt">): Sessio
|
||||
}
|
||||
}
|
||||
|
||||
// Capture start-time at register so the reaper can detect PID reuse.
|
||||
// Caller may pre-fill info.startTime (tests do this); only probe ps
|
||||
// when the field is absent so we don't fork shell subprocesses in
|
||||
// unit tests for fake pids.
|
||||
const startTime = info.startTime ?? getProcessStartTime(info.pid) ?? undefined;
|
||||
const stored: SessionInfo = { ...info, startTime, registeredAt: Date.now() };
|
||||
// Caller may pre-fill info.startTime (tests do this for determinism).
|
||||
// For the real path we fire-and-forget an async ps probe — register
|
||||
// stays sync and microsecond-fast, and the start-time lands on the
|
||||
// entry within a few ms. Until it lands, the reaper falls back to
|
||||
// bare liveness for this entry, which is fine for the common case
|
||||
// (PID reuse is rare; the brief window without the guard is
|
||||
// tolerable).
|
||||
const stored: SessionInfo = { ...info, registeredAt: Date.now() };
|
||||
byToken.set(info.token, stored);
|
||||
bySessionId.set(info.sessionId, info.token);
|
||||
try { hooks.onRegister?.(stored); } catch { /* see above */ }
|
||||
if (stored.startTime === undefined) {
|
||||
void captureStartTimeAsync(info.token, info.pid);
|
||||
}
|
||||
return stored;
|
||||
}
|
||||
|
||||
async function captureStartTimeAsync(token: string, pid: number): Promise<void> {
|
||||
const lstart = await getProcessStartTime(pid);
|
||||
if (lstart === null) return;
|
||||
const entry = byToken.get(token);
|
||||
if (!entry || entry.pid !== pid) return; // entry was replaced; skip
|
||||
entry.startTime = lstart;
|
||||
}
|
||||
|
||||
export function deregisterByToken(token: string): boolean {
|
||||
const entry = byToken.get(token);
|
||||
if (!entry) return false;
|
||||
@@ -147,26 +163,52 @@ export function listSessions(): SessionInfo[] {
|
||||
return [...byToken.values()];
|
||||
}
|
||||
|
||||
function reapDead(): void {
|
||||
async function reapDead(): Promise<void> {
|
||||
// Snapshot first; the second (async) phase calls ps and we must not
|
||||
// mutate the registry mid-iteration.
|
||||
const entries = [...byToken.entries()];
|
||||
|
||||
// Phase 1 — TTL + bare liveness. Sync, microsecond-fast.
|
||||
const dead: string[] = [];
|
||||
for (const [token, info] of byToken.entries()) {
|
||||
const survivors: Array<[string, SessionInfo]> = [];
|
||||
for (const [token, info] of entries) {
|
||||
if (Date.now() - info.registeredAt > TTL_MS) { dead.push(token); continue; }
|
||||
if (!isPidAlive(info.pid)) { dead.push(token); continue; }
|
||||
// PID reuse guard: process is alive, but if its start-time changed
|
||||
// since register the original is gone and the OS recycled the pid
|
||||
// for an unrelated program. Skip when we never captured a start-
|
||||
// time (best-effort fallback to bare liveness above).
|
||||
if (info.startTime !== undefined) {
|
||||
const live = getProcessStartTime(info.pid);
|
||||
if (live !== null && live !== info.startTime) { dead.push(token); continue; }
|
||||
survivors.push([token, info]);
|
||||
}
|
||||
|
||||
// Phase 2 — PID-reuse guard for survivors that have a captured
|
||||
// start-time. Single batched ps call: O(1) forks regardless of
|
||||
// session count. Survivors without a start-time keep the bare-
|
||||
// liveness verdict from phase 1 (their captureStartTimeAsync may
|
||||
// still be in-flight from a recent register).
|
||||
const guardedPids = survivors
|
||||
.filter(([, info]) => info.startTime !== undefined)
|
||||
.map(([, info]) => info.pid);
|
||||
if (guardedPids.length > 0) {
|
||||
try {
|
||||
const live = await getProcessStartTimes(guardedPids);
|
||||
for (const [token, info] of survivors) {
|
||||
if (info.startTime === undefined) continue;
|
||||
const lstart = live.get(info.pid);
|
||||
// ps may transiently miss a pid that was alive when isPidAlive
|
||||
// ran — treat absence as "racing", let the next sweep decide.
|
||||
if (lstart === undefined) continue;
|
||||
if (lstart !== info.startTime) dead.push(token);
|
||||
}
|
||||
} catch {
|
||||
// ps failure here is non-fatal: survivors keep their phase-1
|
||||
// verdict. Logging is the daemon's responsibility — the
|
||||
// registry deliberately stays log-free.
|
||||
}
|
||||
}
|
||||
|
||||
for (const t of dead) deregisterByToken(t);
|
||||
}
|
||||
|
||||
/** Test helper: run a single reaper pass synchronously. */
|
||||
export function _runReaperOnce(): void {
|
||||
reapDead();
|
||||
/** Test helper: run a single reaper pass. */
|
||||
export async function _runReaperOnce(): Promise<void> {
|
||||
await reapDead();
|
||||
}
|
||||
|
||||
/** Test helper. */
|
||||
|
||||
Reference in New Issue
Block a user