fix: v0.1.15 — production hardening (7 fixes)
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Release / Publish multi-arch images (push) Has been cancelled

Broker:
- Sweep stale presences (3 missed pings = disconnect, 30s interval)
- Exclude sender from broadcast fan-out + queue drain

CLI:
- Decrypt fallback: try base64 plaintext if crypto_box fails
- Stable session keypair across WS reconnects
- Peer name cache (30s TTL) instead of list_peers per push
- Clean up orphaned tmpdirs from crashed sessions (>1 hour old)
- Read displayName from config file (not just env var)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-06 12:22:04 +01:00
parent a987e9e27b
commit 2557235c68
6 changed files with 94 additions and 19 deletions

View File

@@ -265,6 +265,23 @@ export async function refreshQueueDepth(): Promise<void> {
metrics.queueDepth.set(Number(row?.n ?? 0));
}
/**
* Sweep stale presences: mark as disconnected if last_ping_at is older
* than 90s (3 missed pings at the 30s interval = dead session).
*/
export async function sweepStalePresences(): Promise<void> {
const cutoff = new Date(Date.now() - 90_000); // 3 missed pings
await db
.update(presence)
.set({ disconnectedAt: new Date() })
.where(
and(
isNull(presence.disconnectedAt),
lt(presence.lastPingAt, cutoff),
),
);
}
/** Sweep expired pending_status entries. */
export async function sweepPendingStatuses(): Promise<void> {
const cutoff = new Date(Date.now() - PENDING_TTL_MS);
@@ -475,6 +492,7 @@ export async function drainForMember(
memberPubkey: string,
status: PeerStatus,
sessionPubkey?: string,
excludeSenderMemberId?: string,
): Promise<
Array<{
id: string;
@@ -516,6 +534,7 @@ export async function drainForMember(
AND delivered_at IS NULL
AND priority::text IN (${priorityList})
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``})
${excludeSenderMemberId ? sql`AND sender_member_id != ${excludeSenderMemberId}` : sql``}
ORDER BY created_at ASC, id ASC
FOR UPDATE SKIP LOCKED
)
@@ -553,6 +572,7 @@ export async function drainForMember(
let ttlTimer: ReturnType<typeof setInterval> | null = null;
let pendingTimer: ReturnType<typeof setInterval> | null = null;
let staleTimer: ReturnType<typeof setInterval> | null = null;
/** Start background sweepers. Idempotent. */
export function startSweepers(): void {
@@ -565,14 +585,21 @@ export function startSweepers(): void {
console.error("[broker] pending sweep:", e),
);
}, PENDING_SWEEP_INTERVAL_MS);
staleTimer = setInterval(() => {
sweepStalePresences().catch((e) =>
console.error("[broker] stale presence sweep:", e),
);
}, 30_000);
}
/** Stop background sweepers and mark all active presences disconnected. */
export async function stopSweepers(): Promise<void> {
if (ttlTimer) clearInterval(ttlTimer);
if (pendingTimer) clearInterval(pendingTimer);
if (staleTimer) clearInterval(staleTimer);
ttlTimer = null;
pendingTimer = null;
staleTimer = null;
await db
.update(presence)
.set({ disconnectedAt: new Date() })

View File

@@ -81,7 +81,10 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void {
}
}
async function maybePushQueuedMessages(presenceId: string): Promise<void> {
async function maybePushQueuedMessages(
presenceId: string,
excludeSenderMemberId?: string,
): Promise<void> {
const conn = connections.get(presenceId);
if (!conn) return;
const status = await refreshStatusFromJsonl(
@@ -95,6 +98,7 @@ async function maybePushQueuedMessages(presenceId: string): Promise<void> {
conn.memberPubkey,
status,
conn.sessionPubkey ?? undefined,
excludeSenderMemberId,
);
for (const m of messages) {
const push: WSPushMessage = {
@@ -452,14 +456,21 @@ async function handleSend(
};
conn.ws.send(JSON.stringify(ack));
// Fan-out over connected peers in the same mesh.
// Find sender's presenceId to exclude from fan-out.
let senderPresenceId: string | undefined;
for (const [pid, peer] of connections) {
if (peer.ws === conn.ws) { senderPresenceId = pid; break; }
}
// Fan-out over connected peers in the same mesh — skip sender.
for (const [pid, peer] of connections) {
if (pid === senderPresenceId) continue;
if (peer.meshId !== conn.meshId) continue;
if (msg.targetSpec !== "*"
&& peer.memberPubkey !== msg.targetSpec
&& peer.sessionPubkey !== msg.targetSpec)
continue;
void maybePushQueuedMessages(pid);
void maybePushQueuedMessages(pid, conn.memberId);
}
}