diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 3c359fe..67da6fc 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -17,6 +17,7 @@ import { and, asc, + count, desc, eq, gte, @@ -34,6 +35,7 @@ import { presence, } from "@turbostarter/db/schema/mesh"; import { env } from "./env"; +import { metrics } from "./metrics"; import { inferStatusFromJsonl } from "./paths"; import type { HookSetStatusRequest, @@ -244,6 +246,16 @@ export async function sweepStuckWorking(): Promise { for (const row of stuck) { await writeStatus(row.id, "idle", "jsonl", now); } + metrics.ttlSweepsTotal.inc({ flipped: String(stuck.length) }); +} + +/** Update the queue_depth gauge from a single COUNT query. */ +export async function refreshQueueDepth(): Promise { + const [row] = await db + .select({ n: count() }) + .from(messageQueue) + .where(isNull(messageQueue.deliveredAt)); + metrics.queueDepth.set(Number(row?.n ?? 0)); } /** Sweep expired pending_status entries. */ diff --git a/apps/broker/src/build-info.ts b/apps/broker/src/build-info.ts new file mode 100644 index 0000000..f41cd2a --- /dev/null +++ b/apps/broker/src/build-info.ts @@ -0,0 +1,45 @@ +/** + * Build info surfaced on /health. + * + * gitSha is resolved lazily: + * 1. GIT_SHA env var (preferred — baked in at image build time) + * 2. `git rev-parse --short HEAD` (dev) + * 3. 
"unknown" if neither works + */ + +const VERSION = "0.1.0"; +const startedAt = Date.now(); + +let cachedSha: string | null = null; + +function resolveGitSha(): string { + if (cachedSha !== null) return cachedSha; + if (process.env.GIT_SHA) { + cachedSha = process.env.GIT_SHA; + return cachedSha; + } + try { + const proc = Bun.spawnSync(["git", "rev-parse", "--short", "HEAD"], { + stderr: "ignore", + }); + const sha = new TextDecoder().decode(proc.stdout).trim(); + cachedSha = sha || "unknown"; + } catch { + cachedSha = "unknown"; + } + return cachedSha; +} + +export function buildInfo(): { + version: string; + gitSha: string; + uptime: number; +} { + return { + version: VERSION, + gitSha: resolveGitSha(), + uptime: Math.floor((Date.now() - startedAt) / 1000), + }; +} + +export { VERSION }; diff --git a/apps/broker/src/db-health.ts b/apps/broker/src/db-health.ts new file mode 100644 index 0000000..3139fcc --- /dev/null +++ b/apps/broker/src/db-health.ts @@ -0,0 +1,70 @@ +/** + * Postgres connection health check with backoff retry. + * + * We don't tear down the broker on a transient DB blip — the + * surrounding HTTP/WS layer keeps serving, /health flips to 503, + * and the metrics gauge reflects reality. New queries will naturally + * fail while the DB is down; connectors that have retry logic of + * their own (postgres.js does) will recover transparently. 
+ */ + +import { sql } from "drizzle-orm"; +import { db } from "./db"; +import { log } from "./logger"; +import { metrics } from "./metrics"; + +let healthy = false; +let consecutiveFailures = 0; +let pollTimer: ReturnType | null = null; + +export function isDbHealthy(): boolean { + return healthy; +} + +export async function pingDb(): Promise { + try { + await db.execute(sql`SELECT 1`); + if (!healthy) { + log.info("db healthy", { prior_failures: consecutiveFailures }); + } + healthy = true; + consecutiveFailures = 0; + metrics.dbHealthy.set(1); + return true; + } catch (e) { + consecutiveFailures += 1; + if (healthy || consecutiveFailures === 1) { + log.error("db ping failed", { + consecutive_failures: consecutiveFailures, + error: e instanceof Error ? e.message : String(e), + }); + } + healthy = false; + metrics.dbHealthy.set(0); + return false; + } +} + +/** + * Poll the DB on a backoff schedule while unhealthy, steady-state + * 30s interval while healthy. Runs in background; call stopDbHealth + * on shutdown. + */ +export function startDbHealth(): void { + if (pollTimer) return; + const tick = async (): Promise => { + await pingDb(); + const next = healthy + ? 30_000 + : Math.min(30_000, 500 * Math.pow(2, Math.min(consecutiveFailures, 6))); + pollTimer = setTimeout(() => { + void tick(); + }, next); + }; + void tick(); +} + +export function stopDbHealth(): void { + if (pollTimer) clearTimeout(pollTimer as unknown as number); + pollTimer = null; +} diff --git a/apps/broker/src/env.ts b/apps/broker/src/env.ts index 0440533..7f4c969 100644 --- a/apps/broker/src/env.ts +++ b/apps/broker/src/env.ts @@ -4,18 +4,26 @@ import { z } from "zod"; * Broker environment config. * * Validated at startup with Zod. Fails fast with a useful error if any - * required var is missing or malformed. Defaults mirror the values - * proven out in the claude-intercom prototype so local dev works - * without a .env file. + * required var is missing or malformed. 
*/ const envSchema = z.object({ BROKER_PORT: z.coerce.number().int().positive().default(7900), - DATABASE_URL: z.string().min(1, "DATABASE_URL is required"), + DATABASE_URL: z + .string() + .min(1, "DATABASE_URL is required") + .refine( + (u) => /^postgres(ql)?:\/\//.test(u), + "DATABASE_URL must be a postgres:// or postgresql:// connection string", + ), STATUS_TTL_SECONDS: z.coerce.number().int().positive().default(60), HOOK_FRESH_WINDOW_SECONDS: z.coerce.number().int().positive().default(30), + MAX_CONNECTIONS_PER_MESH: z.coerce.number().int().positive().default(100), + MAX_MESSAGE_BYTES: z.coerce.number().int().positive().default(65_536), + HOOK_RATE_LIMIT_PER_MIN: z.coerce.number().int().positive().default(30), NODE_ENV: z .enum(["development", "production", "test"]) .default("development"), + GIT_SHA: z.string().optional(), }); export type BrokerEnv = z.infer; diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 850020d..4f68368 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -2,17 +2,17 @@ /** * @claudemesh/broker entry point. * - * Spins up two servers in a single process: - * - HTTP on BROKER_PORT+1 for the /hook/set-status endpoint - * (Claude Code hook scripts POST here on turn boundaries). - * - WebSocket on BROKER_PORT for authenticated peer connections - * (routes E2E-encrypted envelopes between mesh members). + * Single-port HTTP + WebSocket server. Routes: + * GET /health → liveness + build info (503 if DB down) + * GET /metrics → Prometheus plaintext + * POST /hook/set-status → Claude Code hook scripts report status + * WS /ws → authenticated peer connections * - * Background: TTL sweeper + pending-status sweeper. - * Shutdown: clean SIGTERM/SIGINT marks all presences disconnected. + * Graceful shutdown on SIGTERM/SIGINT: stops sweepers, marks all + * active presences disconnected in the DB, closes servers. 
*/ -import { createServer, type IncomingMessage } from "node:http"; +import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; import type { Duplex } from "node:stream"; import { WebSocketServer, type WebSocket } from "ws"; import { env } from "./env"; @@ -24,6 +24,7 @@ import { handleHookSetStatus, heartbeat, queueMessage, + refreshQueueDepth, refreshStatusFromJsonl, startSweepers, stopSweepers, @@ -35,28 +36,31 @@ import type { WSPushMessage, WSServerMessage, } from "./types"; +import { log } from "./logger"; +import { metrics, metricsToText } from "./metrics"; +import { TokenBucket } from "./rate-limit"; +import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health"; +import { buildInfo } from "./build-info"; -const VERSION = "0.1.0"; const PORT = env.BROKER_PORT; const WS_PATH = "/ws"; -function log(msg: string): void { - console.error(`[broker] ${msg}`); -} - // --- Runtime connection registry --- -/** In-memory map of presenceId → authenticated WS connection. */ -const connections = new Map< - string, - { - ws: WebSocket; - meshId: string; - memberId: string; - memberPubkey: string; - cwd: string; - } ->(); +interface PeerConn { + ws: WebSocket; + meshId: string; + memberId: string; + memberPubkey: string; + cwd: string; +} + +const connections = new Map(); +const connectionsPerMesh = new Map(); +const hookRateLimit = new TokenBucket( + env.HOOK_RATE_LIMIT_PER_MIN, + env.HOOK_RATE_LIMIT_PER_MIN, +); function sendToPeer(presenceId: string, msg: WSServerMessage): void { const conn = connections.get(presenceId); @@ -65,80 +69,11 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void { try { conn.ws.send(JSON.stringify(msg)); } catch (e) { - log(`push failed to ${presenceId}: ${e instanceof Error ? e.message : e}`); - } -} - -// --- Combined HTTP + WS server on a single port --- -// -// `ws` is run with noServer:true and attached to the HTTP server's -// 'upgrade' event. 
Clients connect to ws://host:PORT/ws; everything -// else is routed by the HTTP handler. - -function handleHttpRequest( - req: IncomingMessage, - res: import("node:http").ServerResponse, -): void { - res.setHeader("Access-Control-Allow-Origin", "*"); - res.setHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS"); - res.setHeader("Access-Control-Allow-Headers", "Content-Type"); - if (req.method === "OPTIONS") { - res.writeHead(204); - res.end(); - return; - } - - if (req.method === "GET" && req.url === "/health") { - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify({ status: "ok", version: VERSION })); - return; - } - - if (req.method === "POST" && req.url === "/hook/set-status") { - let body = ""; - req.on("data", (chunk) => (body += chunk.toString())); - req.on("end", async () => { - try { - const payload = JSON.parse(body) as HookSetStatusRequest; - const result = await handleHookSetStatus(payload); - res.writeHead(200, { "Content-Type": "application/json" }); - res.end(JSON.stringify(result)); - - // If the hook flipped a presence to idle, drain queued - // "next" messages immediately for low-latency delivery. - if (result.ok && result.presence_id && !result.pending) { - void maybePushQueuedMessages(result.presence_id); - } - } catch (e) { - res.writeHead(500, { "Content-Type": "application/json" }); - res.end( - JSON.stringify({ - ok: false, - error: e instanceof Error ? e.message : String(e), - }), - ); - } + log.warn("push failed", { + presence_id: presenceId, + error: e instanceof Error ? 
e.message : String(e), }); - return; } - - res.writeHead(404); - res.end("not found"); -} - -function handleUpgrade( - wss: WebSocketServer, - req: IncomingMessage, - socket: Duplex, - head: Buffer, -): void { - if (req.url !== WS_PATH) { - socket.destroy(); - return; - } - wss.handleUpgrade(req, socket, head, (ws) => { - wss.emit("connection", ws, req); - }); } async function maybePushQueuedMessages(presenceId: string): Promise { @@ -167,26 +102,190 @@ async function maybePushQueuedMessages(presenceId: string): Promise { createdAt: m.createdAt.toISOString(), }; sendToPeer(presenceId, push); + metrics.messagesRoutedTotal.inc({ priority: m.priority }); } } -// --- WebSocket server (peer connections) --- +// --- HTTP request routing --- + +function writeJson(res: ServerResponse, status: number, body: unknown): void { + res.writeHead(status, { "Content-Type": "application/json" }); + res.end(JSON.stringify(body)); +} + +function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void { + const started = Date.now(); + res.setHeader("Access-Control-Allow-Origin", "*"); + res.setHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS"); + res.setHeader("Access-Control-Allow-Headers", "Content-Type"); + if (req.method === "OPTIONS") { + res.writeHead(204); + res.end(); + return; + } + + const route = `${req.method} ${req.url}`; + + if (req.method === "GET" && req.url === "/health") { + const healthy = isDbHealthy(); + const status = healthy ? 200 : 503; + writeJson(res, status, { + status: healthy ? "ok" : "degraded", + db: healthy ? 
"up" : "down", + ...buildInfo(), + }); + log.debug("http", { route, status, latency_ms: Date.now() - started }); + return; + } + + if (req.method === "GET" && req.url === "/metrics") { + res.writeHead(200, { "Content-Type": "text/plain; version=0.0.4" }); + res.end(metricsToText()); + return; + } + + if (req.method === "POST" && req.url === "/hook/set-status") { + handleHookPost(req, res, started); + return; + } + + res.writeHead(404); + res.end("not found"); + log.debug("http", { route, status: 404, latency_ms: Date.now() - started }); +} + +function handleHookPost( + req: IncomingMessage, + res: ServerResponse, + started: number, +): void { + metrics.hookRequestsTotal.inc(); + const chunks: Buffer[] = []; + let total = 0; + let aborted = false; + + req.on("data", (chunk: Buffer) => { + if (aborted) return; + total += chunk.length; + if (total > env.MAX_MESSAGE_BYTES) { + aborted = true; + writeJson(res, 413, { ok: false, error: "payload too large" }); + req.destroy(); + return; + } + chunks.push(chunk); + }); + + req.on("end", async () => { + if (aborted) return; + try { + const payload = JSON.parse( + Buffer.concat(chunks).toString(), + ) as HookSetStatusRequest; + // Rate limit per (pid, cwd) if both present, else per cwd alone. + const rlKey = `${payload.pid ?? 0}:${payload.cwd ?? ""}`; + if (!hookRateLimit.take(rlKey)) { + metrics.hookRequestsRateLimited.inc(); + writeJson(res, 429, { ok: false, error: "rate limited" }); + log.warn("hook rate limited", { + cwd: payload.cwd, + pid: payload.pid, + }); + return; + } + const result = await handleHookSetStatus(payload); + writeJson(res, 200, result); + log.info("hook", { + route: "POST /hook/set-status", + cwd: payload.cwd, + pid: payload.pid, + status: payload.status, + presence_id: result.presence_id, + pending: result.pending ?? 
false, + latency_ms: Date.now() - started, + }); + if (result.ok && result.presence_id && !result.pending) { + void maybePushQueuedMessages(result.presence_id); + } + } catch (e) { + writeJson(res, 500, { + ok: false, + error: e instanceof Error ? e.message : String(e), + }); + log.error("hook handler error", { + error: e instanceof Error ? e.message : String(e), + }); + } + }); +} + +function handleUpgrade( + wss: WebSocketServer, + req: IncomingMessage, + socket: Duplex, + head: Buffer, +): void { + if (req.url !== WS_PATH) { + socket.destroy(); + return; + } + wss.handleUpgrade(req, socket, head, (ws) => { + wss.emit("connection", ws, req); + }); +} + +// --- WS protocol handlers --- + +function incMeshCount(meshId: string): number { + const n = (connectionsPerMesh.get(meshId) ?? 0) + 1; + connectionsPerMesh.set(meshId, n); + metrics.connectionsActive.set(connections.size + 1); + return n; +} + +function decMeshCount(meshId: string): void { + const n = (connectionsPerMesh.get(meshId) ?? 1) - 1; + if (n <= 0) connectionsPerMesh.delete(meshId); + else connectionsPerMesh.set(meshId, n); + metrics.connectionsActive.set(connections.size); +} + +function sendError( + ws: WebSocket, + code: string, + message: string, + id?: string, +): void { + const err: WSServerMessage = { type: "error", code, message, id }; + try { + ws.send(JSON.stringify(err)); + } catch { + /* ws already closed */ + } +} async function handleHello( ws: WebSocket, hello: Extract, ): Promise { - // Authenticate: member with this pubkey must exist in this mesh and - // not be revoked. Signature verification is TODO (crypto not wired - // yet; client-side libsodium sign_detached is planned). + // Capacity check BEFORE touching DB. + const existing = connectionsPerMesh.get(hello.meshId) ?? 
0; + if (existing >= env.MAX_CONNECTIONS_PER_MESH) { + metrics.connectionsRejected.inc({ reason: "capacity" }); + log.warn("mesh at capacity", { + mesh_id: hello.meshId, + existing, + cap: env.MAX_CONNECTIONS_PER_MESH, + }); + sendError(ws, "capacity", "mesh at connection capacity"); + ws.close(1008, "capacity"); + return null; + } const member = await findMemberByPubkey(hello.meshId, hello.pubkey); if (!member) { - const err: WSServerMessage = { - type: "error", - code: "unauthorized", - message: "pubkey not found in mesh", - }; - ws.send(JSON.stringify(err)); + metrics.connectionsRejected.inc({ reason: "unauthorized" }); + sendError(ws, "unauthorized", "pubkey not found in mesh"); + ws.close(1008, "unauthorized"); return null; } const presenceId = await connectPresence({ @@ -202,16 +301,19 @@ async function handleHello( memberPubkey: hello.pubkey, cwd: hello.cwd, }); - log( - `hello: mesh=${hello.meshId} member=${member.displayName} presence=${presenceId}`, - ); - // Drain any messages already queued for this member. + incMeshCount(hello.meshId); + log.info("ws hello", { + mesh_id: hello.meshId, + member: member.displayName, + presence_id: presenceId, + session_id: hello.sessionId, + }); await maybePushQueuedMessages(presenceId); return presenceId; } async function handleSend( - conn: NonNullable>, + conn: PeerConn, msg: Extract, ): Promise { const messageId = await queueMessage({ @@ -230,17 +332,17 @@ async function handleSend( }; conn.ws.send(JSON.stringify(ack)); - // Fan-out: push to any currently-connected peer whose pubkey matches - // the target (or to everyone on broadcast). Drain their queue which - // handles priority gating automatically. + // Fan-out over connected peers in the same mesh. 
for (const [pid, peer] of connections) { if (peer.meshId !== conn.meshId) continue; - if (msg.targetSpec !== "*" && peer.memberPubkey !== msg.targetSpec) continue; + if (msg.targetSpec !== "*" && peer.memberPubkey !== msg.targetSpec) + continue; void maybePushQueuedMessages(pid); } } function handleConnection(ws: WebSocket): void { + metrics.connectionsTotal.inc(); let presenceId: string | null = null; ws.on("message", async (raw) => { try { @@ -250,12 +352,7 @@ function handleConnection(ws: WebSocket): void { return; } if (!presenceId) { - const err: WSServerMessage = { - type: "error", - code: "no_hello", - message: "must send hello first", - }; - ws.send(JSON.stringify(err)); + sendError(ws, "no_hello", "must send hello first"); return; } const conn = connections.get(presenceId); @@ -266,20 +363,32 @@ function handleConnection(ws: WebSocket): void { break; case "set_status": await writeStatus(presenceId, msg.status, "manual", new Date()); + log.info("ws set_status", { + presence_id: presenceId, + status: msg.status, + }); break; } } catch (e) { - log(`ws msg error: ${e instanceof Error ? e.message : e}`); + metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" }); + log.warn("ws message error", { + presence_id: presenceId, + error: e instanceof Error ? 
e.message : String(e), + }); } }); ws.on("close", async () => { if (presenceId) { + const conn = connections.get(presenceId); connections.delete(presenceId); + if (conn) decMeshCount(conn.meshId); await disconnectPresence(presenceId); - log(`disconnect: ${presenceId}`); + log.info("ws close", { presence_id: presenceId }); } }); - ws.on("error", (err) => log(`ws error: ${err.message}`)); + ws.on("error", (err) => { + log.warn("ws error", { error: err.message }); + }); ws.on("pong", () => { if (presenceId) void heartbeat(presenceId); }); @@ -288,7 +397,10 @@ function handleConnection(ws: WebSocket): void { // --- Main --- function main(): void { - const wss = new WebSocketServer({ noServer: true }); + const wss = new WebSocketServer({ + noServer: true, + maxPayload: env.MAX_MESSAGE_BYTES, + }); wss.on("connection", handleConnection); const http = createServer(handleHttpRequest); @@ -296,37 +408,66 @@ function main(): void { handleUpgrade(wss, req, socket, head), ); http.on("error", (err) => { - log(`http server error: ${err.message}`); + log.error("http server error", { error: err.message }); process.exit(1); }); http.listen(PORT, "0.0.0.0", () => { - log( - `@claudemesh/broker v${VERSION} listening on :${PORT} (ws:${WS_PATH}, http:/hook/set-status, http:/health) | ttl=${env.STATUS_TTL_SECONDS}s hook_fresh=${env.HOOK_FRESH_WINDOW_SECONDS}s`, - ); + const info = buildInfo(); + log.info("broker listening", { + port: PORT, + version: info.version, + gitSha: info.gitSha, + ws_path: WS_PATH, + ttl_seconds: env.STATUS_TTL_SECONDS, + hook_fresh_seconds: env.HOOK_FRESH_WINDOW_SECONDS, + max_connections_per_mesh: env.MAX_CONNECTIONS_PER_MESH, + max_message_bytes: env.MAX_MESSAGE_BYTES, + hook_rate_limit_per_min: env.HOOK_RATE_LIMIT_PER_MIN, + }); }); - // Heartbeat ping every 30s; clients reply with pong → bumps lastPingAt. - setInterval(() => { + // WS heartbeat ping every 30s; clients reply with pong → bumps lastPingAt. 
+ const pingInterval = setInterval(() => { for (const { ws } of connections.values()) { if (ws.readyState === ws.OPEN) ws.ping(); } - }, 30_000).unref(); + }, 30_000); + pingInterval.unref(); + + // GC rate-limit buckets periodically. + const rlSweep = setInterval(() => hookRateLimit.sweep(), 5 * 60_000); + rlSweep.unref(); + + // Queue depth gauge refresh (fires the metric; cheap COUNT query). + const queueDepthTimer = setInterval(() => { + refreshQueueDepth().catch((e) => + log.warn("queue depth refresh failed", { + error: e instanceof Error ? e.message : String(e), + }), + ); + }, 30_000); + queueDepthTimer.unref(); startSweepers(); + startDbHealth(); const shutdown = async (signal: string): Promise => { - log(`${signal} received, shutting down`); + log.info("shutdown signal", { signal }); + clearInterval(pingInterval); + clearInterval(rlSweep); + clearInterval(queueDepthTimer); + stopDbHealth(); await stopSweepers(); for (const { ws } of connections.values()) { try { - ws.close(); + ws.close(1001, "shutting down"); } catch { /* ignore */ } } wss.close(); http.close(); - log("closed, bye"); + log.info("shutdown complete"); process.exit(0); }; diff --git a/apps/broker/src/logger.ts b/apps/broker/src/logger.ts new file mode 100644 index 0000000..0d41705 --- /dev/null +++ b/apps/broker/src/logger.ts @@ -0,0 +1,33 @@ +/** + * Structured JSON logger. + * + * One line per log event. Production observability tools (Datadog, + * Loki, etc.) can ingest these directly. Dev readability is + * secondary — if you're eyeballing, pipe through `jq`. + */ + +type LogLevel = "debug" | "info" | "warn" | "error"; + +interface LogContext { + [key: string]: unknown; +} + +function emit(level: LogLevel, msg: string, ctx: LogContext = {}): void { + const entry = { + ts: new Date().toISOString(), + level, + component: "broker", + msg, + ...ctx, + }; + // Single line, no pretty-printing. stderr so stdout is free for + // any app-level protocol chatter. 
+ console.error(JSON.stringify(entry)); +} + +export const log = { + debug: (msg: string, ctx?: LogContext) => emit("debug", msg, ctx), + info: (msg: string, ctx?: LogContext) => emit("info", msg, ctx), + warn: (msg: string, ctx?: LogContext) => emit("warn", msg, ctx), + error: (msg: string, ctx?: LogContext) => emit("error", msg, ctx), +}; diff --git a/apps/broker/src/metrics.ts b/apps/broker/src/metrics.ts new file mode 100644 index 0000000..9a564fb --- /dev/null +++ b/apps/broker/src/metrics.ts @@ -0,0 +1,121 @@ +/** + * Minimal in-process metrics, exposed as Prometheus plaintext. + * + * Intentionally no external deps — we track a handful of counters + * and gauges that matter for broker ops. Scraped by /metrics. + */ + +type Labels = Record; + +class Counter { + private values = new Map(); + constructor( + public name: string, + public help: string, + ) {} + inc(labels: Labels = {}, by = 1): void { + const key = labelKey(labels); + this.values.set(key, (this.values.get(key) ?? 0) + by); + } + toText(): string { + const lines = [`# HELP ${this.name} ${this.help}`, `# TYPE ${this.name} counter`]; + if (this.values.size === 0) { + lines.push(`${this.name} 0`); + } else { + for (const [key, v] of this.values) { + lines.push(`${this.name}${key} ${v}`); + } + } + return lines.join("\n"); + } +} + +class Gauge { + private values = new Map(); + constructor( + public name: string, + public help: string, + ) {} + set(value: number, labels: Labels = {}): void { + this.values.set(labelKey(labels), value); + } + inc(labels: Labels = {}, by = 1): void { + const key = labelKey(labels); + this.values.set(key, (this.values.get(key) ?? 
0) + by); + } + dec(labels: Labels = {}, by = 1): void { + this.inc(labels, -by); + } + toText(): string { + const lines = [`# HELP ${this.name} ${this.help}`, `# TYPE ${this.name} gauge`]; + if (this.values.size === 0) { + lines.push(`${this.name} 0`); + } else { + for (const [key, v] of this.values) { + lines.push(`${this.name}${key} ${v}`); + } + } + return lines.join("\n"); + } +} + +function labelKey(labels: Labels): string { + const entries = Object.entries(labels); + if (entries.length === 0) return ""; + const parts = entries + .sort(([a], [b]) => a.localeCompare(b)) + .map(([k, v]) => `${k}="${String(v).replace(/"/g, '\\"')}"`) + .join(","); + return `{${parts}}`; +} + +export const metrics = { + connectionsTotal: new Counter( + "broker_connections_total", + "Total WS connection attempts", + ), + connectionsRejected: new Counter( + "broker_connections_rejected_total", + "WS connections refused (auth failure, capacity, etc.)", + ), + connectionsActive: new Gauge( + "broker_connections_active", + "Currently connected peers", + ), + messagesRoutedTotal: new Counter( + "broker_messages_routed_total", + "Messages successfully queued + routed", + ), + messagesRejectedTotal: new Counter( + "broker_messages_rejected_total", + "Messages rejected (size, auth, malformed)", + ), + queueDepth: new Gauge( + "broker_queue_depth", + "Undelivered messages currently in the queue", + ), + ttlSweepsTotal: new Counter( + "broker_ttl_sweeps_total", + "TTL sweeper runs completed", + ), + hookRequestsTotal: new Counter( + "broker_hook_requests_total", + "POST /hook/set-status requests received", + ), + hookRequestsRateLimited: new Counter( + "broker_hook_requests_rate_limited_total", + "POST /hook/set-status rejected by rate limit", + ), + dbHealthy: new Gauge( + "broker_db_healthy", + "1 if Postgres connection is up, 0 if not", + ), +}; + +export function metricsToText(): string { + return ( + Object.values(metrics) + .map((m) => m.toText()) + .join("\n") + "\n" + ); +} diff 
/**
 * Token-bucket rate limiter keyed by an arbitrary string.
 *
 * Used to cap POST /hook/set-status at a sane per-session rate
 * (hook scripts legitimately fire every turn; anything faster is
 * either a loop or a compromised agent).
 *
 * In-process only. If we scale to multiple broker instances this
 * moves to Redis, but for the single-instance broker it's enough.
 */

/** Per-key limiter state. */
interface Bucket {
  /** Tokens currently available; fractional while a token is accruing. */
  tokens: number;
  /** Timestamp (ms) at which `tokens` was last brought up to date. */
  lastRefill: number;
}

export class TokenBucket {
  private readonly buckets = new Map<string, Bucket>();
  /** Refill rate converted from tokens/minute to tokens/millisecond. */
  private readonly refillPerMs: number;

  /**
   * @param capacity Maximum tokens a bucket can hold; also the size of a
   *   brand-new bucket, i.e. the allowed burst.
   * @param refillPerMinute Tokens added back per minute, accrued continuously.
   */
  constructor(
    private readonly capacity: number,
    refillPerMinute: number,
  ) {
    this.refillPerMs = refillPerMinute / 60_000;
  }

  /**
   * Take one token for `key`.
   *
   * @param key Bucket identifier; a key seen for the first time starts full.
   * @param now Current time in ms (injectable for tests).
   * @returns true if allowed, false if rate-limited.
   */
  take(key: string, now = Date.now()): boolean {
    let bucket = this.buckets.get(key);
    if (bucket === undefined) {
      bucket = { tokens: this.capacity, lastRefill: now };
      this.buckets.set(key, bucket);
    }

    // Only refill when time moved forward; a `now` at or before
    // `lastRefill` leaves the bucket untouched.
    const elapsed = now - bucket.lastRefill;
    if (elapsed > 0) {
      bucket.tokens = Math.min(
        this.capacity,
        bucket.tokens + elapsed * this.refillPerMs,
      );
      bucket.lastRefill = now;
    }

    if (bucket.tokens < 1) return false;
    bucket.tokens -= 1;
    return true;
  }

  /**
   * Periodic GC: drop buckets that have been idle for longer than
   * `olderThanMs` AND would already be back at full capacity.
   *
   * The second condition matters: an evicted key gets a fresh, full bucket
   * on its next `take`, so evicting a bucket that is still refilling would
   * hand that key tokens it has not earned yet. A full idle bucket carries
   * no state worth keeping, so removing it is lossless.
   *
   * @param olderThanMs Minimum idle time (ms since the bucket was last
   *   refilled) before a bucket is considered for removal.
   * @param now Current time in ms (injectable for tests).
   */
  sweep(olderThanMs = 10 * 60 * 1000, now = Date.now()): void {
    for (const [key, bucket] of this.buckets) {
      const idleMs = now - bucket.lastRefill;
      if (idleMs <= olderThanMs) continue;
      const refilled = bucket.tokens + idleMs * this.refillPerMs;
      if (refilled >= this.capacity) this.buckets.delete(key);
    }
  }

  /** Number of keys currently tracked. */
  get size(): number {
    return this.buckets.size;
  }
}