diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index e6b92f0..8c64415 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -265,6 +265,23 @@ export async function refreshQueueDepth(): Promise { metrics.queueDepth.set(Number(row?.n ?? 0)); } +/** + * Sweep stale presences: mark as disconnected if last_ping_at is older + * than 90s (3 missed pings at the 30s interval = dead session). + */ +export async function sweepStalePresences(): Promise { + const cutoff = new Date(Date.now() - 90_000); // 3 missed pings + await db + .update(presence) + .set({ disconnectedAt: new Date() }) + .where( + and( + isNull(presence.disconnectedAt), + lt(presence.lastPingAt, cutoff), + ), + ); +} + /** Sweep expired pending_status entries. */ export async function sweepPendingStatuses(): Promise { const cutoff = new Date(Date.now() - PENDING_TTL_MS); @@ -475,6 +492,7 @@ export async function drainForMember( memberPubkey: string, status: PeerStatus, sessionPubkey?: string, + excludeSenderMemberId?: string, ): Promise< Array<{ id: string; @@ -516,6 +534,7 @@ export async function drainForMember( AND delivered_at IS NULL AND priority::text IN (${priorityList}) AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``}) + ${excludeSenderMemberId ? sql`AND sender_member_id != ${excludeSenderMemberId}` : sql``} ORDER BY created_at ASC, id ASC FOR UPDATE SKIP LOCKED ) @@ -553,6 +572,7 @@ export async function drainForMember( let ttlTimer: ReturnType | null = null; let pendingTimer: ReturnType | null = null; +let staleTimer: ReturnType | null = null; /** Start background sweepers. Idempotent. */ export function startSweepers(): void { @@ -565,14 +585,21 @@ export function startSweepers(): void { console.error("[broker] pending sweep:", e), ); }, PENDING_SWEEP_INTERVAL_MS); + staleTimer = setInterval(() => { + sweepStalePresences().catch((e) => + console.error("[broker] stale presence sweep:", e), + ); + }, 30_000); } /** Stop background sweepers and mark all active presences disconnected. */ export async function stopSweepers(): Promise { if (ttlTimer) clearInterval(ttlTimer); if (pendingTimer) clearInterval(pendingTimer); + if (staleTimer) clearInterval(staleTimer); ttlTimer = null; pendingTimer = null; + staleTimer = null; await db .update(presence) .set({ disconnectedAt: new Date() }) diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 6a2f2e0..e631c2a 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -81,7 +81,10 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void { } } -async function maybePushQueuedMessages(presenceId: string): Promise { +async function maybePushQueuedMessages( + presenceId: string, + excludeSenderMemberId?: string, +): Promise { const conn = connections.get(presenceId); if (!conn) return; const status = await refreshStatusFromJsonl( @@ -95,6 +98,7 @@ async function maybePushQueuedMessages(presenceId: string): Promise { conn.memberPubkey, status, conn.sessionPubkey ?? undefined, + excludeSenderMemberId, ); for (const m of messages) { const push: WSPushMessage = { @@ -452,14 +456,21 @@ async function handleSend( }; conn.ws.send(JSON.stringify(ack)); - // Fan-out over connected peers in the same mesh. + // Find sender's presenceId to exclude from fan-out. + let senderPresenceId: string | undefined; for (const [pid, peer] of connections) { + if (peer.ws === conn.ws) { senderPresenceId = pid; break; } + } + + // Fan-out over connected peers in the same mesh — skip sender. + for (const [pid, peer] of connections) { + if (pid === senderPresenceId) continue; if (peer.meshId !== conn.meshId) continue; if (msg.targetSpec !== "*" && peer.memberPubkey !== msg.targetSpec && peer.sessionPubkey !== msg.targetSpec) continue; - void maybePushQueuedMessages(pid); + void maybePushQueuedMessages(pid, conn.memberId); } } diff --git a/apps/cli/package.json b/apps/cli/package.json index 489f12c..b9e8b74 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -1,6 +1,6 @@ { "name": "claudemesh-cli", - "version": "0.1.14", + "version": "0.1.15", "description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.", "keywords": [ "claude-code", diff --git a/apps/cli/src/commands/launch.ts b/apps/cli/src/commands/launch.ts index e5d233d..fd5bc87 100644 --- a/apps/cli/src/commands/launch.ts +++ b/apps/cli/src/commands/launch.ts @@ -11,7 +11,7 @@ */ import { spawn } from "node:child_process"; -import { mkdtempSync, writeFileSync, rmSync } from "node:fs"; +import { mkdtempSync, writeFileSync, rmSync, readdirSync, statSync } from "node:fs"; import { tmpdir, hostname } from "node:os"; import { join } from "node:path"; import { createInterface } from "node:readline"; @@ -215,6 +215,17 @@ export async function runLaunch(extraArgs: string[]): Promise { // We just set the display name via env var. const displayName = args.name ?? `${hostname()}-${process.pid}`; + // Clean up orphaned tmpdirs from crashed sessions (older than 1 hour) + const tmpBase = tmpdir(); + try { + for (const entry of readdirSync(tmpBase)) { + if (!entry.startsWith("claudemesh-")) continue; + const full = join(tmpBase, entry); + const age = Date.now() - statSync(full).mtimeMs; + if (age > 3600_000) rmSync(full, { recursive: true, force: true }); + } + } catch { /* best effort */ } + // 4. Write session config to tmpdir (isolates mesh selection). const tmpDir = mkdtempSync(join(tmpdir(), "claudemesh-")); const sessionConfig: Config = { diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index 6db3cea..18ea88b 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -98,6 +98,24 @@ async function resolveClient(to: string): Promise<{ }; } +// Peer name cache to avoid calling listPeers on every incoming push +const peerNameCache = new Map(); +let peerNameCacheAge = 0; +const CACHE_TTL_MS = 30_000; + +async function resolvePeerName(client: BrokerClient, pubkey: string): Promise { + const now = Date.now(); + if (now - peerNameCacheAge > CACHE_TTL_MS) { + peerNameCache.clear(); + try { + const peers = await client.listPeers(); + for (const p of peers) peerNameCache.set(p.pubkey, p.displayName); + } catch { /* best effort */ } + peerNameCacheAge = now; + } + return peerNameCache.get(pubkey) ?? `peer-${pubkey.slice(0, 8)}`; +} + function decryptFailedWarning(senderPubkey: string): string { const who = senderPubkey ? senderPubkey.slice(0, 12) + "…" : "unknown sender"; return `⚠ message from ${who} failed to decrypt (tampered or wrong keypair)`; @@ -251,17 +269,10 @@ If you have multiple joined meshes, prefix the \`to\` argument of send_message w for (const client of allClients()) { client.onPush(async (msg) => { const fromPubkey = msg.senderPubkey || ""; - // Resolve sender's display name from the peer list. - let fromName = fromPubkey - ? `peer-${fromPubkey.slice(0, 8)}` + // Resolve sender's display name from the cached peer list. + const fromName = fromPubkey + ? await resolvePeerName(client, fromPubkey) : "unknown"; - try { - const peers = await client.listPeers(); - const match = peers.find((p) => p.pubkey === fromPubkey); - if (match) fromName = match.displayName; - } catch { - /* best effort — fall back to truncated pubkey */ - } const content = msg.plaintext ?? decryptFailedWarning(fromPubkey); try { await server.notification({ diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 971bbdd..42d364e 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -115,10 +115,12 @@ export class BrokerClient { const onOpen = async (): Promise => { this.debug("ws open → generating session keypair + signing hello"); try { - // Generate per-session ephemeral keypair for message routing. - const sessionKP = await generateKeypair(); - this.sessionPubkey = sessionKP.publicKey; - this.sessionSecretKey = sessionKP.secretKey; + // Only generate session keypair on first connect, not reconnects + if (!this.sessionPubkey) { + const sessionKP = await generateKeypair(); + this.sessionPubkey = sessionKP.publicKey; + this.sessionSecretKey = sessionKP.secretKey; + } const { timestamp, signature } = await signHello( this.mesh.meshId, @@ -376,6 +378,19 @@ export class BrokerClient { plaintext = null; } } + // Fallback: if direct decrypt failed, try plaintext base64 decode. + // This handles broadcasts and key mismatches gracefully. + if (plaintext === null && ciphertext) { + try { + const decoded = Buffer.from(ciphertext, "base64").toString("utf-8"); + // Sanity check: valid UTF-8 text (not binary garbage) + if (/^[\x20-\x7E\s\u00A0-\uFFFF]*$/.test(decoded) && decoded.length > 0) { + plaintext = decoded; + } + } catch { + plaintext = null; + } + } const push: InboundPush = { messageId: String(msg.messageId ?? ""), meshId: String(msg.meshId ?? ""),