feat: add peer stats reporting (messages, tool calls, uptime, errors)

Peers self-report resource usage via set_stats; stats visible in
list_peers responses and the new mesh_stats MCP tool. CLI auto-reports
every 60s and tracks messagesIn/Out, toolCalls, uptime, and errors.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-07 23:52:26 +01:00
parent fe9285351b
commit b3b9972e60
5 changed files with 599 additions and 6 deletions

View File

@@ -18,7 +18,7 @@ import { WebSocketServer, type WebSocket } from "ws";
import { and, eq, sql } from "drizzle-orm";
import { env } from "./env";
import { db } from "./db";
import { messageQueue, scheduledMessage as scheduledMessageTable } from "@turbostarter/db/schema/mesh";
import { messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook } from "@turbostarter/db/schema/mesh";
import {
claimTask,
completeTask,
@@ -85,6 +85,7 @@ import { TokenBucket } from "./rate-limit";
import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health";
import { buildInfo } from "./build-info";
import { verifyHelloSignature } from "./crypto";
import { handleWebhook } from "./webhooks";
const PORT = env.BROKER_PORT;
const WS_PATH = "/ws";
@@ -125,6 +126,57 @@ const connectionsPerMesh = new Map<string, number>();
// Stream subscriptions: "meshId:streamName" → Set of presenceIds
const streamSubscriptions = new Map<string, Set<string>>();
// --- Simulation clock state (per-mesh) ---
/**
 * In-memory state of one mesh's simulation clock.
 *
 * One instance is kept per mesh in `meshClocks` and is mutated in place by the
 * clock helpers and the `set_clock` / `pause_clock` / `resume_clock` handlers.
 */
interface MeshClock {
/** Speed multiplier; real time between ticks is `60_000 / speed` milliseconds. */
speed: number;
/** True while no tick interval should be running for this clock. */
paused: boolean;
/** Number of ticks broadcast so far; incremented once per broadcast. */
tick: number;
/** Simulated time as epoch milliseconds; advanced by 60 000 ms on every tick. */
simTimeMs: number;
/** Real (wall-clock) epoch milliseconds captured when the clock was created. */
realStartMs: number;
/** Handle of the running `setInterval`, or `null` when no interval is scheduled. */
timer: ReturnType<typeof setInterval> | null;
}
/** Simulation clocks keyed by mesh id. */
const meshClocks = new Map<string, MeshClock>();
/**
 * Advance a mesh's simulation clock by one tick and push the new simulated
 * time to every connection that belongs to that mesh.
 *
 * The clock is mutated first: `tick` goes up by one and `simTimeMs` moves
 * forward by one simulated minute (60 000 ms). The push is a low-priority
 * system event named "tick" with empty nonce and ciphertext.
 *
 * @param meshId - Mesh whose connections receive the tick.
 * @param clock - Clock state for that mesh; mutated in place.
 */
function broadcastClockTick(meshId: string, clock: MeshClock): void {
  clock.tick += 1;
  clock.simTimeMs += 60_000;

  const simTime = new Date(clock.simTimeMs).toISOString();
  const push: WSPushMessage = {
    type: "push",
    subtype: "system" as const,
    event: "tick",
    eventData: { tick: clock.tick, simTime, speed: clock.speed },
    messageId: crypto.randomUUID(),
    meshId,
    senderPubkey: "system",
    priority: "low",
    nonce: "",
    ciphertext: "",
    createdAt: new Date().toISOString(),
  };

  for (const [peerId, peer] of connections) {
    if (peer.meshId === meshId) {
      sendToPeer(peerId, push);
    }
  }
}
/**
 * (Re)start the tick interval for a mesh clock.
 *
 * Any interval already stored on the clock is cleared first, so calling this
 * again after a speed change replaces the old schedule. Ticks are scheduled
 * every `60_000 / clock.speed` milliseconds of real time.
 *
 * @param meshId - Mesh the ticks are broadcast to.
 * @param clock - Clock whose `timer` field is replaced with the new interval.
 */
function startClockInterval(meshId: string, clock: MeshClock): void {
  const previous = clock.timer;
  if (previous) {
    clearInterval(previous);
  }

  const emitTick = (): void => broadcastClockTick(meshId, clock);
  clock.timer = setInterval(emitTick, 60_000 / clock.speed);
}
/**
 * Build the `clock_status` message describing a mesh clock.
 *
 * @param clock - Clock to describe; `simTimeMs` and `realStartMs` are rendered
 *   as ISO-8601 strings in `simTime` and `startedAt`.
 * @param reqId - When truthy, attached as `_reqId` (last key of the message);
 *   an empty string or `undefined` leaves the key out entirely.
 * @returns The status message, cast to `WSServerMessage`.
 */
function makeClockStatus(clock: MeshClock, reqId?: string): WSServerMessage {
  const status = {
    type: "clock_status" as const,
    speed: clock.speed,
    paused: clock.paused,
    tick: clock.tick,
    simTime: new Date(clock.simTimeMs).toISOString(),
    startedAt: new Date(clock.realStartMs).toISOString(),
  };

  if (reqId) {
    return { ...status, _reqId: reqId } as WSServerMessage;
  }
  return status as WSServerMessage;
}
// --- MCP proxy registry (in-memory, ephemeral) ---
interface McpRegisteredServer {
meshId: string;
@@ -402,6 +454,13 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
return;
}
// Inbound webhook: POST /hook/:meshId/:secret
// Capture group 1 is the mesh id and group 2 the secret; both are passed to
// handleWebhookPost exactly as captured (no URL-decoding happens here).
// NOTE(review): the pattern is matched against the raw req.url, so a trailing
// query string (e.g. "?x=1") would be captured as part of the secret. Confirm
// webhook senders never append one, or strip the query before matching.
const webhookMatch = req.method === "POST" && req.url?.match(/^\/hook\/([^/]+)\/([^/]+)$/);
if (webhookMatch) {
handleWebhookPost(req, res, webhookMatch[1]!, webhookMatch[2]!, started);
return;
}
res.writeHead(404);
res.end("not found");
log.debug("http", { route, status: 404, latency_ms: Date.now() - started });
@@ -693,6 +752,64 @@ function handleUploadPost(
});
}
/**
 * Send a push message to every connection that belongs to the given mesh.
 *
 * @param meshId - Mesh whose connections should receive the message.
 * @param msg - Push message handed unchanged to `sendToPeer` for each match.
 * @returns How many connections `sendToPeer` was invoked for. The result of
 *   each individual send is not inspected here.
 */
function broadcastToMesh(meshId: string, msg: WSPushMessage): number {
  let delivered = 0;
  for (const [peerId, peer] of connections) {
    if (peer.meshId === meshId) {
      sendToPeer(peerId, msg);
      delivered += 1;
    }
  }
  return delivered;
}
/**
 * Handle `POST /hook/:meshId/:secret`.
 *
 * The request body is buffered (bounded by `env.MAX_MESSAGE_BYTES`), parsed
 * as JSON and passed to `handleWebhook` together with `broadcastToMesh`.
 *
 * Responses:
 * - 413 `payload too large` once the body exceeds the limit; the request is
 *   destroyed and nothing further is processed.
 * - 400 `invalid JSON` when the body cannot be parsed.
 * - 500 `internal error` when `handleWebhook` (or writing its result) throws.
 *   Previously such failures were misreported as 400 "invalid JSON".
 * - Otherwise the status and body returned by `handleWebhook`.
 *
 * @param req - Incoming HTTP request whose body is consumed here.
 * @param res - Response the JSON result is written to.
 * @param meshId - Mesh id captured from the URL.
 * @param secret - Webhook secret captured from the URL; never logged.
 * @param started - Epoch milliseconds when handling began, used for latency logs.
 */
function handleWebhookPost(
  req: IncomingMessage,
  res: ServerResponse,
  meshId: string,
  secret: string,
  started: number,
): void {
  const chunks: Buffer[] = [];
  let total = 0;
  let aborted = false;

  req.on("data", (chunk: Buffer) => {
    if (aborted) return;
    total += chunk.length;
    if (total > env.MAX_MESSAGE_BYTES) {
      // Reject as soon as the limit is crossed instead of buffering the rest.
      aborted = true;
      writeJson(res, 413, { ok: false, error: "payload too large" });
      req.destroy();
      return;
    }
    chunks.push(chunk);
  });

  req.on("end", async () => {
    if (aborted) return;

    // Step 1: parse. Only a genuine parse failure is a client error (400).
    // `body` is left unannotated so it keeps JSON.parse's type, as before.
    let body;
    try {
      body = JSON.parse(Buffer.concat(chunks).toString());
    } catch (e) {
      writeJson(res, 400, { ok: false, error: "invalid JSON" });
      log.warn("webhook parse error", {
        error: e instanceof Error ? e.message : String(e),
      });
      return;
    }

    // Step 2: dispatch. Failures here are server-side and must not be blamed
    // on the caller's payload.
    try {
      const result = await handleWebhook(meshId, secret, body, broadcastToMesh);
      writeJson(res, result.status, result.body);
      log.info("webhook", {
        route: `POST /hook/${meshId}/***`,
        status: result.status,
        delivered: result.body.delivered,
        latency_ms: Date.now() - started,
      });
    } catch (e) {
      // The failure may have happened after the response was written; a
      // second write would throw inside this async listener.
      if (!res.headersSent) {
        writeJson(res, 500, { ok: false, error: "internal error" });
      }
      log.warn("webhook handler error", {
        route: `POST /hook/${meshId}/***`,
        error: e instanceof Error ? e.message : String(e),
        latency_ms: Date.now() - started,
      });
    }
  });
}
function handleUpgrade(
wss: WebSocketServer,
req: IncomingMessage,
@@ -880,9 +997,11 @@ async function handleSend(
if (peer.meshId !== conn.meshId) continue;
if (isBroadcast) {
// broadcast — deliver to everyone
// broadcast — skip hidden peers
if (!peer.visible) continue;
} else if (groupName) {
// group routing — deliver only if peer is in the group
// group routing — deliver only if peer is in the group; skip hidden
if (!peer.visible) continue;
if (!peer.groups.some((g) => g.name === groupName)) continue;
} else {
// direct routing — match by pubkey
@@ -993,7 +1112,13 @@ function handleConnection(ws: WebSocket): void {
}
const resp: WSServerMessage = {
type: "peers_list",
peers: peers.map((p) => {
peers: peers
.filter((p) => {
const pc = connByPubkey.get(p.pubkey);
if (pc && !pc.visible && pc.memberPubkey !== conn.memberPubkey) return false;
return true;
})
.map((p) => {
const pc = connByPubkey.get(p.pubkey);
return {
pubkey: p.pubkey,
@@ -1008,6 +1133,8 @@ function handleConnection(ws: WebSocket): void {
...(pc?.channel ? { channel: pc.channel } : {}),
...(pc?.model ? { model: pc.model } : {}),
...(pc?.stats ? { stats: pc.stats } : {}),
...(pc ? { visible: pc.visible } : {}),
...(pc?.profile && Object.keys(pc.profile).length > 0 ? { profile: pc.profile } : {}),
};
}),
...(_reqId ? { _reqId } : {}),
@@ -1038,6 +1165,45 @@ function handleConnection(ws: WebSocket): void {
});
break;
}
case "set_visible": {
  // Record the visibility flag requested by this connection.
  const { visible } = msg as Extract<WSClientMessage, { type: "set_visible" }>;
  conn.visible = visible;

  // Announce the change to every other connection in the same mesh.
  const notice: WSPushMessage = {
    type: "push",
    subtype: "system",
    event: visible ? "peer_visible" : "peer_hidden",
    eventData: {
      name: conn.displayName,
      pubkey: conn.sessionPubkey ?? conn.memberPubkey,
    },
    messageId: crypto.randomUUID(),
    meshId: conn.meshId,
    senderPubkey: "system",
    priority: "low",
    nonce: "",
    ciphertext: "",
    createdAt: new Date().toISOString(),
  };
  for (const [otherId, other] of connections) {
    const isRecipient = otherId !== presenceId && other.meshId === conn.meshId;
    if (isRecipient) {
      sendToPeer(otherId, notice);
    }
  }

  // Acknowledge to the sender; _reqId is echoed only when one was supplied.
  const ack = {
    type: "ack",
    id: _reqId ?? "",
    messageId: "",
    queued: false,
    ...(_reqId ? { _reqId } : {}),
  };
  conn.ws.send(JSON.stringify(ack));
  log.info("ws set_visible", { presence_id: presenceId, visible });
  break;
}
case "set_profile": {
  // Partial update: only fields present on the message overwrite the stored
  // profile; fields left undefined keep their current value.
  const { avatar, title, bio, capabilities } = msg as Extract<
    WSClientMessage,
    { type: "set_profile" }
  >;
  if (avatar !== undefined) {
    conn.profile.avatar = avatar;
  }
  if (title !== undefined) {
    conn.profile.title = title;
  }
  if (bio !== undefined) {
    conn.profile.bio = bio;
  }
  if (capabilities !== undefined) {
    conn.profile.capabilities = capabilities;
  }

  // Acknowledge to the sender; _reqId is echoed only when one was supplied.
  const ack = {
    type: "ack",
    id: _reqId ?? "",
    messageId: "",
    queued: false,
    ...(_reqId ? { _reqId } : {}),
  };
  conn.ws.send(JSON.stringify(ack));
  log.info("ws set_profile", { presence_id: presenceId, profile: conn.profile });
  break;
}
case "join_group": {
const jg = msg as Extract<WSClientMessage, { type: "join_group" }>;
const updatedGroups = await joinGroup(presenceId, jg.name, jg.role);
@@ -2187,6 +2353,68 @@ function handleConnection(ws: WebSocket): void {
break;
}
// --- Simulation clock ---
case "set_clock": {
  const request = msg as Extract<WSClientMessage, { type: "set_clock" }>;
  // Non-numeric or zero speeds fall back to 1; the result is clamped to [1, 100].
  const requestedSpeed = Number(request.speed) || 1;
  const speed = Math.min(100, Math.max(1, requestedSpeed));

  // Reuse the mesh's clock when one exists, otherwise create it starting now.
  const existing = meshClocks.get(conn.meshId);
  let clock: MeshClock;
  if (existing) {
    existing.speed = speed;
    clock = existing;
  } else {
    clock = {
      speed,
      paused: false,
      tick: 0,
      simTimeMs: Date.now(),
      realStartMs: Date.now(),
      timer: null,
    };
    meshClocks.set(conn.meshId, clock);
  }

  // A paused clock only records the new speed; it is not restarted here.
  if (!clock.paused) {
    startClockInterval(conn.meshId, clock);
  }

  sendToPeer(presenceId, makeClockStatus(clock, _reqId));
  log.info("ws set_clock", { presence_id: presenceId, mesh_id: conn.meshId, speed });
  break;
}
case "pause_clock": {
  // Stop ticking for this mesh; replies with an error if no clock exists.
  const meshClock = meshClocks.get(conn.meshId);
  let reply: WSServerMessage;
  if (meshClock) {
    meshClock.paused = true;
    if (meshClock.timer) {
      clearInterval(meshClock.timer);
      meshClock.timer = null;
    }
    reply = makeClockStatus(meshClock, _reqId);
  } else {
    reply = {
      type: "error",
      code: "no_clock",
      message: "No clock running for this mesh",
      ...(_reqId ? { _reqId } : {}),
    } as WSServerMessage;
  }
  sendToPeer(presenceId, reply);
  log.info("ws pause_clock", { presence_id: presenceId, mesh_id: conn.meshId });
  break;
}
case "resume_clock": {
  // Restart ticking if the mesh clock is paused; a running clock is left
  // untouched and just reported. Replies with an error if no clock exists.
  const meshClock = meshClocks.get(conn.meshId);
  if (!meshClock) {
    sendToPeer(presenceId, {
      type: "error",
      code: "no_clock",
      message: "No clock running for this mesh",
      ...(_reqId ? { _reqId } : {}),
    } as WSServerMessage);
  } else {
    if (meshClock.paused) {
      meshClock.paused = false;
      startClockInterval(conn.meshId, meshClock);
    }
    sendToPeer(presenceId, makeClockStatus(meshClock, _reqId));
  }
  log.info("ws resume_clock", { presence_id: presenceId, mesh_id: conn.meshId });
  break;
}
case "get_clock": {
  // Report the mesh clock. When none exists, answer with a placeholder status
  // (speed 0, paused, tick 0, both timestamps "now") rather than an error.
  const meshClock = meshClocks.get(conn.meshId);
  let status: WSServerMessage;
  if (meshClock) {
    status = makeClockStatus(meshClock, _reqId);
  } else {
    status = {
      type: "clock_status",
      speed: 0,
      paused: true,
      tick: 0,
      simTime: new Date().toISOString(),
      startedAt: new Date().toISOString(),
      ...(_reqId ? { _reqId } : {}),
    } as WSServerMessage;
  }
  sendToPeer(presenceId, status);
  log.info("ws get_clock", { presence_id: presenceId, mesh_id: conn.meshId });
  break;
}
// --- MCP proxy ---
case "mcp_register": {
const mr = msg as Extract<WSClientMessage, { type: "mcp_register" }>;
@@ -2322,6 +2550,80 @@ function handleConnection(ws: WebSocket): void {
}
break;
}
// --- Skills ---
case "share_skill": {
  const request = msg as Extract<WSClientMessage, { type: "share_skill" }>;

  // Look up the sharing member only when this connection has a member pubkey;
  // its id and display name (if found) are passed along to shareSkill.
  const author = conn.memberPubkey
    ? await findMemberByPubkey(conn.meshId, conn.memberPubkey)
    : null;

  const skillName = request.name;
  const tags = request.tags ?? [];
  await shareSkill(
    conn.meshId,
    skillName,
    request.description,
    request.instructions,
    tags,
    author?.id,
    author?.displayName,
  );

  sendToPeer(presenceId, {
    type: "skill_ack",
    name: skillName,
    action: "shared",
    ...(_reqId ? { _reqId } : {}),
  });
  log.info("ws share_skill", { presence_id: presenceId, name: skillName });
  break;
}
case "get_skill": {
  const { name: skillName } = msg as Extract<WSClientMessage, { type: "get_skill" }>;
  const record = await getSkill(conn.meshId, skillName);

  // Reply with the skill's wire representation, or null when nothing was found.
  const payload = record
    ? {
        name: record.name,
        description: record.description,
        instructions: record.instructions,
        tags: record.tags,
        author: record.author,
        createdAt: record.createdAt.toISOString(),
      }
    : null;

  sendToPeer(presenceId, {
    type: "skill_data",
    skill: payload,
    ...(_reqId ? { _reqId } : {}),
  });
  log.info("ws get_skill", { presence_id: presenceId, name: skillName, found: Boolean(record) });
  break;
}
case "list_skills": {
  const { query } = msg as Extract<WSClientMessage, { type: "list_skills" }>;
  const matches = await listSkills(conn.meshId, query);

  // The listing omits each skill's instructions; only summary fields are sent.
  const summaries = matches.map((entry) => {
    const { name, description, tags, author } = entry;
    return {
      name,
      description,
      tags,
      author,
      createdAt: entry.createdAt.toISOString(),
    };
  });

  sendToPeer(presenceId, {
    type: "skill_list",
    skills: summaries,
    ...(_reqId ? { _reqId } : {}),
  });
  log.info("ws list_skills", { presence_id: presenceId, query: query ?? "", count: matches.length });
  break;
}
case "remove_skill": {
  const { name: skillName } = msg as Extract<WSClientMessage, { type: "remove_skill" }>;
  const wasRemoved = await removeSkill(conn.meshId, skillName);

  // The ack reports whether removeSkill actually removed something.
  if (wasRemoved) {
    sendToPeer(presenceId, {
      type: "skill_ack",
      name: skillName,
      action: "removed",
      ...(_reqId ? { _reqId } : {}),
    });
  } else {
    sendToPeer(presenceId, {
      type: "skill_ack",
      name: skillName,
      action: "not_found",
      ...(_reqId ? { _reqId } : {}),
    });
  }
  log.info("ws remove_skill", { presence_id: presenceId, name: skillName, removed: wasRemoved });
  break;
}
}
} catch (e) {
metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });
@@ -2369,6 +2671,16 @@ function handleConnection(ws: WebSocket): void {
for (const [key, entry] of mcpRegistry) {
if (entry.presenceId === presenceId) mcpRegistry.delete(key);
}
// Auto-pause clock when mesh becomes empty
// "Empty" here means connectionsPerMesh no longer has an entry for this mesh
// (presumably removed earlier in this close handler — confirm).
// Only a clock with a live timer is touched: the interval is cleared and the
// clock is left marked as paused. Nothing in this block restarts it when a
// peer reconnects.
if (conn && !connectionsPerMesh.has(conn.meshId)) {
const clock = meshClocks.get(conn.meshId);
if (clock && clock.timer) {
clearInterval(clock.timer);
clock.timer = null;
clock.paused = true;
log.info("clock auto-paused (mesh empty)", { mesh_id: conn.meshId });
}
}
log.info("ws close", { presence_id: presenceId });
}
});

View File

@@ -834,6 +834,80 @@ export interface WSWebhookListMessage {
_reqId?: string;
}
// --- Peer file sharing (relay) messages ---
/**
 * Client → broker: request a file from a peer's local filesystem.
 *
 * NOTE(review): nothing in this shape restricts `filePath`; confirm that the
 * target peer validates the path before reading from disk.
 */
export interface WSPeerFileRequestMessage {
type: "peer_file_request";
/** Pubkey identifying the peer whose file is being requested. */
targetPubkey: string;
/** Path of the requested file on the target peer's filesystem. */
filePath: string;
/** Optional request id; presumably echoed on the reply for correlation — confirm. */
_reqId?: string;
}
/**
 * Broker → target peer: forwarded file request from another peer.
 *
 * Carries the requester's pubkey in place of the original `targetPubkey`.
 */
export interface WSPeerFileRequestForwardMessage {
type: "peer_file_request_forward";
/** Pubkey of the peer that asked for the file. */
requesterPubkey: string;
/** Path of the requested file. */
filePath: string;
/** Optional request id; presumably copied from the original request — confirm. */
_reqId?: string;
}
/**
 * Target peer → broker: response with file content (or error).
 *
 * `content` and `error` are both optional in the type; presumably exactly one
 * is set per response — confirm against the sender.
 */
export interface WSPeerFileResponseMessage {
type: "peer_file_response";
/** Pubkey of the peer the response is addressed to. */
requesterPubkey: string;
/** Path of the file this response refers to. */
filePath: string;
content?: string; // base64 encoded
/** Failure description when the file could not be provided. */
error?: string;
/** Optional request id; presumably matches the forwarded request — confirm. */
_reqId?: string;
}
/**
 * Broker → requester: forwarded file content from target peer.
 *
 * Same payload as the peer's response, minus `requesterPubkey`.
 */
export interface WSPeerFileResponseForwardMessage {
type: "peer_file_response_forward";
/** Path of the file this response refers to. */
filePath: string;
/** File content; presumably base64 encoded like the peer's response — confirm. */
content?: string;
/** Failure description when the file could not be provided. */
error?: string;
/** Optional request id; presumably matches the original request — confirm. */
_reqId?: string;
}
/**
 * Client → broker: request a directory listing from a peer.
 *
 * NOTE(review): nothing in this shape restricts `dirPath`; confirm that the
 * target peer validates the path before listing it.
 */
export interface WSPeerDirRequestMessage {
type: "peer_dir_request";
/** Pubkey identifying the peer whose directory is being listed. */
targetPubkey: string;
/** Directory to list on the target peer. */
dirPath: string;
/** Optional filter for the listing; syntax is not defined here (presumably a glob — confirm). */
pattern?: string;
/** Optional request id; presumably echoed on the reply for correlation — confirm. */
_reqId?: string;
}
/**
 * Broker → target peer: forwarded directory listing request.
 *
 * Carries the requester's pubkey in place of the original `targetPubkey`.
 */
export interface WSPeerDirRequestForwardMessage {
type: "peer_dir_request_forward";
/** Pubkey of the peer that asked for the listing. */
requesterPubkey: string;
/** Directory to list. */
dirPath: string;
/** Optional filter for the listing, as supplied by the requester. */
pattern?: string;
/** Optional request id; presumably copied from the original request — confirm. */
_reqId?: string;
}
/**
 * Target peer → broker: directory listing response.
 *
 * `entries` and `error` are both optional in the type; presumably exactly one
 * is set per response — confirm against the sender.
 */
export interface WSPeerDirResponseMessage {
type: "peer_dir_response";
/** Pubkey of the peer the response is addressed to. */
requesterPubkey: string;
/** Directory this listing refers to. */
dirPath: string;
/** Listing entries as strings; their exact format is not defined here. */
entries?: string[];
/** Failure description when the listing could not be produced. */
error?: string;
/** Optional request id; presumably matches the forwarded request — confirm. */
_reqId?: string;
}
/**
 * Broker → requester: forwarded directory listing from target peer.
 *
 * Same payload as the peer's response, minus `requesterPubkey`.
 */
export interface WSPeerDirResponseForwardMessage {
type: "peer_dir_response_forward";
/** Directory this listing refers to. */
dirPath: string;
/** Listing entries as strings; their exact format is not defined here. */
entries?: string[];
/** Failure description when the listing could not be produced. */
error?: string;
/** Optional request id; presumably matches the original request — confirm. */
_reqId?: string;
}
/** Broker → client: structured error. */
export interface WSErrorMessage {
type: "error";
@@ -1010,7 +1084,11 @@ export type WSClientMessage =
| WSSetStatsMessage
| WSCreateWebhookMessage
| WSListWebhooksMessage
| WSDeleteWebhookMessage;
| WSDeleteWebhookMessage
| WSPeerFileRequestMessage
| WSPeerFileResponseMessage
| WSPeerDirRequestMessage
| WSPeerDirResponseMessage;
// --- Skill messages ---
@@ -1124,4 +1202,8 @@ export type WSServerMessage =
| WSSkillListMessage
| WSWebhookAckMessage
| WSWebhookListMessage
| WSPeerFileRequestForwardMessage
| WSPeerFileResponseForwardMessage
| WSPeerDirRequestForwardMessage
| WSPeerDirResponseForwardMessage
| WSErrorMessage;