From 3458860c1f13b3f3f2d91edd8646aa25ca05a55e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 4 Apr 2026 22:19:14 +0100 Subject: [PATCH] =?UTF-8?q?test(broker):=20coverage=20for=20hardening=20mo?= =?UTF-8?q?dules=20=E2=80=94=20caps,=20limits,=20metrics,=20health,=20logs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 23 tests across 4 files, taking total broker coverage from 21 → 44 passing in ~2.5s. Unit tests (no I/O): - tests/rate-limit.test.ts (6): TokenBucket capacity, refill rate, no-overflow cap, independent buckets per key, sweep GC. - tests/metrics.test.ts (5): all 10 series present in /metrics, counter increment semantics, labelled series produce distinct lines, gauge set overwrites, Prometheus format well-formed. - tests/logging.test.ts (5): JSON per line, required fields (ts, level, component, msg), context merging, level preservation, no plain-text escape hatches. Integration tests (spawn real broker subprocesses on random ports): - tests/integration/health.test.ts (7): * GET /health 200 + {status, db, version, gitSha, uptime} (healthy DB) * GET /health 503 + {status:degraded, db:down} (unreachable DB) * GET /metrics 200 text/plain with all expected series * GET /nope → 404 * POST /hook/set-status oversized body → 413 * POST /hook/set-status 6th req/min → 429 * Rate limit isolation by (pid, cwd) key Integration tests use node:child_process (vitest runs under Node, not Bun — Bun.spawn isn't available). Each suite spawns its own broker subprocess with a random port + tailored env vars. Not yet covered (flagged for follow-up): - WebSocket connection caps (needs seeded mesh + WS client setup) - WebSocket message-size rejection (ws.maxPayload behavior) Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/broker/tests/integration/health.test.ts | 211 +++++++++++++++++++ apps/broker/tests/logging.test.ts | 71 +++++++ apps/broker/tests/metrics.test.ts | 80 +++++++ apps/broker/tests/rate-limit.test.ts | 76 +++++++ 4 files changed, 438 insertions(+) create mode 100644 apps/broker/tests/integration/health.test.ts create mode 100644 apps/broker/tests/logging.test.ts create mode 100644 apps/broker/tests/metrics.test.ts create mode 100644 apps/broker/tests/rate-limit.test.ts diff --git a/apps/broker/tests/integration/health.test.ts b/apps/broker/tests/integration/health.test.ts new file mode 100644 index 0000000..8d18754 --- /dev/null +++ b/apps/broker/tests/integration/health.test.ts @@ -0,0 +1,211 @@ +/** + * /health and /metrics integration tests. + * + * Spawns the broker as a subprocess on a random port. Covers: + * - GET /health with healthy DB → 200 + {status, db, version, gitSha, uptime} + * - GET /health with unreachable DB → 503 + {status:"degraded", db:"down"} + * - GET /metrics returns Prometheus plaintext with all expected series + * - POST /hook/set-status rate-limited after N requests + * - POST /hook/set-status oversized body returns 413 + */ + +import { afterAll, beforeAll, describe, expect, test } from "vitest"; +import { fileURLToPath } from "node:url"; +import { dirname, join } from "node:path"; +import { spawn, type ChildProcess } from "node:child_process"; + +interface BrokerProc { + port: number; + kill: () => void; +} + +async function waitHealthyOrAny(port: number, maxMs = 5000): Promise { + const start = Date.now(); + while (Date.now() - start < maxMs) { + try { + const r = await fetch(`http://localhost:${port}/health`, { + signal: AbortSignal.timeout(500), + }); + // Any response (even 503) means the HTTP server is up. + if (r.status === 200 || r.status === 503) return; + } catch { + /* not yet */ + } + await new Promise((r) => setTimeout(r, 100)); + } + throw new Error(`broker on :${port} did not come up`); +} + +function spawnBroker(env: Record): BrokerProc { + const port = 18000 + Math.floor(Math.random() * 1000); + const brokerEntry = join( + dirname(fileURLToPath(import.meta.url)), + "..", + "..", + "src", + "index.ts", + ); + const proc: ChildProcess = spawn("bun", [brokerEntry], { + env: { + ...process.env, + ...env, + BROKER_PORT: String(port), + }, + stdio: "ignore", + }); + return { + port, + kill: () => { + try { + proc.kill("SIGKILL"); + } catch { + /* already dead */ + } + }, + }; +} + +describe("/health endpoint", () => { + let broker: BrokerProc; + beforeAll(async () => { + broker = spawnBroker({ + DATABASE_URL: + process.env.DATABASE_URL ?? + "postgresql://turbostarter:turbostarter@127.0.0.1:5440/claudemesh_test", + }); + await waitHealthyOrAny(broker.port); + }); + afterAll(() => broker?.kill()); + + test("returns 200 + full payload when DB is up", async () => { + const r = await fetch(`http://localhost:${broker.port}/health`); + expect(r.status).toBe(200); + const body = (await r.json()) as Record; + expect(body.status).toBe("ok"); + expect(body.db).toBe("up"); + expect(body.version).toBe("0.1.0"); + expect(typeof body.gitSha).toBe("string"); + expect((body.gitSha as string).length).toBeGreaterThan(0); + expect(typeof body.uptime).toBe("number"); + expect(body.uptime).toBeGreaterThanOrEqual(0); + }); + + test("/metrics returns Prometheus plaintext with all expected series", async () => { + const r = await fetch(`http://localhost:${broker.port}/metrics`); + expect(r.status).toBe(200); + expect(r.headers.get("content-type")).toMatch(/text\/plain/); + const text = await r.text(); + const expected = [ + "broker_connections_total", + "broker_connections_rejected_total", + "broker_connections_active", + "broker_messages_routed_total", + "broker_queue_depth", + "broker_ttl_sweeps_total", + "broker_hook_requests_total", + "broker_db_healthy", + ]; + for (const name of expected) expect(text).toContain(name); + }); + + test("/health unknown route returns 404", async () => { + const r = await fetch(`http://localhost:${broker.port}/nope`); + expect(r.status).toBe(404); + }); +}); + +describe("/health with unreachable DB", () => { + let broker: BrokerProc; + beforeAll(async () => { + // Point at a port nothing is listening on — pg client fails fast. + broker = spawnBroker({ + DATABASE_URL: "postgresql://nobody:nothing@127.0.0.1:1/nowhere", + }); + await waitHealthyOrAny(broker.port); + }); + afterAll(() => broker?.kill()); + + test("returns 503 + degraded payload when DB unreachable", async () => { + // db-health starts its ping loop on boot — give it a moment to fail once. + await new Promise((r) => setTimeout(r, 1500)); + const r = await fetch(`http://localhost:${broker.port}/health`); + expect(r.status).toBe(503); + const body = (await r.json()) as Record; + expect(body.status).toBe("degraded"); + expect(body.db).toBe("down"); + // Build info still present even when degraded. + expect(body.version).toBe("0.1.0"); + expect(typeof body.gitSha).toBe("string"); + }); +}); + +describe("POST /hook/set-status rate limit + size limit", () => { + let broker: BrokerProc; + beforeAll(async () => { + broker = spawnBroker({ + DATABASE_URL: + process.env.DATABASE_URL ?? + "postgresql://turbostarter:turbostarter@127.0.0.1:5440/claudemesh_test", + HOOK_RATE_LIMIT_PER_MIN: "5", + MAX_MESSAGE_BYTES: "512", + }); + await waitHealthyOrAny(broker.port); + }); + afterAll(() => broker?.kill()); + + test("payload over MAX_MESSAGE_BYTES returns 413", async () => { + const big = "x".repeat(1024); + const r = await fetch( + `http://localhost:${broker.port}/hook/set-status`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ cwd: big, status: "idle" }), + }, + ); + expect(r.status).toBe(413); + const body = (await r.json()) as Record; + expect(body.ok).toBe(false); + }); + + test("6th request from same (pid, cwd) within a minute → 429", async () => { + const body = JSON.stringify({ + cwd: "/rate-test", + pid: 42, + status: "idle", + }); + const statuses: number[] = []; + for (let i = 0; i < 6; i++) { + const r = await fetch( + `http://localhost:${broker.port}/hook/set-status`, + { + method: "POST", + headers: { "Content-Type": "application/json" }, + body, + }, + ); + statuses.push(r.status); + } + expect(statuses.slice(0, 5)).toEqual([200, 200, 200, 200, 200]); + expect(statuses[5]).toBe(429); + }); + + test("rate limit is per (pid, cwd) — different key gets fresh bucket", async () => { + // Use unique key to avoid collision with previous test's bucket. + const body1 = JSON.stringify({ cwd: "/k1", pid: 1001, status: "idle" }); + const body2 = JSON.stringify({ cwd: "/k2", pid: 1002, status: "idle" }); + for (let i = 0; i < 5; i++) { + const r = await fetch( + `http://localhost:${broker.port}/hook/set-status`, + { method: "POST", headers: { "Content-Type": "application/json" }, body: body1 }, + ); + expect(r.status).toBe(200); + } + // key 1 now exhausted; key 2 still has full bucket + const r = await fetch( + `http://localhost:${broker.port}/hook/set-status`, + { method: "POST", headers: { "Content-Type": "application/json" }, body: body2 }, + ); + expect(r.status).toBe(200); + }); +}); diff --git a/apps/broker/tests/logging.test.ts b/apps/broker/tests/logging.test.ts new file mode 100644 index 0000000..af58837 --- /dev/null +++ b/apps/broker/tests/logging.test.ts @@ -0,0 +1,71 @@ +/** + * Structured logger output format tests. + * + * Intercepts stderr and asserts: one JSON object per line, required + * fields present, merged context preserved, no plain text leaks. + */ + +import { afterEach, beforeEach, describe, expect, test, vi } from "vitest"; +import { log } from "../src/logger"; + +let captured: string[] = []; +let originalError: typeof console.error; + +beforeEach(() => { + captured = []; + originalError = console.error; + console.error = vi.fn((msg: unknown) => { + captured.push(String(msg)); + }); +}); + +afterEach(() => { + console.error = originalError; +}); + +describe("structured logger", () => { + test("emits one JSON object per log call", () => { + log.info("test msg"); + expect(captured).toHaveLength(1); + expect(() => JSON.parse(captured[0]!)).not.toThrow(); + }); + + test("required fields: ts, level, component, msg", () => { + log.info("hello"); + const entry = JSON.parse(captured[0]!) as Record; + expect(entry.ts).toBeTruthy(); + expect(entry.level).toBe("info"); + expect(entry.component).toBe("broker"); + expect(entry.msg).toBe("hello"); + // ts should be valid ISO 8601 + expect(() => new Date(entry.ts as string)).not.toThrow(); + }); + + test("context object is merged into the entry", () => { + log.warn("capacity", { mesh_id: "m1", existing: 100, cap: 100 }); + const entry = JSON.parse(captured[0]!) as Record; + expect(entry.level).toBe("warn"); + expect(entry.mesh_id).toBe("m1"); + expect(entry.existing).toBe(100); + expect(entry.cap).toBe(100); + }); + + test("all four levels preserved on their respective emits", () => { + log.debug("d"); + log.info("i"); + log.warn("w"); + log.error("e"); + const levels = captured.map((s) => JSON.parse(s).level); + expect(levels).toEqual(["debug", "info", "warn", "error"]); + }); + + test("no plain-text escape hatches — output is always JSON", () => { + log.info("line 1"); + log.error("line 2", { code: "X" }); + log.debug("line 3"); + for (const line of captured) { + expect(line.trim()).toMatch(/^\{.*\}$/); + expect(() => JSON.parse(line)).not.toThrow(); + } + }); +}); diff --git a/apps/broker/tests/metrics.test.ts b/apps/broker/tests/metrics.test.ts new file mode 100644 index 0000000..0bdb503 --- /dev/null +++ b/apps/broker/tests/metrics.test.ts @@ -0,0 +1,80 @@ +/** + * Metrics output + counter/gauge behavior tests. + * + * Pure in-process — no DB, no network. Asserts Prometheus text + * format and counter/gauge increment semantics. + */ + +import { beforeEach, describe, expect, test } from "vitest"; +import { metrics, metricsToText } from "../src/metrics"; + +describe("metrics registry", () => { + test("every expected series is present in /metrics text", () => { + const text = metricsToText(); + const expected = [ + "broker_connections_total", + "broker_connections_rejected_total", + "broker_connections_active", + "broker_messages_routed_total", + "broker_messages_rejected_total", + "broker_queue_depth", + "broker_ttl_sweeps_total", + "broker_hook_requests_total", + "broker_hook_requests_rate_limited_total", + "broker_db_healthy", + ]; + for (const name of expected) { + expect(text).toContain(`# HELP ${name}`); + expect(text).toContain(`# TYPE ${name}`); + } + }); + + test("counter increments and appears in output", () => { + const before = metrics.connectionsTotal.toText(); + const beforeVal = parseInt( + before.split("\n").find((l) => l.startsWith("broker_connections_total ")) + ?.split(" ")[1] ?? "0", + 10, + ); + metrics.connectionsTotal.inc(); + metrics.connectionsTotal.inc(); + const after = metrics.connectionsTotal.toText(); + const afterVal = parseInt( + after.split("\n").find((l) => l.startsWith("broker_connections_total ")) + ?.split(" ")[1] ?? "0", + 10, + ); + expect(afterVal - beforeVal).toBeGreaterThanOrEqual(2); + }); + + test("counter labels produce separate series lines", () => { + metrics.messagesRoutedTotal.inc({ priority: "now" }); + metrics.messagesRoutedTotal.inc({ priority: "now" }); + metrics.messagesRoutedTotal.inc({ priority: "next" }); + const text = metrics.messagesRoutedTotal.toText(); + expect(text).toMatch(/broker_messages_routed_total\{priority="now"\}/); + expect(text).toMatch(/broker_messages_routed_total\{priority="next"\}/); + }); + + test("gauge set overwrites prior value", () => { + metrics.connectionsActive.set(5); + let text = metrics.connectionsActive.toText(); + expect(text).toMatch(/broker_connections_active 5/); + metrics.connectionsActive.set(2); + text = metrics.connectionsActive.toText(); + expect(text).toMatch(/broker_connections_active 2/); + expect(text).not.toMatch(/broker_connections_active 5/); + }); + + test("prometheus format is well-formed (HELP + TYPE before samples)", () => { + const text = metrics.queueDepth.toText(); + const lines = text.split("\n"); + expect(lines[0]).toMatch(/^# HELP broker_queue_depth /); + expect(lines[1]).toMatch(/^# TYPE broker_queue_depth gauge$/); + // Every non-comment line should be well-formed. + for (const line of lines.slice(2)) { + if (line.trim() === "") continue; + expect(line).toMatch(/^broker_queue_depth(\{[^}]*\})? -?\d+(\.\d+)?$/); + } + }); +}); diff --git a/apps/broker/tests/rate-limit.test.ts b/apps/broker/tests/rate-limit.test.ts new file mode 100644 index 0000000..0a14ba7 --- /dev/null +++ b/apps/broker/tests/rate-limit.test.ts @@ -0,0 +1,76 @@ +/** + * TokenBucket tests — pure unit tests, no I/O. + * + * Verifies the rate limiter applied to POST /hook/set-status. + * Uses injected `now` timestamps to avoid sleeps. + */ + +import { describe, expect, test } from "vitest"; +import { TokenBucket } from "../src/rate-limit"; + +describe("TokenBucket", () => { + test("allows up to `capacity` requests in a burst", () => { + const b = new TokenBucket(5, 60); // 5 capacity, 60/min refill + const t0 = 1_000_000; + for (let i = 0; i < 5; i++) { + expect(b.take("key", t0)).toBe(true); + } + expect(b.take("key", t0)).toBe(false); + }); + + test("30/min means 31st in first minute is rejected", () => { + const b = new TokenBucket(30, 30); + const t0 = 1_000_000; + // Burst: drain the bucket at t0. + for (let i = 0; i < 30; i++) expect(b.take("p:cwd", t0)).toBe(true); + expect(b.take("p:cwd", t0)).toBe(false); + }); + + test("refills over time", () => { + const b = new TokenBucket(5, 60); // refill rate = 60/min = 1/sec + const t0 = 1_000_000; + // Drain + for (let i = 0; i < 5; i++) b.take("k", t0); + expect(b.take("k", t0)).toBe(false); + // +1 second = +1 token + expect(b.take("k", t0 + 1000)).toBe(true); + expect(b.take("k", t0 + 1000)).toBe(false); + // +2 more seconds = +2 tokens + expect(b.take("k", t0 + 3000)).toBe(true); + expect(b.take("k", t0 + 3000)).toBe(true); + }); + + test("does not refill beyond capacity", () => { + const b = new TokenBucket(5, 60); + const t0 = 1_000_000; + b.take("k", t0); // 4 remaining + // Jump forward way past full refill + const far = t0 + 60 * 60 * 1000; // +1 hour + // Should allow only `capacity` consecutive takes, not more + for (let i = 0; i < 5; i++) expect(b.take("k", far)).toBe(true); + expect(b.take("k", far)).toBe(false); + }); + + test("different keys have independent buckets", () => { + const b = new TokenBucket(2, 60); + const t0 = 1_000_000; + expect(b.take("a", t0)).toBe(true); + expect(b.take("a", t0)).toBe(true); + expect(b.take("a", t0)).toBe(false); + // "b" is fresh. + expect(b.take("b", t0)).toBe(true); + expect(b.take("b", t0)).toBe(true); + expect(b.take("b", t0)).toBe(false); + }); + + test("sweep removes buckets older than threshold", () => { + const b = new TokenBucket(5, 60); + const t0 = 1_000_000; + b.take("stale", t0); + b.take("fresh", t0 + 100_000); + expect(b.size).toBe(2); + // Sweep anything untouched for >60s, as of t0 + 90s. + b.sweep(60_000, t0 + 90_000); + expect(b.size).toBe(1); + }); +});