test(broker): coverage for hardening modules — caps, limits, metrics, health, logs
Adds 23 tests across 4 files, taking total broker coverage from
21 → 44 passing in ~2.5s.
Unit tests (no I/O):
- tests/rate-limit.test.ts (6): TokenBucket capacity, refill rate,
no-overflow cap, independent buckets per key, sweep GC.
- tests/metrics.test.ts (5): all 10 series present in /metrics,
counter increment semantics, labelled series produce distinct lines,
gauge set overwrites, Prometheus format well-formed.
- tests/logging.test.ts (5): JSON per line, required fields (ts, level,
component, msg), context merging, level preservation, no plain-text
escape hatches.
Integration tests (spawn real broker subprocesses on random ports):
- tests/integration/health.test.ts (7):
* GET /health 200 + {status, db, version, gitSha, uptime} (healthy DB)
* GET /health 503 + {status:degraded, db:down} (unreachable DB)
* GET /metrics 200 text/plain with all expected series
* GET /nope → 404
* POST /hook/set-status oversized body → 413
* POST /hook/set-status 6th req/min → 429
* Rate limit isolation by (pid, cwd) key
Integration tests use node:child_process (vitest runs under Node, not
Bun — Bun.spawn isn't available). Each suite spawns its own broker
subprocess with a random port + tailored env vars.
Not yet covered (flagged for follow-up):
- WebSocket connection caps (needs seeded mesh + WS client setup)
- WebSocket message-size rejection (ws.maxPayload behavior)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
211
apps/broker/tests/integration/health.test.ts
Normal file
211
apps/broker/tests/integration/health.test.ts
Normal file
@@ -0,0 +1,211 @@
|
||||
/**
|
||||
* /health and /metrics integration tests.
|
||||
*
|
||||
* Spawns the broker as a subprocess on a random port. Covers:
|
||||
* - GET /health with healthy DB → 200 + {status, db, version, gitSha, uptime}
|
||||
* - GET /health with unreachable DB → 503 + {status:"degraded", db:"down"}
|
||||
* - GET /metrics returns Prometheus plaintext with all expected series
|
||||
* - POST /hook/set-status rate-limited after N requests
|
||||
* - POST /hook/set-status oversized body returns 413
|
||||
*/
|
||||
|
||||
import { afterAll, beforeAll, describe, expect, test } from "vitest";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { dirname, join } from "node:path";
|
||||
import { spawn, type ChildProcess } from "node:child_process";
|
||||
|
||||
interface BrokerProc {
|
||||
port: number;
|
||||
kill: () => void;
|
||||
}
|
||||
|
||||
async function waitHealthyOrAny(port: number, maxMs = 5000): Promise<void> {
|
||||
const start = Date.now();
|
||||
while (Date.now() - start < maxMs) {
|
||||
try {
|
||||
const r = await fetch(`http://localhost:${port}/health`, {
|
||||
signal: AbortSignal.timeout(500),
|
||||
});
|
||||
// Any response (even 503) means the HTTP server is up.
|
||||
if (r.status === 200 || r.status === 503) return;
|
||||
} catch {
|
||||
/* not yet */
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
}
|
||||
throw new Error(`broker on :${port} did not come up`);
|
||||
}
|
||||
|
||||
function spawnBroker(env: Record<string, string>): BrokerProc {
|
||||
const port = 18000 + Math.floor(Math.random() * 1000);
|
||||
const brokerEntry = join(
|
||||
dirname(fileURLToPath(import.meta.url)),
|
||||
"..",
|
||||
"..",
|
||||
"src",
|
||||
"index.ts",
|
||||
);
|
||||
const proc: ChildProcess = spawn("bun", [brokerEntry], {
|
||||
env: {
|
||||
...process.env,
|
||||
...env,
|
||||
BROKER_PORT: String(port),
|
||||
},
|
||||
stdio: "ignore",
|
||||
});
|
||||
return {
|
||||
port,
|
||||
kill: () => {
|
||||
try {
|
||||
proc.kill("SIGKILL");
|
||||
} catch {
|
||||
/* already dead */
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe("/health endpoint", () => {
|
||||
let broker: BrokerProc;
|
||||
beforeAll(async () => {
|
||||
broker = spawnBroker({
|
||||
DATABASE_URL:
|
||||
process.env.DATABASE_URL ??
|
||||
"postgresql://turbostarter:turbostarter@127.0.0.1:5440/claudemesh_test",
|
||||
});
|
||||
await waitHealthyOrAny(broker.port);
|
||||
});
|
||||
afterAll(() => broker?.kill());
|
||||
|
||||
test("returns 200 + full payload when DB is up", async () => {
|
||||
const r = await fetch(`http://localhost:${broker.port}/health`);
|
||||
expect(r.status).toBe(200);
|
||||
const body = (await r.json()) as Record<string, unknown>;
|
||||
expect(body.status).toBe("ok");
|
||||
expect(body.db).toBe("up");
|
||||
expect(body.version).toBe("0.1.0");
|
||||
expect(typeof body.gitSha).toBe("string");
|
||||
expect((body.gitSha as string).length).toBeGreaterThan(0);
|
||||
expect(typeof body.uptime).toBe("number");
|
||||
expect(body.uptime).toBeGreaterThanOrEqual(0);
|
||||
});
|
||||
|
||||
test("/metrics returns Prometheus plaintext with all expected series", async () => {
|
||||
const r = await fetch(`http://localhost:${broker.port}/metrics`);
|
||||
expect(r.status).toBe(200);
|
||||
expect(r.headers.get("content-type")).toMatch(/text\/plain/);
|
||||
const text = await r.text();
|
||||
const expected = [
|
||||
"broker_connections_total",
|
||||
"broker_connections_rejected_total",
|
||||
"broker_connections_active",
|
||||
"broker_messages_routed_total",
|
||||
"broker_queue_depth",
|
||||
"broker_ttl_sweeps_total",
|
||||
"broker_hook_requests_total",
|
||||
"broker_db_healthy",
|
||||
];
|
||||
for (const name of expected) expect(text).toContain(name);
|
||||
});
|
||||
|
||||
test("/health unknown route returns 404", async () => {
|
||||
const r = await fetch(`http://localhost:${broker.port}/nope`);
|
||||
expect(r.status).toBe(404);
|
||||
});
|
||||
});
|
||||
|
||||
describe("/health with unreachable DB", () => {
|
||||
let broker: BrokerProc;
|
||||
beforeAll(async () => {
|
||||
// Point at a port nothing is listening on — pg client fails fast.
|
||||
broker = spawnBroker({
|
||||
DATABASE_URL: "postgresql://nobody:nothing@127.0.0.1:1/nowhere",
|
||||
});
|
||||
await waitHealthyOrAny(broker.port);
|
||||
});
|
||||
afterAll(() => broker?.kill());
|
||||
|
||||
test("returns 503 + degraded payload when DB unreachable", async () => {
|
||||
// db-health starts its ping loop on boot — give it a moment to fail once.
|
||||
await new Promise((r) => setTimeout(r, 1500));
|
||||
const r = await fetch(`http://localhost:${broker.port}/health`);
|
||||
expect(r.status).toBe(503);
|
||||
const body = (await r.json()) as Record<string, unknown>;
|
||||
expect(body.status).toBe("degraded");
|
||||
expect(body.db).toBe("down");
|
||||
// Build info still present even when degraded.
|
||||
expect(body.version).toBe("0.1.0");
|
||||
expect(typeof body.gitSha).toBe("string");
|
||||
});
|
||||
});
|
||||
|
||||
describe("POST /hook/set-status rate limit + size limit", () => {
|
||||
let broker: BrokerProc;
|
||||
beforeAll(async () => {
|
||||
broker = spawnBroker({
|
||||
DATABASE_URL:
|
||||
process.env.DATABASE_URL ??
|
||||
"postgresql://turbostarter:turbostarter@127.0.0.1:5440/claudemesh_test",
|
||||
HOOK_RATE_LIMIT_PER_MIN: "5",
|
||||
MAX_MESSAGE_BYTES: "512",
|
||||
});
|
||||
await waitHealthyOrAny(broker.port);
|
||||
});
|
||||
afterAll(() => broker?.kill());
|
||||
|
||||
test("payload over MAX_MESSAGE_BYTES returns 413", async () => {
|
||||
const big = "x".repeat(1024);
|
||||
const r = await fetch(
|
||||
`http://localhost:${broker.port}/hook/set-status`,
|
||||
{
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify({ cwd: big, status: "idle" }),
|
||||
},
|
||||
);
|
||||
expect(r.status).toBe(413);
|
||||
const body = (await r.json()) as Record<string, unknown>;
|
||||
expect(body.ok).toBe(false);
|
||||
});
|
||||
|
||||
test("6th request from same (pid, cwd) within a minute → 429", async () => {
|
||||
const body = JSON.stringify({
|
||||
cwd: "/rate-test",
|
||||
pid: 42,
|
||||
status: "idle",
|
||||
});
|
||||
const statuses: number[] = [];
|
||||
for (let i = 0; i < 6; i++) {
|
||||
const r = await fetch(
|
||||
`http://localhost:${broker.port}/hook/set-status`,
|
||||
{
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body,
|
||||
},
|
||||
);
|
||||
statuses.push(r.status);
|
||||
}
|
||||
expect(statuses.slice(0, 5)).toEqual([200, 200, 200, 200, 200]);
|
||||
expect(statuses[5]).toBe(429);
|
||||
});
|
||||
|
||||
test("rate limit is per (pid, cwd) — different key gets fresh bucket", async () => {
|
||||
// Use unique key to avoid collision with previous test's bucket.
|
||||
const body1 = JSON.stringify({ cwd: "/k1", pid: 1001, status: "idle" });
|
||||
const body2 = JSON.stringify({ cwd: "/k2", pid: 1002, status: "idle" });
|
||||
for (let i = 0; i < 5; i++) {
|
||||
const r = await fetch(
|
||||
`http://localhost:${broker.port}/hook/set-status`,
|
||||
{ method: "POST", headers: { "Content-Type": "application/json" }, body: body1 },
|
||||
);
|
||||
expect(r.status).toBe(200);
|
||||
}
|
||||
// key 1 now exhausted; key 2 still has full bucket
|
||||
const r = await fetch(
|
||||
`http://localhost:${broker.port}/hook/set-status`,
|
||||
{ method: "POST", headers: { "Content-Type": "application/json" }, body: body2 },
|
||||
);
|
||||
expect(r.status).toBe(200);
|
||||
});
|
||||
});
|
||||
71
apps/broker/tests/logging.test.ts
Normal file
71
apps/broker/tests/logging.test.ts
Normal file
@@ -0,0 +1,71 @@
|
||||
/**
|
||||
* Structured logger output format tests.
|
||||
*
|
||||
* Intercepts stderr and asserts: one JSON object per line, required
|
||||
* fields present, merged context preserved, no plain text leaks.
|
||||
*/
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import { log } from "../src/logger";
|
||||
|
||||
let captured: string[] = [];
|
||||
let originalError: typeof console.error;
|
||||
|
||||
beforeEach(() => {
|
||||
captured = [];
|
||||
originalError = console.error;
|
||||
console.error = vi.fn((msg: unknown) => {
|
||||
captured.push(String(msg));
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
console.error = originalError;
|
||||
});
|
||||
|
||||
describe("structured logger", () => {
|
||||
test("emits one JSON object per log call", () => {
|
||||
log.info("test msg");
|
||||
expect(captured).toHaveLength(1);
|
||||
expect(() => JSON.parse(captured[0]!)).not.toThrow();
|
||||
});
|
||||
|
||||
test("required fields: ts, level, component, msg", () => {
|
||||
log.info("hello");
|
||||
const entry = JSON.parse(captured[0]!) as Record<string, unknown>;
|
||||
expect(entry.ts).toBeTruthy();
|
||||
expect(entry.level).toBe("info");
|
||||
expect(entry.component).toBe("broker");
|
||||
expect(entry.msg).toBe("hello");
|
||||
// ts should be valid ISO 8601
|
||||
expect(() => new Date(entry.ts as string)).not.toThrow();
|
||||
});
|
||||
|
||||
test("context object is merged into the entry", () => {
|
||||
log.warn("capacity", { mesh_id: "m1", existing: 100, cap: 100 });
|
||||
const entry = JSON.parse(captured[0]!) as Record<string, unknown>;
|
||||
expect(entry.level).toBe("warn");
|
||||
expect(entry.mesh_id).toBe("m1");
|
||||
expect(entry.existing).toBe(100);
|
||||
expect(entry.cap).toBe(100);
|
||||
});
|
||||
|
||||
test("all four levels preserved on their respective emits", () => {
|
||||
log.debug("d");
|
||||
log.info("i");
|
||||
log.warn("w");
|
||||
log.error("e");
|
||||
const levels = captured.map((s) => JSON.parse(s).level);
|
||||
expect(levels).toEqual(["debug", "info", "warn", "error"]);
|
||||
});
|
||||
|
||||
test("no plain-text escape hatches — output is always JSON", () => {
|
||||
log.info("line 1");
|
||||
log.error("line 2", { code: "X" });
|
||||
log.debug("line 3");
|
||||
for (const line of captured) {
|
||||
expect(line.trim()).toMatch(/^\{.*\}$/);
|
||||
expect(() => JSON.parse(line)).not.toThrow();
|
||||
}
|
||||
});
|
||||
});
|
||||
80
apps/broker/tests/metrics.test.ts
Normal file
80
apps/broker/tests/metrics.test.ts
Normal file
@@ -0,0 +1,80 @@
|
||||
/**
|
||||
* Metrics output + counter/gauge behavior tests.
|
||||
*
|
||||
* Pure in-process — no DB, no network. Asserts Prometheus text
|
||||
* format and counter/gauge increment semantics.
|
||||
*/
|
||||
|
||||
import { beforeEach, describe, expect, test } from "vitest";
|
||||
import { metrics, metricsToText } from "../src/metrics";
|
||||
|
||||
describe("metrics registry", () => {
|
||||
test("every expected series is present in /metrics text", () => {
|
||||
const text = metricsToText();
|
||||
const expected = [
|
||||
"broker_connections_total",
|
||||
"broker_connections_rejected_total",
|
||||
"broker_connections_active",
|
||||
"broker_messages_routed_total",
|
||||
"broker_messages_rejected_total",
|
||||
"broker_queue_depth",
|
||||
"broker_ttl_sweeps_total",
|
||||
"broker_hook_requests_total",
|
||||
"broker_hook_requests_rate_limited_total",
|
||||
"broker_db_healthy",
|
||||
];
|
||||
for (const name of expected) {
|
||||
expect(text).toContain(`# HELP ${name}`);
|
||||
expect(text).toContain(`# TYPE ${name}`);
|
||||
}
|
||||
});
|
||||
|
||||
test("counter increments and appears in output", () => {
|
||||
const before = metrics.connectionsTotal.toText();
|
||||
const beforeVal = parseInt(
|
||||
before.split("\n").find((l) => l.startsWith("broker_connections_total "))
|
||||
?.split(" ")[1] ?? "0",
|
||||
10,
|
||||
);
|
||||
metrics.connectionsTotal.inc();
|
||||
metrics.connectionsTotal.inc();
|
||||
const after = metrics.connectionsTotal.toText();
|
||||
const afterVal = parseInt(
|
||||
after.split("\n").find((l) => l.startsWith("broker_connections_total "))
|
||||
?.split(" ")[1] ?? "0",
|
||||
10,
|
||||
);
|
||||
expect(afterVal - beforeVal).toBeGreaterThanOrEqual(2);
|
||||
});
|
||||
|
||||
test("counter labels produce separate series lines", () => {
|
||||
metrics.messagesRoutedTotal.inc({ priority: "now" });
|
||||
metrics.messagesRoutedTotal.inc({ priority: "now" });
|
||||
metrics.messagesRoutedTotal.inc({ priority: "next" });
|
||||
const text = metrics.messagesRoutedTotal.toText();
|
||||
expect(text).toMatch(/broker_messages_routed_total\{priority="now"\}/);
|
||||
expect(text).toMatch(/broker_messages_routed_total\{priority="next"\}/);
|
||||
});
|
||||
|
||||
test("gauge set overwrites prior value", () => {
|
||||
metrics.connectionsActive.set(5);
|
||||
let text = metrics.connectionsActive.toText();
|
||||
expect(text).toMatch(/broker_connections_active 5/);
|
||||
metrics.connectionsActive.set(2);
|
||||
text = metrics.connectionsActive.toText();
|
||||
expect(text).toMatch(/broker_connections_active 2/);
|
||||
expect(text).not.toMatch(/broker_connections_active 5/);
|
||||
});
|
||||
|
||||
test("prometheus format is well-formed (HELP + TYPE before samples)", () => {
|
||||
const text = metrics.queueDepth.toText();
|
||||
const lines = text.split("\n");
|
||||
expect(lines[0]).toMatch(/^# HELP broker_queue_depth /);
|
||||
expect(lines[1]).toMatch(/^# TYPE broker_queue_depth gauge$/);
|
||||
// Every non-comment line should be well-formed.
|
||||
for (const line of lines.slice(2)) {
|
||||
if (line.trim() === "") continue;
|
||||
expect(line).toMatch(/^broker_queue_depth(\{[^}]*\})? -?\d+(\.\d+)?$/);
|
||||
}
|
||||
});
|
||||
});
|
||||
76
apps/broker/tests/rate-limit.test.ts
Normal file
76
apps/broker/tests/rate-limit.test.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
/**
|
||||
* TokenBucket tests — pure unit tests, no I/O.
|
||||
*
|
||||
* Verifies the rate limiter applied to POST /hook/set-status.
|
||||
* Uses injected `now` timestamps to avoid sleeps.
|
||||
*/
|
||||
|
||||
import { describe, expect, test } from "vitest";
|
||||
import { TokenBucket } from "../src/rate-limit";
|
||||
|
||||
describe("TokenBucket", () => {
|
||||
test("allows up to `capacity` requests in a burst", () => {
|
||||
const b = new TokenBucket(5, 60); // 5 capacity, 60/min refill
|
||||
const t0 = 1_000_000;
|
||||
for (let i = 0; i < 5; i++) {
|
||||
expect(b.take("key", t0)).toBe(true);
|
||||
}
|
||||
expect(b.take("key", t0)).toBe(false);
|
||||
});
|
||||
|
||||
test("30/min means 31st in first minute is rejected", () => {
|
||||
const b = new TokenBucket(30, 30);
|
||||
const t0 = 1_000_000;
|
||||
// Burst: drain the bucket at t0.
|
||||
for (let i = 0; i < 30; i++) expect(b.take("p:cwd", t0)).toBe(true);
|
||||
expect(b.take("p:cwd", t0)).toBe(false);
|
||||
});
|
||||
|
||||
test("refills over time", () => {
|
||||
const b = new TokenBucket(5, 60); // refill rate = 60/min = 1/sec
|
||||
const t0 = 1_000_000;
|
||||
// Drain
|
||||
for (let i = 0; i < 5; i++) b.take("k", t0);
|
||||
expect(b.take("k", t0)).toBe(false);
|
||||
// +1 second = +1 token
|
||||
expect(b.take("k", t0 + 1000)).toBe(true);
|
||||
expect(b.take("k", t0 + 1000)).toBe(false);
|
||||
// +2 more seconds = +2 tokens
|
||||
expect(b.take("k", t0 + 3000)).toBe(true);
|
||||
expect(b.take("k", t0 + 3000)).toBe(true);
|
||||
});
|
||||
|
||||
test("does not refill beyond capacity", () => {
|
||||
const b = new TokenBucket(5, 60);
|
||||
const t0 = 1_000_000;
|
||||
b.take("k", t0); // 4 remaining
|
||||
// Jump forward way past full refill
|
||||
const far = t0 + 60 * 60 * 1000; // +1 hour
|
||||
// Should allow only `capacity` consecutive takes, not more
|
||||
for (let i = 0; i < 5; i++) expect(b.take("k", far)).toBe(true);
|
||||
expect(b.take("k", far)).toBe(false);
|
||||
});
|
||||
|
||||
test("different keys have independent buckets", () => {
|
||||
const b = new TokenBucket(2, 60);
|
||||
const t0 = 1_000_000;
|
||||
expect(b.take("a", t0)).toBe(true);
|
||||
expect(b.take("a", t0)).toBe(true);
|
||||
expect(b.take("a", t0)).toBe(false);
|
||||
// "b" is fresh.
|
||||
expect(b.take("b", t0)).toBe(true);
|
||||
expect(b.take("b", t0)).toBe(true);
|
||||
expect(b.take("b", t0)).toBe(false);
|
||||
});
|
||||
|
||||
test("sweep removes buckets older than threshold", () => {
|
||||
const b = new TokenBucket(5, 60);
|
||||
const t0 = 1_000_000;
|
||||
b.take("stale", t0);
|
||||
b.take("fresh", t0 + 100_000);
|
||||
expect(b.size).toBe(2);
|
||||
// Sweep anything untouched for >60s, as of t0 + 90s.
|
||||
b.sweep(60_000, t0 + 90_000);
|
||||
expect(b.size).toBe(1);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user