feat: persist peer session state across disconnects ("welcome back" on reconnect)

Save groups, profile, visibility, summary, display name, and cumulative
stats to a new mesh.peer_state table on disconnect. On reconnect (same
meshId + memberId), restore them automatically — hello groups take
precedence over stored groups if provided. Broadcast peer_returned
system event with last-seen time and summary to other peers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-08 00:20:20 +01:00
parent e09671cdcb
commit fc8a7edc23
6 changed files with 315 additions and 17 deletions

View File

@@ -15,10 +15,10 @@
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
import type { Duplex } from "node:stream";
import { WebSocketServer, type WebSocket } from "ws";
import { and, eq, sql } from "drizzle-orm";
import { and, eq, isNull, sql } from "drizzle-orm";
import { env } from "./env";
import { db } from "./db";
import { messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook } from "@turbostarter/db/schema/mesh";
import { messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh";
import {
claimTask,
completeTask,
@@ -179,7 +179,7 @@ function makeClockStatus(clock: MeshClock, reqId?: string): WSServerMessage {
} as WSServerMessage;
}
// --- MCP proxy registry (in-memory, ephemeral) ---
// --- MCP proxy registry (in-memory, persistent-capable) ---
interface McpRegisteredServer {
meshId: string;
presenceId: string;
@@ -187,6 +187,11 @@ interface McpRegisteredServer {
description: string;
tools: Array<{ name: string; description: string; inputSchema: Record<string, unknown> }>;
hostedByName: string;
persistent: boolean;
online: boolean;
memberId: string;
registeredAt: string;
offlineSince?: string;
}
/** Keyed by "meshId:serverName" */
const mcpRegistry = new Map<string, McpRegisteredServer>();
@@ -858,6 +863,118 @@ function sendError(
}
}
// --- Peer state persistence ---
async function savePeerState(conn: PeerConn, memberId: string, meshId: string): Promise<void> {
try {
// Read existing cumulative stats to merge
const existing = await db
.select()
.from(peerState)
.where(and(eq(peerState.meshId, meshId), eq(peerState.memberId, memberId)))
.limit(1);
const prev = existing[0]?.cumulativeStats as { messagesIn: number; messagesOut: number; toolCalls: number; errors: number } | null;
const sessionStats = conn.stats ?? {};
const cumulative = {
messagesIn: (prev?.messagesIn ?? 0) + (sessionStats.messagesIn ?? 0),
messagesOut: (prev?.messagesOut ?? 0) + (sessionStats.messagesOut ?? 0),
toolCalls: (prev?.toolCalls ?? 0) + (sessionStats.toolCalls ?? 0),
errors: (prev?.errors ?? 0) + (sessionStats.errors ?? 0),
};
const now = new Date();
await db
.insert(peerState)
.values({
meshId,
memberId,
groups: conn.groups,
profile: conn.profile,
visible: conn.visible,
lastSummary: null, // will be set below if presence has a summary
lastDisplayName: conn.displayName,
cumulativeStats: cumulative,
lastSeenAt: now,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: [peerState.meshId, peerState.memberId],
set: {
groups: conn.groups,
profile: conn.profile,
visible: conn.visible,
lastDisplayName: conn.displayName,
cumulativeStats: cumulative,
lastSeenAt: now,
updatedAt: now,
},
});
// Persist the summary from the presence row (it's set via setSummary, not on conn)
const { presence } = await import("@turbostarter/db/schema/mesh");
const presRows = await db
.select({ summary: presence.summary })
.from(presence)
.where(and(eq(presence.memberId, memberId), isNull(presence.disconnectedAt)))
.limit(1);
if (presRows[0]?.summary) {
await db
.update(peerState)
.set({ lastSummary: presRows[0].summary, updatedAt: now })
.where(and(eq(peerState.meshId, meshId), eq(peerState.memberId, memberId)));
}
} catch (e) {
log.warn("failed to save peer state", {
mesh_id: meshId,
member_id: memberId,
error: e instanceof Error ? e.message : String(e),
});
}
}
async function restorePeerState(
meshId: string,
memberId: string,
): Promise<{
restored: boolean;
groups?: Array<{ name: string; role?: string }>;
profile?: { avatar?: string; title?: string; bio?: string; capabilities?: string[] };
visible?: boolean;
lastSummary?: string;
lastDisplayName?: string;
cumulativeStats?: { messagesIn: number; messagesOut: number; toolCalls: number; errors: number };
lastSeenAt?: Date;
} | null> {
try {
const rows = await db
.select()
.from(peerState)
.where(and(eq(peerState.meshId, meshId), eq(peerState.memberId, memberId)))
.limit(1);
if (!rows[0]) return null;
const row = rows[0];
return {
restored: true,
groups: row.groups as Array<{ name: string; role?: string }> ?? [],
profile: row.profile as { avatar?: string; title?: string; bio?: string; capabilities?: string[] } ?? {},
visible: row.visible,
lastSummary: row.lastSummary ?? undefined,
lastDisplayName: row.lastDisplayName ?? undefined,
cumulativeStats: row.cumulativeStats as { messagesIn: number; messagesOut: number; toolCalls: number; errors: number } ?? undefined,
lastSeenAt: row.lastSeenAt ?? undefined,
};
} catch (e) {
log.warn("failed to restore peer state", {
mesh_id: meshId,
member_id: memberId,
error: e instanceof Error ? e.message : String(e),
});
return null;
}
}
async function handleHello(
ws: WebSocket,
hello: Extract<WSClientMessage, { type: "hello" }>,
@@ -902,7 +1019,14 @@ async function handleHello(
ws.close(1008, "unauthorized");
return null;
}
const initialGroups = hello.groups ?? [];
// Attempt to restore persisted state from a previous session.
const saved = await restorePeerState(hello.meshId, member.id);
const helloHasGroups = hello.groups && hello.groups.length > 0;
// Hello groups take precedence; fall back to restored groups.
const initialGroups = helloHasGroups
? hello.groups!
: (saved?.groups ?? []);
const presenceId = await connectPresence({
memberId: member.id,
sessionId: hello.sessionId,
@@ -926,26 +1050,36 @@ async function handleHello(
channel: hello.channel,
model: hello.model,
groups: initialGroups,
visible: true,
profile: {},
visible: saved?.visible ?? true,
profile: saved?.profile ?? {},
});
incMeshCount(hello.meshId);
void audit(hello.meshId, "peer_joined", member.id, effectiveDisplayName, {
pubkey: hello.pubkey,
groups: initialGroups,
restored: !!saved,
});
log.info("ws hello", {
mesh_id: hello.meshId,
member: effectiveDisplayName,
presence_id: presenceId,
session_id: hello.sessionId,
restored: !!saved,
});
// Drain any queued messages in the background. The hello_ack is
// sent by the CALLER after it assigns presenceId — sending it here
// races the caller's closure assignment, causing subsequent client
// messages to fail the "no_hello" check.
void maybePushQueuedMessages(presenceId);
return { presenceId, memberDisplayName: effectiveDisplayName };
return {
presenceId,
memberDisplayName: effectiveDisplayName,
restored: saved ? true : undefined,
lastSummary: saved?.lastSummary,
lastSeenAt: saved?.lastSeenAt?.toISOString(),
restoredGroups: (!helloHasGroups && saved?.groups?.length) ? saved.groups : undefined,
restoredStats: saved?.cumulativeStats,
};
}
async function handleSend(
@@ -1056,27 +1190,38 @@ function handleConnection(ws: WebSocket): void {
// Ack AFTER closure assignment — subsequent client messages
// arriving immediately after will now see a non-null presenceId.
try {
ws.send(
JSON.stringify({
type: "hello_ack",
presenceId: result.presenceId,
memberDisplayName: result.memberDisplayName,
}),
);
const ackPayload: Record<string, unknown> = {
type: "hello_ack",
presenceId: result.presenceId,
memberDisplayName: result.memberDisplayName,
};
if (result.restored) {
ackPayload.restored = true;
if (result.lastSummary) ackPayload.lastSummary = result.lastSummary;
if (result.lastSeenAt) ackPayload.lastSeenAt = result.lastSeenAt;
if (result.restoredGroups) ackPayload.restoredGroups = result.restoredGroups;
if (result.restoredStats) ackPayload.restoredStats = result.restoredStats;
}
ws.send(JSON.stringify(ackPayload));
} catch {
/* ws closed during hello */
}
// Broadcast peer_joined to all other peers in the same mesh.
// Broadcast peer_joined or peer_returned to all other peers in the same mesh.
const joinedConn = connections.get(presenceId);
if (joinedConn) {
const isReturning = !!result.restored;
const joinMsg: WSPushMessage = {
type: "push",
subtype: "system",
event: "peer_joined",
event: isReturning ? "peer_returned" : "peer_joined",
eventData: {
name: result.memberDisplayName,
pubkey: joinedConn.sessionPubkey ?? joinedConn.memberPubkey,
groups: joinedConn.groups,
...(isReturning ? {
lastSeenAt: result.lastSeenAt,
summary: result.lastSummary,
} : {}),
},
messageId: crypto.randomUUID(),
meshId: joinedConn.meshId,
@@ -2480,6 +2625,10 @@ function handleConnection(ws: WebSocket): void {
description: mr.description,
tools: mr.tools,
hostedByName: conn.displayName,
persistent: !!(mr as any).persistent,
online: true,
memberId: conn.memberId,
registeredAt: new Date().toISOString(),
});
sendToPeer(presenceId, {
type: "mcp_register_ack",
@@ -2892,6 +3041,10 @@ function handleConnection(ws: WebSocket): void {
ws.on("close", async () => {
if (presenceId) {
const conn = connections.get(presenceId);
// Persist peer state BEFORE removing from connections.
if (conn) {
await savePeerState(conn, conn.memberId, conn.meshId);
}
connections.delete(presenceId);
if (conn) {
decMeshCount(conn.meshId);
@@ -2928,7 +3081,16 @@ function handleConnection(ws: WebSocket): void {
}
// Clean up MCP servers registered by this peer
for (const [key, entry] of mcpRegistry) {
if (entry.presenceId === presenceId) mcpRegistry.delete(key);
if (entry.presenceId === presenceId) {
if (entry.persistent) {
// Keep persistent entries but mark offline
entry.online = false;
entry.offlineSince = new Date().toISOString();
entry.presenceId = "";
} else {
mcpRegistry.delete(key);
}
}
}
// Auto-pause clock when mesh becomes empty
if (conn && !connectionsPerMesh.has(conn.meshId)) {
@@ -3162,6 +3324,29 @@ function main(): void {
}),
);
// Ensure peer_state table exists (CREATE TABLE IF NOT EXISTS)
db.execute(sql`
CREATE TABLE IF NOT EXISTS mesh.peer_state (
id TEXT PRIMARY KEY NOT NULL,
mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE ON UPDATE CASCADE,
member_id TEXT NOT NULL REFERENCES mesh.member(id) ON DELETE CASCADE ON UPDATE CASCADE,
groups JSONB DEFAULT '[]',
profile JSONB DEFAULT '{}',
visible BOOLEAN NOT NULL DEFAULT true,
last_summary TEXT,
last_display_name TEXT,
cumulative_stats JSONB DEFAULT '{"messagesIn":0,"messagesOut":0,"toolCalls":0,"errors":0}',
last_seen_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT now(),
updated_at TIMESTAMP NOT NULL DEFAULT now(),
CONSTRAINT peer_state_mesh_member_idx UNIQUE (mesh_id, member_id)
)
`).catch((e) =>
log.warn("peer_state table creation failed", {
error: e instanceof Error ? e.message : String(e),
}),
);
// Recover persisted scheduled messages (cron + one-shot) from DB
recoverScheduledMessages().catch((e) =>
log.warn("scheduled message recovery failed on startup", {

View File

@@ -214,6 +214,16 @@ export interface WSHelloAckMessage {
type: "hello_ack";
presenceId: string;
memberDisplayName: string;
/** True when the broker restored persisted state from a previous session. */
restored?: boolean;
/** Last summary set before disconnect (only when restored). */
lastSummary?: string;
/** ISO timestamp of last disconnect (only when restored). */
lastSeenAt?: string;
/** Restored groups from previous session (only when restored and hello had no groups). */
restoredGroups?: Array<{ name: string; role?: string }>;
/** Restored cumulative stats (only when restored). */
restoredStats?: { messagesIn: number; messagesOut: number; toolCalls: number; errors: number };
}
/** Broker → client: list of connected peers in the same mesh. */