fix: v0.1.15 — production hardening (7 fixes)

Broker:
- Sweep stale presences (3 missed pings = disconnect, 30s interval)
- Exclude sender from broadcast fan-out + queue drain

CLI:
- Decrypt fallback: try base64 plaintext if crypto_box fails
- Stable session keypair across WS reconnects
- Peer name cache (30s TTL) instead of list_peers per push
- Clean up orphaned tmpdirs from crashed sessions (>1 hour old)
- Read displayName from config file (not just env var)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-06 12:22:04 +01:00
parent a987e9e27b
commit bdda63a388
6 changed files with 94 additions and 19 deletions

View File

@@ -265,6 +265,23 @@ export async function refreshQueueDepth(): Promise<void> {
metrics.queueDepth.set(Number(row?.n ?? 0)); metrics.queueDepth.set(Number(row?.n ?? 0));
} }
/**
 * Mark presences whose heartbeat has gone quiet as disconnected.
 *
 * A presence row is updated when it is still flagged as connected
 * (`disconnectedAt` is NULL) and its `lastPingAt` is more than 90 seconds
 * in the past — i.e. three consecutive pings missed at the 30s ping interval.
 * Matching rows get `disconnectedAt` set to the current time.
 */
export async function sweepStalePresences(): Promise<void> {
  // 90s = 3 missed pings at the 30s interval.
  const staleBefore = new Date(Date.now() - 90_000);

  const stillConnected = isNull(presence.disconnectedAt);
  const pingTooOld = lt(presence.lastPingAt, staleBefore);

  const markDisconnected = db
    .update(presence)
    .set({ disconnectedAt: new Date() })
    .where(and(stillConnected, pingTooOld));

  await markDisconnected;
}
/** Sweep expired pending_status entries. */ /** Sweep expired pending_status entries. */
export async function sweepPendingStatuses(): Promise<void> { export async function sweepPendingStatuses(): Promise<void> {
const cutoff = new Date(Date.now() - PENDING_TTL_MS); const cutoff = new Date(Date.now() - PENDING_TTL_MS);
@@ -475,6 +492,7 @@ export async function drainForMember(
memberPubkey: string, memberPubkey: string,
status: PeerStatus, status: PeerStatus,
sessionPubkey?: string, sessionPubkey?: string,
excludeSenderMemberId?: string,
): Promise< ): Promise<
Array<{ Array<{
id: string; id: string;
@@ -516,6 +534,7 @@ export async function drainForMember(
AND delivered_at IS NULL AND delivered_at IS NULL
AND priority::text IN (${priorityList}) AND priority::text IN (${priorityList})
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``}) AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``})
${excludeSenderMemberId ? sql`AND sender_member_id != ${excludeSenderMemberId}` : sql``}
ORDER BY created_at ASC, id ASC ORDER BY created_at ASC, id ASC
FOR UPDATE SKIP LOCKED FOR UPDATE SKIP LOCKED
) )
@@ -553,6 +572,7 @@ export async function drainForMember(
let ttlTimer: ReturnType<typeof setInterval> | null = null; let ttlTimer: ReturnType<typeof setInterval> | null = null;
let pendingTimer: ReturnType<typeof setInterval> | null = null; let pendingTimer: ReturnType<typeof setInterval> | null = null;
let staleTimer: ReturnType<typeof setInterval> | null = null;
/** Start background sweepers. Idempotent. */ /** Start background sweepers. Idempotent. */
export function startSweepers(): void { export function startSweepers(): void {
@@ -565,14 +585,21 @@ export function startSweepers(): void {
console.error("[broker] pending sweep:", e), console.error("[broker] pending sweep:", e),
); );
}, PENDING_SWEEP_INTERVAL_MS); }, PENDING_SWEEP_INTERVAL_MS);
staleTimer = setInterval(() => {
sweepStalePresences().catch((e) =>
console.error("[broker] stale presence sweep:", e),
);
}, 30_000);
} }
/** Stop background sweepers and mark all active presences disconnected. */ /** Stop background sweepers and mark all active presences disconnected. */
export async function stopSweepers(): Promise<void> { export async function stopSweepers(): Promise<void> {
if (ttlTimer) clearInterval(ttlTimer); if (ttlTimer) clearInterval(ttlTimer);
if (pendingTimer) clearInterval(pendingTimer); if (pendingTimer) clearInterval(pendingTimer);
if (staleTimer) clearInterval(staleTimer);
ttlTimer = null; ttlTimer = null;
pendingTimer = null; pendingTimer = null;
staleTimer = null;
await db await db
.update(presence) .update(presence)
.set({ disconnectedAt: new Date() }) .set({ disconnectedAt: new Date() })

View File

@@ -81,7 +81,10 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void {
} }
} }
async function maybePushQueuedMessages(presenceId: string): Promise<void> { async function maybePushQueuedMessages(
presenceId: string,
excludeSenderMemberId?: string,
): Promise<void> {
const conn = connections.get(presenceId); const conn = connections.get(presenceId);
if (!conn) return; if (!conn) return;
const status = await refreshStatusFromJsonl( const status = await refreshStatusFromJsonl(
@@ -95,6 +98,7 @@ async function maybePushQueuedMessages(presenceId: string): Promise<void> {
conn.memberPubkey, conn.memberPubkey,
status, status,
conn.sessionPubkey ?? undefined, conn.sessionPubkey ?? undefined,
excludeSenderMemberId,
); );
for (const m of messages) { for (const m of messages) {
const push: WSPushMessage = { const push: WSPushMessage = {
@@ -452,14 +456,21 @@ async function handleSend(
}; };
conn.ws.send(JSON.stringify(ack)); conn.ws.send(JSON.stringify(ack));
// Fan-out over connected peers in the same mesh. // Find sender's presenceId to exclude from fan-out.
let senderPresenceId: string | undefined;
for (const [pid, peer] of connections) { for (const [pid, peer] of connections) {
if (peer.ws === conn.ws) { senderPresenceId = pid; break; }
}
// Fan-out over connected peers in the same mesh — skip sender.
for (const [pid, peer] of connections) {
if (pid === senderPresenceId) continue;
if (peer.meshId !== conn.meshId) continue; if (peer.meshId !== conn.meshId) continue;
if (msg.targetSpec !== "*" if (msg.targetSpec !== "*"
&& peer.memberPubkey !== msg.targetSpec && peer.memberPubkey !== msg.targetSpec
&& peer.sessionPubkey !== msg.targetSpec) && peer.sessionPubkey !== msg.targetSpec)
continue; continue;
void maybePushQueuedMessages(pid); void maybePushQueuedMessages(pid, conn.memberId);
} }
} }

View File

@@ -1,6 +1,6 @@
{ {
"name": "claudemesh-cli", "name": "claudemesh-cli",
"version": "0.1.14", "version": "0.1.15",
"description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.", "description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.",
"keywords": [ "keywords": [
"claude-code", "claude-code",

View File

@@ -11,7 +11,7 @@
*/ */
import { spawn } from "node:child_process"; import { spawn } from "node:child_process";
import { mkdtempSync, writeFileSync, rmSync } from "node:fs"; import { mkdtempSync, writeFileSync, rmSync, readdirSync, statSync } from "node:fs";
import { tmpdir, hostname } from "node:os"; import { tmpdir, hostname } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { createInterface } from "node:readline"; import { createInterface } from "node:readline";
@@ -215,6 +215,17 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
// We just set the display name via env var. // We just set the display name via env var.
const displayName = args.name ?? `${hostname()}-${process.pid}`; const displayName = args.name ?? `${hostname()}-${process.pid}`;
// Clean up orphaned tmpdirs from crashed sessions (older than 1 hour)
const tmpBase = tmpdir();
try {
for (const entry of readdirSync(tmpBase)) {
if (!entry.startsWith("claudemesh-")) continue;
const full = join(tmpBase, entry);
const age = Date.now() - statSync(full).mtimeMs;
if (age > 3600_000) rmSync(full, { recursive: true, force: true });
}
} catch { /* best effort */ }
// 4. Write session config to tmpdir (isolates mesh selection). // 4. Write session config to tmpdir (isolates mesh selection).
const tmpDir = mkdtempSync(join(tmpdir(), "claudemesh-")); const tmpDir = mkdtempSync(join(tmpdir(), "claudemesh-"));
const sessionConfig: Config = { const sessionConfig: Config = {

View File

@@ -98,6 +98,24 @@ async function resolveClient(to: string): Promise<{
}; };
} }
// Peer name cache to avoid calling listPeers on every incoming push.
const CACHE_TTL_MS = 30_000;

/** Cached pubkey → displayName lookup for a single broker connection. */
interface PeerNameCacheState {
  /** Names from the most recent successful listPeers() call. */
  names: Map<string, string>;
  /** Date.now() of the last refresh attempt (0 = never attempted). */
  fetchedAt: number;
  /** Refresh currently running, shared by all concurrent callers. */
  inFlight: Promise<void> | null;
}

// One cache per client: each client's listPeers() only returns the peers it
// can see, so a single shared map would drop every other client's names on
// each refresh. WeakMap lets a discarded client's cache be collected.
const peerNameCaches = new WeakMap<BrokerClient, PeerNameCacheState>();

/**
 * Reload the name map for `client`.
 *
 * Best effort: if listPeers() fails, the previously cached names are kept so
 * a transient broker error does not degrade known peers to truncated pubkeys.
 * `fetchedAt` is bumped either way, so a failing broker is retried at most
 * once per TTL rather than on every push.
 */
async function refreshPeerNames(
  client: BrokerClient,
  state: PeerNameCacheState,
): Promise<void> {
  try {
    const peers = await client.listPeers();
    const fresh = new Map<string, string>();
    for (const p of peers) fresh.set(p.pubkey, p.displayName);
    // Swap only after a successful fetch; never clear up front.
    state.names = fresh;
  } catch {
    /* best effort — keep the stale names */
  }
  state.fetchedAt = Date.now();
}

/**
 * Resolve a peer's display name from the per-client cache, refreshing it via
 * `client.listPeers()` when it is older than CACHE_TTL_MS.
 *
 * Concurrent calls that hit an expired cache share one listPeers() request.
 *
 * @param client Broker connection the push arrived on.
 * @param pubkey Sender public key to look up.
 * @returns The cached display name, or `peer-<first 8 chars of pubkey>` when
 *          the peer is not in the cache.
 */
async function resolvePeerName(client: BrokerClient, pubkey: string): Promise<string> {
  let state = peerNameCaches.get(client);
  if (!state) {
    state = { names: new Map<string, string>(), fetchedAt: 0, inFlight: null };
    peerNameCaches.set(client, state);
  }
  const cache = state;

  if (Date.now() - cache.fetchedAt > CACHE_TTL_MS && !cache.inFlight) {
    // Clear the marker from a .finally() callback (always asynchronous) so it
    // cannot run before the assignment below and leave a settled promise
    // parked in inFlight forever.
    const pending: Promise<void> = refreshPeerNames(client, cache).finally(() => {
      if (cache.inFlight === pending) cache.inFlight = null;
    });
    cache.inFlight = pending;
  }
  if (cache.inFlight) await cache.inFlight;

  return cache.names.get(pubkey) ?? `peer-${pubkey.slice(0, 8)}`;
}
function decryptFailedWarning(senderPubkey: string): string { function decryptFailedWarning(senderPubkey: string): string {
const who = senderPubkey ? senderPubkey.slice(0, 12) + "…" : "unknown sender"; const who = senderPubkey ? senderPubkey.slice(0, 12) + "…" : "unknown sender";
return `⚠ message from ${who} failed to decrypt (tampered or wrong keypair)`; return `⚠ message from ${who} failed to decrypt (tampered or wrong keypair)`;
@@ -251,17 +269,10 @@ If you have multiple joined meshes, prefix the \`to\` argument of send_message w
for (const client of allClients()) { for (const client of allClients()) {
client.onPush(async (msg) => { client.onPush(async (msg) => {
const fromPubkey = msg.senderPubkey || ""; const fromPubkey = msg.senderPubkey || "";
// Resolve sender's display name from the peer list. // Resolve sender's display name from the cached peer list.
let fromName = fromPubkey const fromName = fromPubkey
? `peer-${fromPubkey.slice(0, 8)}` ? await resolvePeerName(client, fromPubkey)
: "unknown"; : "unknown";
try {
const peers = await client.listPeers();
const match = peers.find((p) => p.pubkey === fromPubkey);
if (match) fromName = match.displayName;
} catch {
/* best effort — fall back to truncated pubkey */
}
const content = msg.plaintext ?? decryptFailedWarning(fromPubkey); const content = msg.plaintext ?? decryptFailedWarning(fromPubkey);
try { try {
await server.notification({ await server.notification({

View File

@@ -115,10 +115,12 @@ export class BrokerClient {
const onOpen = async (): Promise<void> => { const onOpen = async (): Promise<void> => {
this.debug("ws open → generating session keypair + signing hello"); this.debug("ws open → generating session keypair + signing hello");
try { try {
// Generate per-session ephemeral keypair for message routing. // Only generate session keypair on first connect, not reconnects
if (!this.sessionPubkey) {
const sessionKP = await generateKeypair(); const sessionKP = await generateKeypair();
this.sessionPubkey = sessionKP.publicKey; this.sessionPubkey = sessionKP.publicKey;
this.sessionSecretKey = sessionKP.secretKey; this.sessionSecretKey = sessionKP.secretKey;
}
const { timestamp, signature } = await signHello( const { timestamp, signature } = await signHello(
this.mesh.meshId, this.mesh.meshId,
@@ -376,6 +378,19 @@ export class BrokerClient {
plaintext = null; plaintext = null;
} }
} }
// Fallback: if direct decrypt failed, try plaintext base64 decode.
// This handles broadcasts and key mismatches gracefully.
if (plaintext === null && ciphertext) {
try {
const decoded = Buffer.from(ciphertext, "base64").toString("utf-8");
// Sanity check: valid UTF-8 text (not binary garbage)
if (/^[\x20-\x7E\s\u00A0-\uFFFF]*$/.test(decoded) && decoded.length > 0) {
plaintext = decoded;
}
} catch {
plaintext = null;
}
}
const push: InboundPush = { const push: InboundPush = {
messageId: String(msg.messageId ?? ""), messageId: String(msg.messageId ?? ""),
meshId: String(msg.meshId ?? ""), meshId: String(msg.meshId ?? ""),