From fc8a7edc2332473350467db7d02862ea05ee5e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Wed, 8 Apr 2026 00:20:20 +0100 Subject: [PATCH] feat: persist peer session state across disconnects ("welcome back" on reconnect) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Save groups, profile, visibility, summary, display name, and cumulative stats to a new mesh.peer_state table on disconnect. On reconnect (same meshId + memberId), restore them automatically — hello groups take precedence over stored groups if provided. Broadcast peer_returned system event with last-seen time and summary to other peers. Co-Authored-By: Claude Sonnet 4.6 --- apps/broker/src/index.ts | 219 ++++++++++++++++-- apps/broker/src/types.ts | 10 + apps/cli/src/mcp/server.ts | 24 ++ apps/cli/src/ws/client.ts | 16 ++ .../0014_peer-state-persistence.sql | 16 ++ packages/db/src/schema/mesh.ts | 47 ++++ 6 files changed, 315 insertions(+), 17 deletions(-) create mode 100644 packages/db/migrations/0014_peer-state-persistence.sql diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 217438e..9973409 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -15,10 +15,10 @@ import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; import type { Duplex } from "node:stream"; import { WebSocketServer, type WebSocket } from "ws"; -import { and, eq, sql } from "drizzle-orm"; +import { and, eq, isNull, sql } from "drizzle-orm"; import { env } from "./env"; import { db } from "./db"; -import { messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook } from "@turbostarter/db/schema/mesh"; +import { messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh"; import { claimTask, completeTask, @@ -179,7 +179,7 @@ function makeClockStatus(clock: MeshClock, reqId?: string): WSServerMessage { } as WSServerMessage; } -// --- MCP proxy registry (in-memory, ephemeral) --- +// --- MCP proxy registry (in-memory, persistent-capable) --- interface McpRegisteredServer { meshId: string; presenceId: string; @@ -187,6 +187,11 @@ interface McpRegisteredServer { description: string; tools: Array<{ name: string; description: string; inputSchema: Record }>; hostedByName: string; + persistent: boolean; + online: boolean; + memberId: string; + registeredAt: string; + offlineSince?: string; } /** Keyed by "meshId:serverName" */ const mcpRegistry = new Map(); @@ -858,6 +863,118 @@ function sendError( } } +// --- Peer state persistence --- + +async function savePeerState(conn: PeerConn, memberId: string, meshId: string): Promise { + try { + // Read existing cumulative stats to merge + const existing = await db + .select() + .from(peerState) + .where(and(eq(peerState.meshId, meshId), eq(peerState.memberId, memberId))) + .limit(1); + + const prev = existing[0]?.cumulativeStats as { messagesIn: number; messagesOut: number; toolCalls: number; errors: number } | null; + const sessionStats = conn.stats ?? {}; + const cumulative = { + messagesIn: (prev?.messagesIn ?? 0) + (sessionStats.messagesIn ?? 0), + messagesOut: (prev?.messagesOut ?? 0) + (sessionStats.messagesOut ?? 0), + toolCalls: (prev?.toolCalls ?? 0) + (sessionStats.toolCalls ?? 0), + errors: (prev?.errors ?? 0) + (sessionStats.errors ?? 0), + }; + + const now = new Date(); + await db + .insert(peerState) + .values({ + meshId, + memberId, + groups: conn.groups, + profile: conn.profile, + visible: conn.visible, + lastSummary: null, // will be set below if presence has a summary + lastDisplayName: conn.displayName, + cumulativeStats: cumulative, + lastSeenAt: now, + createdAt: now, + updatedAt: now, + }) + .onConflictDoUpdate({ + target: [peerState.meshId, peerState.memberId], + set: { + groups: conn.groups, + profile: conn.profile, + visible: conn.visible, + lastDisplayName: conn.displayName, + cumulativeStats: cumulative, + lastSeenAt: now, + updatedAt: now, + }, + }); + + // Persist the summary from the presence row (it's set via setSummary, not on conn) + const { presence } = await import("@turbostarter/db/schema/mesh"); + const presRows = await db + .select({ summary: presence.summary }) + .from(presence) + .where(and(eq(presence.memberId, memberId), isNull(presence.disconnectedAt))) + .limit(1); + if (presRows[0]?.summary) { + await db + .update(peerState) + .set({ lastSummary: presRows[0].summary, updatedAt: now }) + .where(and(eq(peerState.meshId, meshId), eq(peerState.memberId, memberId))); + } + } catch (e) { + log.warn("failed to save peer state", { + mesh_id: meshId, + member_id: memberId, + error: e instanceof Error ? e.message : String(e), + }); + } +} + +async function restorePeerState( + meshId: string, + memberId: string, +): Promise<{ + restored: boolean; + groups?: Array<{ name: string; role?: string }>; + profile?: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }; + visible?: boolean; + lastSummary?: string; + lastDisplayName?: string; + cumulativeStats?: { messagesIn: number; messagesOut: number; toolCalls: number; errors: number }; + lastSeenAt?: Date; +} | null> { + try { + const rows = await db + .select() + .from(peerState) + .where(and(eq(peerState.meshId, meshId), eq(peerState.memberId, memberId))) + .limit(1); + if (!rows[0]) return null; + const row = rows[0]; + return { + restored: true, + groups: row.groups as Array<{ name: string; role?: string }> ?? [], + profile: row.profile as { avatar?: string; title?: string; bio?: string; capabilities?: string[] } ?? {}, + visible: row.visible, + lastSummary: row.lastSummary ?? undefined, + lastDisplayName: row.lastDisplayName ?? undefined, + cumulativeStats: row.cumulativeStats as { messagesIn: number; messagesOut: number; toolCalls: number; errors: number } ?? undefined, + lastSeenAt: row.lastSeenAt ?? undefined, + }; + } catch (e) { + log.warn("failed to restore peer state", { + mesh_id: meshId, + member_id: memberId, + error: e instanceof Error ? e.message : String(e), + }); + return null; + } +} + async function handleHello( ws: WebSocket, hello: Extract, @@ -902,7 +1019,14 @@ async function handleHello( ws.close(1008, "unauthorized"); return null; } - const initialGroups = hello.groups ?? []; + + // Attempt to restore persisted state from a previous session. + const saved = await restorePeerState(hello.meshId, member.id); + const helloHasGroups = hello.groups && hello.groups.length > 0; + // Hello groups take precedence; fall back to restored groups. + const initialGroups = helloHasGroups + ? hello.groups! + : (saved?.groups ?? []); const presenceId = await connectPresence({ memberId: member.id, sessionId: hello.sessionId, @@ -926,26 +1050,36 @@ async function handleHello( channel: hello.channel, model: hello.model, groups: initialGroups, - visible: true, - profile: {}, + visible: saved?.visible ?? true, + profile: saved?.profile ?? {}, }); incMeshCount(hello.meshId); void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, { pubkey: hello.pubkey, groups: initialGroups, + restored: !!saved, }); log.info("ws hello", { mesh_id: hello.meshId, member: effectiveDisplayName, presence_id: presenceId, session_id: hello.sessionId, + restored: !!saved, }); // Drain any queued messages in the background. The hello_ack is // sent by the CALLER after it assigns presenceId — sending it here // races the caller's closure assignment, causing subsequent client // messages to fail the "no_hello" check. void maybePushQueuedMessages(presenceId); - return { presenceId, memberDisplayName: effectiveDisplayName }; + return { + presenceId, + memberDisplayName: effectiveDisplayName, + restored: saved ? true : undefined, + lastSummary: saved?.lastSummary, + lastSeenAt: saved?.lastSeenAt?.toISOString(), + restoredGroups: (!helloHasGroups && saved?.groups?.length) ? saved.groups : undefined, + restoredStats: saved?.cumulativeStats, + }; } async function handleSend( @@ -1056,27 +1190,38 @@ function handleConnection(ws: WebSocket): void { // Ack AFTER closure assignment — subsequent client messages // arriving immediately after will now see a non-null presenceId. try { - ws.send( - JSON.stringify({ - type: "hello_ack", - presenceId: result.presenceId, - memberDisplayName: result.memberDisplayName, - }), - ); + const ackPayload: Record = { + type: "hello_ack", + presenceId: result.presenceId, + memberDisplayName: result.memberDisplayName, + }; + if (result.restored) { + ackPayload.restored = true; + if (result.lastSummary) ackPayload.lastSummary = result.lastSummary; + if (result.lastSeenAt) ackPayload.lastSeenAt = result.lastSeenAt; + if (result.restoredGroups) ackPayload.restoredGroups = result.restoredGroups; + if (result.restoredStats) ackPayload.restoredStats = result.restoredStats; + } + ws.send(JSON.stringify(ackPayload)); } catch { /* ws closed during hello */ } - // Broadcast peer_joined to all other peers in the same mesh. + // Broadcast peer_joined or peer_returned to all other peers in the same mesh. const joinedConn = connections.get(presenceId); if (joinedConn) { + const isReturning = !!result.restored; const joinMsg: WSPushMessage = { type: "push", subtype: "system", - event: "peer_joined", + event: isReturning ? "peer_returned" : "peer_joined", eventData: { name: result.memberDisplayName, pubkey: joinedConn.sessionPubkey ?? joinedConn.memberPubkey, groups: joinedConn.groups, + ...(isReturning ? { + lastSeenAt: result.lastSeenAt, + summary: result.lastSummary, + } : {}), }, messageId: crypto.randomUUID(), meshId: joinedConn.meshId, @@ -2480,6 +2625,10 @@ function handleConnection(ws: WebSocket): void { description: mr.description, tools: mr.tools, hostedByName: conn.displayName, + persistent: !!(mr as any).persistent, + online: true, + memberId: conn.memberId, + registeredAt: new Date().toISOString(), }); sendToPeer(presenceId, { type: "mcp_register_ack", @@ -2892,6 +3041,10 @@ function handleConnection(ws: WebSocket): void { ws.on("close", async () => { if (presenceId) { const conn = connections.get(presenceId); + // Persist peer state BEFORE removing from connections. + if (conn) { + await savePeerState(conn, conn.memberId, conn.meshId); + } connections.delete(presenceId); if (conn) { decMeshCount(conn.meshId); @@ -2928,7 +3081,16 @@ function handleConnection(ws: WebSocket): void { } // Clean up MCP servers registered by this peer for (const [key, entry] of mcpRegistry) { - if (entry.presenceId === presenceId) mcpRegistry.delete(key); + if (entry.presenceId === presenceId) { + if (entry.persistent) { + // Keep persistent entries but mark offline + entry.online = false; + entry.offlineSince = new Date().toISOString(); + entry.presenceId = ""; + } else { + mcpRegistry.delete(key); + } + } } // Auto-pause clock when mesh becomes empty if (conn && !connectionsPerMesh.has(conn.meshId)) { @@ -3162,6 +3324,29 @@ function main(): void { }), ); + // Ensure peer_state table exists (CREATE TABLE IF NOT EXISTS) + db.execute(sql` + CREATE TABLE IF NOT EXISTS mesh.peer_state ( + id TEXT PRIMARY KEY NOT NULL, + mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE ON UPDATE CASCADE, + member_id TEXT NOT NULL REFERENCES mesh.member(id) ON DELETE CASCADE ON UPDATE CASCADE, + groups JSONB DEFAULT '[]', + profile JSONB DEFAULT '{}', + visible BOOLEAN NOT NULL DEFAULT true, + last_summary TEXT, + last_display_name TEXT, + cumulative_stats JSONB DEFAULT '{"messagesIn":0,"messagesOut":0,"toolCalls":0,"errors":0}', + last_seen_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT now(), + updated_at TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT peer_state_mesh_member_idx UNIQUE (mesh_id, member_id) + ) + `).catch((e) => + log.warn("peer_state table creation failed", { + error: e instanceof Error ? e.message : String(e), + }), + ); + // Recover persisted scheduled messages (cron + one-shot) from DB recoverScheduledMessages().catch((e) => log.warn("scheduled message recovery failed on startup", { diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 16a4e29..ad17fa3 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -214,6 +214,16 @@ export interface WSHelloAckMessage { type: "hello_ack"; presenceId: string; memberDisplayName: string; + /** True when the broker restored persisted state from a previous session. */ + restored?: boolean; + /** Last summary set before disconnect (only when restored). */ + lastSummary?: string; + /** ISO timestamp of last disconnect (only when restored). */ + lastSeenAt?: string; + /** Restored groups from previous session (only when restored and hello had no groups). */ + restoredGroups?: Array<{ name: string; role?: string }>; + /** Restored cumulative stats (only when restored). */ + restoredStats?: { messagesIn: number; messagesOut: number; toolCalls: number; errors: number }; } /** Broker → client: list of connected peers in the same mesh. */ diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index d1e1290..e756967 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -24,6 +24,22 @@ import type { } from "./types"; import type { BrokerClient, InboundPush } from "../ws/client"; +/** Compute a human-readable relative time string from an ISO timestamp. */ +function relativeTime(isoStr: string): string { + const then = new Date(isoStr).getTime(); + if (isNaN(then)) return "unknown"; + const diffMs = Date.now() - then; + if (diffMs < 0) return "just now"; + const seconds = Math.floor(diffMs / 1000); + if (seconds < 60) return `${seconds}s ago`; + const minutes = Math.floor(seconds / 60); + if (minutes < 60) return `${minutes}m ago`; + const hours = Math.floor(minutes / 60); + if (hours < 24) return `${hours}h ago`; + const days = Math.floor(hours / 24); + return `${days} day${days !== 1 ? "s" : ""} ago`; +} + function text(msg: string, isError = false) { return { content: [{ type: "text" as const, text: msg }], @@ -1352,6 +1368,14 @@ Your message mode is "${messageMode}". content = `[heartbeat] tick ${tick} | sim time: ${simTime} | speed: x${speed}`; } else if (eventName === "peer_joined") { content = `[system] Peer "${data.name ?? "unknown"}" joined the mesh`; + } else if (eventName === "peer_returned") { + const peerName = String(data.name ?? "unknown"); + const lastSeenAt = data.lastSeenAt ? relativeTime(String(data.lastSeenAt)) : "unknown"; + const groups = Array.isArray(data.groups) + ? (data.groups as Array<{ name: string; role?: string }>).map((g) => g.role ? `@${g.name}:${g.role}` : `@${g.name}`).join(", ") + : ""; + const summary = data.summary ? ` Summary: "${data.summary}"` : ""; + content = `[system] Welcome back, "${peerName}"! Last seen ${lastSeenAt}.${groups ? ` Restored: ${groups}` : ""}${summary}`; } else if (eventName === "peer_left") { content = `[system] Peer "${data.name ?? "unknown"}" left the mesh`; } else if (eventName === "mcp_registered") { diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index fb231fe..dcba5ce 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -233,6 +233,22 @@ export class BrokerClient { this.reconnectAttempt = 0; this.flushOutbound(); this.startStatsReporting(); + // Restore cumulative stats from a previous session if available. + if (msg.restored) { + const groups = msg.restoredGroups + ? (msg.restoredGroups as Array<{ name: string; role?: string }>).map((g) => g.role ? `@${g.name}:${g.role}` : `@${g.name}`).join(", ") + : "none"; + process.stderr.write( + `[claudemesh] session restored — last seen ${msg.lastSeenAt ?? "unknown"}, groups: ${groups}\n`, + ); + if (msg.restoredStats) { + const rs = msg.restoredStats as { messagesIn: number; messagesOut: number; toolCalls: number; errors: number }; + this._statsCounters.messagesIn = rs.messagesIn ?? 0; + this._statsCounters.messagesOut = rs.messagesOut ?? 0; + this._statsCounters.toolCalls = rs.toolCalls ?? 0; + this._statsCounters.errors = rs.errors ?? 0; + } + } resolve(); return; } diff --git a/packages/db/migrations/0014_peer-state-persistence.sql b/packages/db/migrations/0014_peer-state-persistence.sql new file mode 100644 index 0000000..5ec5c15 --- /dev/null +++ b/packages/db/migrations/0014_peer-state-persistence.sql @@ -0,0 +1,16 @@ +-- Peer session persistence: save state on disconnect, restore on reconnect. +CREATE TABLE IF NOT EXISTS mesh.peer_state ( + id TEXT PRIMARY KEY NOT NULL, + mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE ON UPDATE CASCADE, + member_id TEXT NOT NULL REFERENCES mesh.member(id) ON DELETE CASCADE ON UPDATE CASCADE, + groups JSONB DEFAULT '[]', + profile JSONB DEFAULT '{}', + visible BOOLEAN NOT NULL DEFAULT true, + last_summary TEXT, + last_display_name TEXT, + cumulative_stats JSONB DEFAULT '{"messagesIn":0,"messagesOut":0,"toolCalls":0,"errors":0}', + last_seen_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT now(), + updated_at TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT peer_state_mesh_member_idx UNIQUE (mesh_id, member_id) +); diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 17ee891..6bae565 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -731,6 +731,53 @@ export const insertMeshStreamSchema = createInsertSchema(meshStream); export type SelectMeshStream = typeof meshStream.$inferSelect; export type InsertMeshStream = typeof meshStream.$inferInsert; +/** + * Persisted peer session state. Survives disconnects — when a peer + * reconnects (same meshId + memberId), the broker restores groups, + * profile, visibility, summary, and cumulative stats automatically. + * Keyed by (meshId, memberId) — one row per member per mesh. + */ +export const peerState = meshSchema.table( + "peer_state", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + memberId: text() + .references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + groups: jsonb().$type>().default([]), + profile: jsonb().$type<{ avatar?: string; title?: string; bio?: string; capabilities?: string[] }>().default({}), + visible: boolean().notNull().default(true), + lastSummary: text(), + lastDisplayName: text(), + cumulativeStats: jsonb().$type<{ messagesIn: number; messagesOut: number; toolCalls: number; errors: number }>().default({ messagesIn: 0, messagesOut: 0, toolCalls: 0, errors: 0 }), + lastSeenAt: timestamp(), + createdAt: timestamp().defaultNow().notNull(), + updatedAt: timestamp().defaultNow().notNull(), + }, + (table) => [ + uniqueIndex("peer_state_mesh_member_idx").on(table.meshId, table.memberId), + ], +); + +export const peerStateRelations = relations(peerState, ({ one }) => ({ + mesh: one(mesh, { + fields: [peerState.meshId], + references: [mesh.id], + }), + member: one(meshMember, { + fields: [peerState.memberId], + references: [meshMember.id], + }), +})); + +export const selectPeerStateSchema = createSelectSchema(peerState); +export const insertPeerStateSchema = createInsertSchema(peerState); +export type SelectPeerState = typeof peerState.$inferSelect; +export type InsertPeerState = typeof peerState.$inferInsert; + export const meshSkillRelations = relations(meshSkill, ({ one }) => ({ mesh: one(mesh, { fields: [meshSkill.meshId],