feat(broker): production hardening — caps, limits, metrics, logging
Adds the minimum ops surface area for a production broker without
over-engineering. All new config knobs are env-var driven with sane
defaults.
New modules:
- logger.ts: structured JSON logs (one line, stderr, ready for
Loki/Datadog ingestion without preprocessing)
- metrics.ts: in-process Prometheus counters + gauges, exposed at
GET /metrics. Tracks connections, messages, queue depth, TTL
sweeps, hook requests, DB health.
- rate-limit.ts: token-bucket rate limiter keyed by (pid, cwd).
Applied to POST /hook/set-status at 30/min default.
- db-health.ts: Postgres ping loop with exponential-backoff retry.
GET /health returns 503 while DB is down.
- build-info.ts: version + gitSha (from GIT_SHA env or `git rev-parse`
fallback) + uptime, surfaced on /health.
Behavior changes:
- Connection caps: MAX_CONNECTIONS_PER_MESH (default 100). Exceed →
close(1008, "capacity") + metric increment.
- Message size: MAX_MESSAGE_BYTES (default 65536). WS applies it via
`ws.maxPayload`. Hook POST bodies cap out with 413.
- Structured logs everywhere replacing the old `log()` helper.
- Env validation stricter: DATABASE_URL required + regex-checked for
postgres:// prefix.
New endpoints:
- GET /health → {status, db, version, gitSha, uptime}. 503 if DB down.
- GET /metrics → Prometheus text format.
Verified: 21/21 tests still pass. Hit /health + /metrics live —
gitSha resolves correctly via `git rev-parse --short HEAD` in dev.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -17,6 +17,7 @@
|
|||||||
import {
|
import {
|
||||||
and,
|
and,
|
||||||
asc,
|
asc,
|
||||||
|
count,
|
||||||
desc,
|
desc,
|
||||||
eq,
|
eq,
|
||||||
gte,
|
gte,
|
||||||
@@ -34,6 +35,7 @@ import {
|
|||||||
presence,
|
presence,
|
||||||
} from "@turbostarter/db/schema/mesh";
|
} from "@turbostarter/db/schema/mesh";
|
||||||
import { env } from "./env";
|
import { env } from "./env";
|
||||||
|
import { metrics } from "./metrics";
|
||||||
import { inferStatusFromJsonl } from "./paths";
|
import { inferStatusFromJsonl } from "./paths";
|
||||||
import type {
|
import type {
|
||||||
HookSetStatusRequest,
|
HookSetStatusRequest,
|
||||||
@@ -244,6 +246,16 @@ export async function sweepStuckWorking(): Promise<void> {
|
|||||||
for (const row of stuck) {
|
for (const row of stuck) {
|
||||||
await writeStatus(row.id, "idle", "jsonl", now);
|
await writeStatus(row.id, "idle", "jsonl", now);
|
||||||
}
|
}
|
||||||
|
metrics.ttlSweepsTotal.inc({ flipped: String(stuck.length) });
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Update the queue_depth gauge from a single COUNT query. */
|
||||||
|
export async function refreshQueueDepth(): Promise<void> {
|
||||||
|
const [row] = await db
|
||||||
|
.select({ n: count() })
|
||||||
|
.from(messageQueue)
|
||||||
|
.where(isNull(messageQueue.deliveredAt));
|
||||||
|
metrics.queueDepth.set(Number(row?.n ?? 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Sweep expired pending_status entries. */
|
/** Sweep expired pending_status entries. */
|
||||||
|
|||||||
45
apps/broker/src/build-info.ts
Normal file
45
apps/broker/src/build-info.ts
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
/**
|
||||||
|
* Build info surfaced on /health.
|
||||||
|
*
|
||||||
|
* gitSha is resolved lazily:
|
||||||
|
* 1. GIT_SHA env var (preferred — baked in at image build time)
|
||||||
|
* 2. `git rev-parse --short HEAD` (dev)
|
||||||
|
* 3. "unknown" if neither works
|
||||||
|
*/
|
||||||
|
|
||||||
|
const VERSION = "0.1.0";
|
||||||
|
const startedAt = Date.now();
|
||||||
|
|
||||||
|
let cachedSha: string | null = null;
|
||||||
|
|
||||||
|
/**
 * Resolve the git SHA reported in build info, memoising the answer in
 * `cachedSha` so the lookup runs at most once per process.
 *
 * Precedence: a non-empty GIT_SHA environment variable, then the trimmed
 * stdout of `git rev-parse --short HEAD`, then the literal "unknown" when
 * the command prints nothing or spawning it throws.
 */
function resolveGitSha(): string {
  if (cachedSha === null) {
    const fromEnv = process.env.GIT_SHA;
    if (fromEnv) {
      cachedSha = fromEnv;
    } else {
      let detected = "";
      try {
        const result = Bun.spawnSync(["git", "rev-parse", "--short", "HEAD"], {
          stderr: "ignore",
        });
        detected = new TextDecoder().decode(result.stdout).trim();
      } catch {
        // Spawning failed; fall through to "unknown".
        detected = "";
      }
      cachedSha = detected === "" ? "unknown" : detected;
    }
  }
  return cachedSha;
}
|
||||||
|
|
||||||
|
/**
 * Snapshot of the running build: the static version string, the resolved
 * git SHA, and whole seconds elapsed since this module was loaded.
 */
export function buildInfo(): {
  version: string;
  gitSha: string;
  uptime: number;
} {
  const gitSha = resolveGitSha();
  const elapsedMs = Date.now() - startedAt;
  const uptime = Math.floor(elapsedMs / 1000);
  return { version: VERSION, gitSha, uptime };
}
|
||||||
|
|
||||||
|
export { VERSION };
|
||||||
70
apps/broker/src/db-health.ts
Normal file
70
apps/broker/src/db-health.ts
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
/**
|
||||||
|
* Postgres connection health check with backoff retry.
|
||||||
|
*
|
||||||
|
* We don't tear down the broker on a transient DB blip — the
|
||||||
|
* surrounding HTTP/WS layer keeps serving, /health flips to 503,
|
||||||
|
* and the metrics gauge reflects reality. New queries will naturally
|
||||||
|
* fail while the DB is down; connectors that have retry logic of
|
||||||
|
* their own (postgres.js does) will recover transparently.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import { sql } from "drizzle-orm";
|
||||||
|
import { db } from "./db";
|
||||||
|
import { log } from "./logger";
|
||||||
|
import { metrics } from "./metrics";
|
||||||
|
|
||||||
|
let healthy = false;
|
||||||
|
let consecutiveFailures = 0;
|
||||||
|
let pollTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
|
|
||||||
|
/**
 * Report the module's current `healthy` flag. The flag starts out false
 * and is updated by each `pingDb()` call.
 */
export function isDbHealthy(): boolean {
|
||||||
|
return healthy;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Probe the database once with `SELECT 1` and record the outcome.
 *
 * On success the module is marked healthy, the failure streak is reset
 * and the `dbHealthy` gauge is set to 1; a recovery (previous state
 * unhealthy) is logged at info level. On failure the streak is bumped,
 * the gauge is set to 0, and an error is logged only on the first failure
 * of a streak so a long outage does not flood the log.
 *
 * @returns true when the probe succeeded, false otherwise. Never rejects
 *          for a failed query.
 */
export async function pingDb(): Promise<boolean> {
  let succeeded: boolean;
  try {
    await db.execute(sql`SELECT 1`);
    const recovering = !healthy;
    if (recovering) {
      log.info("db healthy", { prior_failures: consecutiveFailures });
    }
    healthy = true;
    consecutiveFailures = 0;
    metrics.dbHealthy.set(1);
    succeeded = true;
  } catch (err) {
    consecutiveFailures += 1;
    const shouldReport = healthy || consecutiveFailures === 1;
    if (shouldReport) {
      const reason = err instanceof Error ? err.message : String(err);
      log.error("db ping failed", {
        consecutive_failures: consecutiveFailures,
        error: reason,
      });
    }
    healthy = false;
    metrics.dbHealthy.set(0);
    succeeded = false;
  }
  return succeeded;
}
|
||||||
|
|
||||||
|
/**
 * Identity token for the currently running poll loop, or null when no
 * loop is running. Each tick compares its own token against this value
 * after the ping resolves, so a loop that was stopped (or replaced by a
 * later start) while a ping was in flight cannot re-arm the timer.
 */
let pollSession: object | null = null;

/**
 * Start polling the DB in the background. Idempotent: a second call while
 * a loop is running is a no-op, even if the first ping has not finished.
 *
 * Schedule: every 30s while healthy. While unhealthy the delay is
 * `500ms * 2^failures` (1s, 2s, 4s, 8s, 16s), capped at 30s.
 * Call stopDbHealth on shutdown.
 */
export function startDbHealth(): void {
  if (pollSession !== null) return;
  const session = {};
  pollSession = session;

  const tick = async (): Promise<void> => {
    await pingDb();
    // Stopped or restarted while the ping was in flight: do not reschedule.
    if (pollSession !== session) return;
    const delayMs = healthy
      ? 30_000
      : Math.min(30_000, 500 * 2 ** Math.min(consecutiveFailures, 6));
    pollTimer = setTimeout(() => {
      void tick();
    }, delayMs);
  };
  void tick();
}

/**
 * Stop the background poll loop. Cancels any pending timer and
 * invalidates the running session so an in-flight ping cannot schedule
 * another tick. Safe to call when no loop is running.
 */
export function stopDbHealth(): void {
  pollSession = null;
  if (pollTimer) clearTimeout(pollTimer);
  pollTimer = null;
}
|
||||||
@@ -4,18 +4,26 @@ import { z } from "zod";
|
|||||||
* Broker environment config.
|
* Broker environment config.
|
||||||
*
|
*
|
||||||
* Validated at startup with Zod. Fails fast with a useful error if any
|
* Validated at startup with Zod. Fails fast with a useful error if any
|
||||||
* required var is missing or malformed. Defaults mirror the values
|
* required var is missing or malformed.
|
||||||
* proven out in the claude-intercom prototype so local dev works
|
|
||||||
* without a .env file.
|
|
||||||
*/
|
*/
|
||||||
const envSchema = z.object({
|
const envSchema = z.object({
|
||||||
BROKER_PORT: z.coerce.number().int().positive().default(7900),
|
BROKER_PORT: z.coerce.number().int().positive().default(7900),
|
||||||
DATABASE_URL: z.string().min(1, "DATABASE_URL is required"),
|
DATABASE_URL: z
|
||||||
|
.string()
|
||||||
|
.min(1, "DATABASE_URL is required")
|
||||||
|
.refine(
|
||||||
|
(u) => /^postgres(ql)?:\/\//.test(u),
|
||||||
|
"DATABASE_URL must be a postgres:// or postgresql:// connection string",
|
||||||
|
),
|
||||||
STATUS_TTL_SECONDS: z.coerce.number().int().positive().default(60),
|
STATUS_TTL_SECONDS: z.coerce.number().int().positive().default(60),
|
||||||
HOOK_FRESH_WINDOW_SECONDS: z.coerce.number().int().positive().default(30),
|
HOOK_FRESH_WINDOW_SECONDS: z.coerce.number().int().positive().default(30),
|
||||||
|
MAX_CONNECTIONS_PER_MESH: z.coerce.number().int().positive().default(100),
|
||||||
|
MAX_MESSAGE_BYTES: z.coerce.number().int().positive().default(65_536),
|
||||||
|
HOOK_RATE_LIMIT_PER_MIN: z.coerce.number().int().positive().default(30),
|
||||||
NODE_ENV: z
|
NODE_ENV: z
|
||||||
.enum(["development", "production", "test"])
|
.enum(["development", "production", "test"])
|
||||||
.default("development"),
|
.default("development"),
|
||||||
|
GIT_SHA: z.string().optional(),
|
||||||
});
|
});
|
||||||
|
|
||||||
export type BrokerEnv = z.infer<typeof envSchema>;
|
export type BrokerEnv = z.infer<typeof envSchema>;
|
||||||
|
|||||||
@@ -2,17 +2,17 @@
|
|||||||
/**
|
/**
|
||||||
* @claudemesh/broker entry point.
|
* @claudemesh/broker entry point.
|
||||||
*
|
*
|
||||||
* Spins up two servers in a single process:
|
* Single-port HTTP + WebSocket server. Routes:
|
||||||
* - HTTP on BROKER_PORT+1 for the /hook/set-status endpoint
|
* GET /health → liveness + build info (503 if DB down)
|
||||||
* (Claude Code hook scripts POST here on turn boundaries).
|
* GET /metrics → Prometheus plaintext
|
||||||
* - WebSocket on BROKER_PORT for authenticated peer connections
|
* POST /hook/set-status → Claude Code hook scripts report status
|
||||||
* (routes E2E-encrypted envelopes between mesh members).
|
* WS /ws → authenticated peer connections
|
||||||
*
|
*
|
||||||
* Background: TTL sweeper + pending-status sweeper.
|
* Graceful shutdown on SIGTERM/SIGINT: stops sweepers, marks all
|
||||||
* Shutdown: clean SIGTERM/SIGINT marks all presences disconnected.
|
* active presences disconnected in the DB, closes servers.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import { createServer, type IncomingMessage } from "node:http";
|
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
|
||||||
import type { Duplex } from "node:stream";
|
import type { Duplex } from "node:stream";
|
||||||
import { WebSocketServer, type WebSocket } from "ws";
|
import { WebSocketServer, type WebSocket } from "ws";
|
||||||
import { env } from "./env";
|
import { env } from "./env";
|
||||||
@@ -24,6 +24,7 @@ import {
|
|||||||
handleHookSetStatus,
|
handleHookSetStatus,
|
||||||
heartbeat,
|
heartbeat,
|
||||||
queueMessage,
|
queueMessage,
|
||||||
|
refreshQueueDepth,
|
||||||
refreshStatusFromJsonl,
|
refreshStatusFromJsonl,
|
||||||
startSweepers,
|
startSweepers,
|
||||||
stopSweepers,
|
stopSweepers,
|
||||||
@@ -35,28 +36,31 @@ import type {
|
|||||||
WSPushMessage,
|
WSPushMessage,
|
||||||
WSServerMessage,
|
WSServerMessage,
|
||||||
} from "./types";
|
} from "./types";
|
||||||
|
import { log } from "./logger";
|
||||||
|
import { metrics, metricsToText } from "./metrics";
|
||||||
|
import { TokenBucket } from "./rate-limit";
|
||||||
|
import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health";
|
||||||
|
import { buildInfo } from "./build-info";
|
||||||
|
|
||||||
const VERSION = "0.1.0";
|
|
||||||
const PORT = env.BROKER_PORT;
|
const PORT = env.BROKER_PORT;
|
||||||
const WS_PATH = "/ws";
|
const WS_PATH = "/ws";
|
||||||
|
|
||||||
function log(msg: string): void {
|
|
||||||
console.error(`[broker] ${msg}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Runtime connection registry ---
|
// --- Runtime connection registry ---
|
||||||
|
|
||||||
/** In-memory map of presenceId → authenticated WS connection. */
|
interface PeerConn {
|
||||||
const connections = new Map<
|
ws: WebSocket;
|
||||||
string,
|
meshId: string;
|
||||||
{
|
memberId: string;
|
||||||
ws: WebSocket;
|
memberPubkey: string;
|
||||||
meshId: string;
|
cwd: string;
|
||||||
memberId: string;
|
}
|
||||||
memberPubkey: string;
|
|
||||||
cwd: string;
|
const connections = new Map<string, PeerConn>();
|
||||||
}
|
const connectionsPerMesh = new Map<string, number>();
|
||||||
>();
|
const hookRateLimit = new TokenBucket(
|
||||||
|
env.HOOK_RATE_LIMIT_PER_MIN,
|
||||||
|
env.HOOK_RATE_LIMIT_PER_MIN,
|
||||||
|
);
|
||||||
|
|
||||||
function sendToPeer(presenceId: string, msg: WSServerMessage): void {
|
function sendToPeer(presenceId: string, msg: WSServerMessage): void {
|
||||||
const conn = connections.get(presenceId);
|
const conn = connections.get(presenceId);
|
||||||
@@ -65,80 +69,11 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void {
|
|||||||
try {
|
try {
|
||||||
conn.ws.send(JSON.stringify(msg));
|
conn.ws.send(JSON.stringify(msg));
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
log(`push failed to ${presenceId}: ${e instanceof Error ? e.message : e}`);
|
log.warn("push failed", {
|
||||||
}
|
presence_id: presenceId,
|
||||||
}
|
error: e instanceof Error ? e.message : String(e),
|
||||||
|
|
||||||
// --- Combined HTTP + WS server on a single port ---
|
|
||||||
//
|
|
||||||
// `ws` is run with noServer:true and attached to the HTTP server's
|
|
||||||
// 'upgrade' event. Clients connect to ws://host:PORT/ws; everything
|
|
||||||
// else is routed by the HTTP handler.
|
|
||||||
|
|
||||||
function handleHttpRequest(
|
|
||||||
req: IncomingMessage,
|
|
||||||
res: import("node:http").ServerResponse,
|
|
||||||
): void {
|
|
||||||
res.setHeader("Access-Control-Allow-Origin", "*");
|
|
||||||
res.setHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS");
|
|
||||||
res.setHeader("Access-Control-Allow-Headers", "Content-Type");
|
|
||||||
if (req.method === "OPTIONS") {
|
|
||||||
res.writeHead(204);
|
|
||||||
res.end();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (req.method === "GET" && req.url === "/health") {
|
|
||||||
res.writeHead(200, { "Content-Type": "application/json" });
|
|
||||||
res.end(JSON.stringify({ status: "ok", version: VERSION }));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (req.method === "POST" && req.url === "/hook/set-status") {
|
|
||||||
let body = "";
|
|
||||||
req.on("data", (chunk) => (body += chunk.toString()));
|
|
||||||
req.on("end", async () => {
|
|
||||||
try {
|
|
||||||
const payload = JSON.parse(body) as HookSetStatusRequest;
|
|
||||||
const result = await handleHookSetStatus(payload);
|
|
||||||
res.writeHead(200, { "Content-Type": "application/json" });
|
|
||||||
res.end(JSON.stringify(result));
|
|
||||||
|
|
||||||
// If the hook flipped a presence to idle, drain queued
|
|
||||||
// "next" messages immediately for low-latency delivery.
|
|
||||||
if (result.ok && result.presence_id && !result.pending) {
|
|
||||||
void maybePushQueuedMessages(result.presence_id);
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
res.writeHead(500, { "Content-Type": "application/json" });
|
|
||||||
res.end(
|
|
||||||
JSON.stringify({
|
|
||||||
ok: false,
|
|
||||||
error: e instanceof Error ? e.message : String(e),
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
res.writeHead(404);
|
|
||||||
res.end("not found");
|
|
||||||
}
|
|
||||||
|
|
||||||
function handleUpgrade(
|
|
||||||
wss: WebSocketServer,
|
|
||||||
req: IncomingMessage,
|
|
||||||
socket: Duplex,
|
|
||||||
head: Buffer,
|
|
||||||
): void {
|
|
||||||
if (req.url !== WS_PATH) {
|
|
||||||
socket.destroy();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
wss.handleUpgrade(req, socket, head, (ws) => {
|
|
||||||
wss.emit("connection", ws, req);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async function maybePushQueuedMessages(presenceId: string): Promise<void> {
|
async function maybePushQueuedMessages(presenceId: string): Promise<void> {
|
||||||
@@ -167,26 +102,190 @@ async function maybePushQueuedMessages(presenceId: string): Promise<void> {
|
|||||||
createdAt: m.createdAt.toISOString(),
|
createdAt: m.createdAt.toISOString(),
|
||||||
};
|
};
|
||||||
sendToPeer(presenceId, push);
|
sendToPeer(presenceId, push);
|
||||||
|
metrics.messagesRoutedTotal.inc({ priority: m.priority });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- WebSocket server (peer connections) ---
|
// --- HTTP request routing ---
|
||||||
|
|
||||||
|
/**
 * Send `body` as a JSON response with the given HTTP status and end the
 * response. The status line and Content-Type header are written before
 * the body is serialised.
 */
function writeJson(res: ServerResponse, status: number, body: unknown): void {
  res
    .writeHead(status, { "Content-Type": "application/json" })
    .end(JSON.stringify(body));
}
|
||||||
|
|
||||||
|
/**
 * Route a plain HTTP request.
 *
 *   OPTIONS *                 → 204 (CORS preflight)
 *   GET  /health              → 200 or 503 with DB state and build info
 *   GET  /metrics             → Prometheus text exposition
 *   POST /hook/set-status     → delegated to handleHookPost
 *   anything else             → 404
 *
 * Routes are matched on the path component only, so a request carrying a
 * query string (e.g. `/health?probe=1`) still reaches its handler instead
 * of falling through to 404. The logged `route` keeps the full URL.
 */
function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
  const started = Date.now();
  res.setHeader("Access-Control-Allow-Origin", "*");
  res.setHeader("Access-Control-Allow-Methods", "POST, GET, OPTIONS");
  res.setHeader("Access-Control-Allow-Headers", "Content-Type");
  if (req.method === "OPTIONS") {
    res.writeHead(204);
    res.end();
    return;
  }

  const route = `${req.method} ${req.url}`;

  // Strip the query string before matching.
  const rawUrl = req.url ?? "";
  const queryStart = rawUrl.indexOf("?");
  const path = queryStart === -1 ? rawUrl : rawUrl.slice(0, queryStart);

  if (req.method === "GET" && path === "/health") {
    const healthy = isDbHealthy();
    const status = healthy ? 200 : 503;
    writeJson(res, status, {
      status: healthy ? "ok" : "degraded",
      db: healthy ? "up" : "down",
      ...buildInfo(),
    });
    log.debug("http", { route, status, latency_ms: Date.now() - started });
    return;
  }

  if (req.method === "GET" && path === "/metrics") {
    res.writeHead(200, { "Content-Type": "text/plain; version=0.0.4" });
    res.end(metricsToText());
    return;
  }

  if (req.method === "POST" && path === "/hook/set-status") {
    handleHookPost(req, res, started);
    return;
  }

  res.writeHead(404);
  res.end("not found");
  log.debug("http", { route, status: 404, latency_ms: Date.now() - started });
}
|
||||||
|
|
||||||
|
/**
 * Handle POST /hook/set-status.
 *
 * Buffers the request body (capped at env.MAX_MESSAGE_BYTES), parses it
 * as JSON, applies the per-caller rate limit, then hands the payload to
 * handleHookSetStatus.
 *
 * Responses:
 *   413 — body exceeded MAX_MESSAGE_BYTES
 *   400 — body is not valid JSON, or is not a JSON object
 *   429 — rate limit exhausted for this (pid, cwd) key
 *   200 — result of handleHookSetStatus
 *   500 — handleHookSetStatus (or anything after parsing) threw
 *
 * @param req     incoming request whose body is consumed here
 * @param res     response to write exactly one JSON reply to
 * @param started epoch-ms timestamp taken when the request arrived, used
 *                for the latency_ms log field
 */
function handleHookPost(
  req: IncomingMessage,
  res: ServerResponse,
  started: number,
): void {
  metrics.hookRequestsTotal.inc();
  const chunks: Buffer[] = [];
  let total = 0;
  let aborted = false;

  req.on("data", (chunk: Buffer) => {
    if (aborted) return; // already answered 413; discard the rest
    total += chunk.length;
    if (total > env.MAX_MESSAGE_BYTES) {
      aborted = true;
      chunks.length = 0;
      // Destroying the request immediately can tear the socket down before
      // the 413 is flushed. Ask for the connection to be closed and only
      // destroy once the response has been handed off.
      res.setHeader("Connection", "close");
      res.once("finish", () => req.destroy());
      writeJson(res, 413, { ok: false, error: "payload too large" });
      return;
    }
    chunks.push(chunk);
  });

  req.on("end", async () => {
    if (aborted) return;

    // Parse separately from handling so client mistakes map to 400, not 500.
    let payload: HookSetStatusRequest;
    try {
      const parsed: unknown = JSON.parse(Buffer.concat(chunks).toString());
      if (
        typeof parsed !== "object" ||
        parsed === null ||
        Array.isArray(parsed)
      ) {
        throw new SyntaxError("request body must be a JSON object");
      }
      payload = parsed as HookSetStatusRequest;
    } catch (e) {
      const error = e instanceof Error ? e.message : String(e);
      writeJson(res, 400, { ok: false, error });
      log.warn("hook bad request", { error });
      return;
    }

    try {
      // Rate-limit key is "<pid>:<cwd>"; a missing pid is keyed as 0 and a
      // missing cwd as the empty string.
      const rlKey = `${payload.pid ?? 0}:${payload.cwd ?? ""}`;
      if (!hookRateLimit.take(rlKey)) {
        metrics.hookRequestsRateLimited.inc();
        writeJson(res, 429, { ok: false, error: "rate limited" });
        log.warn("hook rate limited", {
          cwd: payload.cwd,
          pid: payload.pid,
        });
        return;
      }
      const result = await handleHookSetStatus(payload);
      writeJson(res, 200, result);
      log.info("hook", {
        route: "POST /hook/set-status",
        cwd: payload.cwd,
        pid: payload.pid,
        status: payload.status,
        presence_id: result.presence_id,
        pending: result.pending ?? false,
        latency_ms: Date.now() - started,
      });
      // A non-pending status write may have made queued messages
      // deliverable; drain them now rather than waiting for the next event.
      if (result.ok && result.presence_id && !result.pending) {
        void maybePushQueuedMessages(result.presence_id);
      }
    } catch (e) {
      const error = e instanceof Error ? e.message : String(e);
      // If the failure happened after the 200 went out, a second writeHead
      // would throw inside this async listener; only reply when still possible.
      if (!res.headersSent) {
        writeJson(res, 500, { ok: false, error });
      }
      log.error("hook handler error", { error });
    }
  });
}
|
||||||
|
|
||||||
|
/**
 * Handle an HTTP `upgrade` event. Requests whose URL is exactly WS_PATH
 * are handed to the WebSocket server, which then emits `connection` with
 * the new socket and the original request. Any other URL has its raw
 * socket destroyed.
 */
function handleUpgrade(
  wss: WebSocketServer,
  req: IncomingMessage,
  socket: Duplex,
  head: Buffer,
): void {
  const isWsPath = req.url === WS_PATH;
  if (isWsPath) {
    wss.handleUpgrade(req, socket, head, (client) => {
      wss.emit("connection", client, req);
    });
  } else {
    socket.destroy();
  }
}
|
||||||
|
|
||||||
|
// --- WS protocol handlers ---
|
||||||
|
|
||||||
|
/**
 * Bump the live-connection counter for `meshId` and refresh the
 * `connectionsActive` gauge.
 *
 * NOTE(review): the gauge is set to `connections.size + 1`, which assumes
 * the caller has not yet inserted the new connection into `connections`.
 * Verify against the call site; if the insert happens first, the gauge
 * reads one too high until the next decrement.
 *
 * @returns the mesh's connection count after the increment
 */
function incMeshCount(meshId: string): number {
  const previous = connectionsPerMesh.get(meshId) ?? 0;
  const updated = previous + 1;
  connectionsPerMesh.set(meshId, updated);
  metrics.connectionsActive.set(connections.size + 1);
  return updated;
}
|
||||||
|
|
||||||
|
/**
 * Drop the live-connection counter for `meshId` by one, removing the
 * entry entirely once it reaches zero (an unknown mesh is treated as
 * having one connection, so it is simply removed). Afterwards the
 * `connectionsActive` gauge is set to the current size of `connections`.
 */
function decMeshCount(meshId: string): void {
  const current = connectionsPerMesh.get(meshId) ?? 1;
  const remaining = current - 1;
  if (remaining > 0) {
    connectionsPerMesh.set(meshId, remaining);
  } else {
    connectionsPerMesh.delete(meshId);
  }
  metrics.connectionsActive.set(connections.size);
}
|
||||||
|
|
||||||
|
/**
 * Best-effort delivery of an `error` frame to a WebSocket peer.
 *
 * @param ws      socket to write to
 * @param code    machine-readable error code
 * @param message human-readable description
 * @param id      optional id copied into the frame's `id` field
 *
 * Any exception from serialising or sending is swallowed on purpose.
 */
function sendError(
  ws: WebSocket,
  code: string,
  message: string,
  id?: string,
): void {
  const frame: WSServerMessage = { type: "error", code, message, id };
  try {
    const wire = JSON.stringify(frame);
    ws.send(wire);
  } catch {
    // Nothing useful to do if the send fails (e.g. ws already closed).
  }
}
|
||||||
|
|
||||||
async function handleHello(
|
async function handleHello(
|
||||||
ws: WebSocket,
|
ws: WebSocket,
|
||||||
hello: Extract<WSClientMessage, { type: "hello" }>,
|
hello: Extract<WSClientMessage, { type: "hello" }>,
|
||||||
): Promise<string | null> {
|
): Promise<string | null> {
|
||||||
// Authenticate: member with this pubkey must exist in this mesh and
|
// Capacity check BEFORE touching DB.
|
||||||
// not be revoked. Signature verification is TODO (crypto not wired
|
const existing = connectionsPerMesh.get(hello.meshId) ?? 0;
|
||||||
// yet; client-side libsodium sign_detached is planned).
|
if (existing >= env.MAX_CONNECTIONS_PER_MESH) {
|
||||||
|
metrics.connectionsRejected.inc({ reason: "capacity" });
|
||||||
|
log.warn("mesh at capacity", {
|
||||||
|
mesh_id: hello.meshId,
|
||||||
|
existing,
|
||||||
|
cap: env.MAX_CONNECTIONS_PER_MESH,
|
||||||
|
});
|
||||||
|
sendError(ws, "capacity", "mesh at connection capacity");
|
||||||
|
ws.close(1008, "capacity");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
const member = await findMemberByPubkey(hello.meshId, hello.pubkey);
|
const member = await findMemberByPubkey(hello.meshId, hello.pubkey);
|
||||||
if (!member) {
|
if (!member) {
|
||||||
const err: WSServerMessage = {
|
metrics.connectionsRejected.inc({ reason: "unauthorized" });
|
||||||
type: "error",
|
sendError(ws, "unauthorized", "pubkey not found in mesh");
|
||||||
code: "unauthorized",
|
ws.close(1008, "unauthorized");
|
||||||
message: "pubkey not found in mesh",
|
|
||||||
};
|
|
||||||
ws.send(JSON.stringify(err));
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
const presenceId = await connectPresence({
|
const presenceId = await connectPresence({
|
||||||
@@ -202,16 +301,19 @@ async function handleHello(
|
|||||||
memberPubkey: hello.pubkey,
|
memberPubkey: hello.pubkey,
|
||||||
cwd: hello.cwd,
|
cwd: hello.cwd,
|
||||||
});
|
});
|
||||||
log(
|
incMeshCount(hello.meshId);
|
||||||
`hello: mesh=${hello.meshId} member=${member.displayName} presence=${presenceId}`,
|
log.info("ws hello", {
|
||||||
);
|
mesh_id: hello.meshId,
|
||||||
// Drain any messages already queued for this member.
|
member: member.displayName,
|
||||||
|
presence_id: presenceId,
|
||||||
|
session_id: hello.sessionId,
|
||||||
|
});
|
||||||
await maybePushQueuedMessages(presenceId);
|
await maybePushQueuedMessages(presenceId);
|
||||||
return presenceId;
|
return presenceId;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function handleSend(
|
async function handleSend(
|
||||||
conn: NonNullable<ReturnType<typeof connections.get>>,
|
conn: PeerConn,
|
||||||
msg: Extract<WSClientMessage, { type: "send" }>,
|
msg: Extract<WSClientMessage, { type: "send" }>,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const messageId = await queueMessage({
|
const messageId = await queueMessage({
|
||||||
@@ -230,17 +332,17 @@ async function handleSend(
|
|||||||
};
|
};
|
||||||
conn.ws.send(JSON.stringify(ack));
|
conn.ws.send(JSON.stringify(ack));
|
||||||
|
|
||||||
// Fan-out: push to any currently-connected peer whose pubkey matches
|
// Fan-out over connected peers in the same mesh.
|
||||||
// the target (or to everyone on broadcast). Drain their queue which
|
|
||||||
// handles priority gating automatically.
|
|
||||||
for (const [pid, peer] of connections) {
|
for (const [pid, peer] of connections) {
|
||||||
if (peer.meshId !== conn.meshId) continue;
|
if (peer.meshId !== conn.meshId) continue;
|
||||||
if (msg.targetSpec !== "*" && peer.memberPubkey !== msg.targetSpec) continue;
|
if (msg.targetSpec !== "*" && peer.memberPubkey !== msg.targetSpec)
|
||||||
|
continue;
|
||||||
void maybePushQueuedMessages(pid);
|
void maybePushQueuedMessages(pid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function handleConnection(ws: WebSocket): void {
|
function handleConnection(ws: WebSocket): void {
|
||||||
|
metrics.connectionsTotal.inc();
|
||||||
let presenceId: string | null = null;
|
let presenceId: string | null = null;
|
||||||
ws.on("message", async (raw) => {
|
ws.on("message", async (raw) => {
|
||||||
try {
|
try {
|
||||||
@@ -250,12 +352,7 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (!presenceId) {
|
if (!presenceId) {
|
||||||
const err: WSServerMessage = {
|
sendError(ws, "no_hello", "must send hello first");
|
||||||
type: "error",
|
|
||||||
code: "no_hello",
|
|
||||||
message: "must send hello first",
|
|
||||||
};
|
|
||||||
ws.send(JSON.stringify(err));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const conn = connections.get(presenceId);
|
const conn = connections.get(presenceId);
|
||||||
@@ -266,20 +363,32 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
break;
|
break;
|
||||||
case "set_status":
|
case "set_status":
|
||||||
await writeStatus(presenceId, msg.status, "manual", new Date());
|
await writeStatus(presenceId, msg.status, "manual", new Date());
|
||||||
|
log.info("ws set_status", {
|
||||||
|
presence_id: presenceId,
|
||||||
|
status: msg.status,
|
||||||
|
});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
log(`ws msg error: ${e instanceof Error ? e.message : e}`);
|
metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });
|
||||||
|
log.warn("ws message error", {
|
||||||
|
presence_id: presenceId,
|
||||||
|
error: e instanceof Error ? e.message : String(e),
|
||||||
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
ws.on("close", async () => {
|
ws.on("close", async () => {
|
||||||
if (presenceId) {
|
if (presenceId) {
|
||||||
|
const conn = connections.get(presenceId);
|
||||||
connections.delete(presenceId);
|
connections.delete(presenceId);
|
||||||
|
if (conn) decMeshCount(conn.meshId);
|
||||||
await disconnectPresence(presenceId);
|
await disconnectPresence(presenceId);
|
||||||
log(`disconnect: ${presenceId}`);
|
log.info("ws close", { presence_id: presenceId });
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
ws.on("error", (err) => log(`ws error: ${err.message}`));
|
ws.on("error", (err) => {
|
||||||
|
log.warn("ws error", { error: err.message });
|
||||||
|
});
|
||||||
ws.on("pong", () => {
|
ws.on("pong", () => {
|
||||||
if (presenceId) void heartbeat(presenceId);
|
if (presenceId) void heartbeat(presenceId);
|
||||||
});
|
});
|
||||||
@@ -288,7 +397,10 @@ function handleConnection(ws: WebSocket): void {
|
|||||||
// --- Main ---
|
// --- Main ---
|
||||||
|
|
||||||
function main(): void {
|
function main(): void {
|
||||||
const wss = new WebSocketServer({ noServer: true });
|
const wss = new WebSocketServer({
|
||||||
|
noServer: true,
|
||||||
|
maxPayload: env.MAX_MESSAGE_BYTES,
|
||||||
|
});
|
||||||
wss.on("connection", handleConnection);
|
wss.on("connection", handleConnection);
|
||||||
|
|
||||||
const http = createServer(handleHttpRequest);
|
const http = createServer(handleHttpRequest);
|
||||||
@@ -296,37 +408,66 @@ function main(): void {
|
|||||||
handleUpgrade(wss, req, socket, head),
|
handleUpgrade(wss, req, socket, head),
|
||||||
);
|
);
|
||||||
http.on("error", (err) => {
|
http.on("error", (err) => {
|
||||||
log(`http server error: ${err.message}`);
|
log.error("http server error", { error: err.message });
|
||||||
process.exit(1);
|
process.exit(1);
|
||||||
});
|
});
|
||||||
http.listen(PORT, "0.0.0.0", () => {
|
http.listen(PORT, "0.0.0.0", () => {
|
||||||
log(
|
const info = buildInfo();
|
||||||
`@claudemesh/broker v${VERSION} listening on :${PORT} (ws:${WS_PATH}, http:/hook/set-status, http:/health) | ttl=${env.STATUS_TTL_SECONDS}s hook_fresh=${env.HOOK_FRESH_WINDOW_SECONDS}s`,
|
log.info("broker listening", {
|
||||||
);
|
port: PORT,
|
||||||
|
version: info.version,
|
||||||
|
gitSha: info.gitSha,
|
||||||
|
ws_path: WS_PATH,
|
||||||
|
ttl_seconds: env.STATUS_TTL_SECONDS,
|
||||||
|
hook_fresh_seconds: env.HOOK_FRESH_WINDOW_SECONDS,
|
||||||
|
max_connections_per_mesh: env.MAX_CONNECTIONS_PER_MESH,
|
||||||
|
max_message_bytes: env.MAX_MESSAGE_BYTES,
|
||||||
|
hook_rate_limit_per_min: env.HOOK_RATE_LIMIT_PER_MIN,
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
// Heartbeat ping every 30s; clients reply with pong → bumps lastPingAt.
|
// WS heartbeat ping every 30s; clients reply with pong → bumps lastPingAt.
|
||||||
setInterval(() => {
|
const pingInterval = setInterval(() => {
|
||||||
for (const { ws } of connections.values()) {
|
for (const { ws } of connections.values()) {
|
||||||
if (ws.readyState === ws.OPEN) ws.ping();
|
if (ws.readyState === ws.OPEN) ws.ping();
|
||||||
}
|
}
|
||||||
}, 30_000).unref();
|
}, 30_000);
|
||||||
|
pingInterval.unref();
|
||||||
|
|
||||||
|
// GC rate-limit buckets periodically.
|
||||||
|
const rlSweep = setInterval(() => hookRateLimit.sweep(), 5 * 60_000);
|
||||||
|
rlSweep.unref();
|
||||||
|
|
||||||
|
// Queue depth gauge refresh (fires the metric; cheap COUNT query).
|
||||||
|
const queueDepthTimer = setInterval(() => {
|
||||||
|
refreshQueueDepth().catch((e) =>
|
||||||
|
log.warn("queue depth refresh failed", {
|
||||||
|
error: e instanceof Error ? e.message : String(e),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}, 30_000);
|
||||||
|
queueDepthTimer.unref();
|
||||||
|
|
||||||
startSweepers();
|
startSweepers();
|
||||||
|
startDbHealth();
|
||||||
|
|
||||||
/**
 * Shutdown handler.
 *
 * Order: stop the periodic timers and the DB health loop, stop the sweepers,
 * close every peer socket with code 1001 ("going away"), close the WS and
 * HTTP servers, then exit.
 *
 * A failure while stopping the sweepers is logged and does NOT abort the
 * rest of the teardown; the process then exits with code 1 instead of 0 so
 * a supervisor can still see that the shutdown was not clean.
 *
 * @param signal Name of the signal that triggered the shutdown (logged only).
 */
const shutdown = async (signal: string): Promise<void> => {
  log.info("shutdown signal", { signal });
  let exitCode = 0;

  // Stop periodic work first so nothing fires mid-teardown.
  clearInterval(pingInterval);
  clearInterval(rlSweep);
  clearInterval(queueDepthTimer);
  stopDbHealth();

  try {
    await stopSweepers();
  } catch (e) {
    // Keep going: peers still deserve a close frame and the servers still
    // need to be closed even if a sweeper refused to stop cleanly.
    exitCode = 1;
    log.error("sweeper shutdown failed", {
      error: e instanceof Error ? e.message : String(e),
    });
  }

  for (const { ws } of connections.values()) {
    try {
      ws.close(1001, "shutting down");
    } catch {
      /* best effort: one failing socket must not block the others */
    }
  }

  wss.close();
  http.close();
  log.info("shutdown complete");
  process.exit(exitCode);
};
|
||||||
|
|
||||||
|
|||||||
33
apps/broker/src/logger.ts
Normal file
33
apps/broker/src/logger.ts
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
/**
|
||||||
|
* Structured JSON logger.
|
||||||
|
*
|
||||||
|
* One line per log event. Production observability tools (Datadog,
|
||||||
|
* Loki, etc.) can ingest these directly. Dev readability is
|
||||||
|
* secondary — if you're eyeballing, pipe through `jq`.
|
||||||
|
*/
|
||||||
|
|
||||||
|
type LogLevel = "debug" | "info" | "warn" | "error";

/** Free-form structured fields merged into the log entry. */
interface LogContext {
  [key: string]: unknown;
}

/**
 * Serialise one log event as a single JSON line on stderr.
 *
 * The entry is `{ ts, level, component: "broker", msg, ...ctx }`; context
 * keys are spread last, exactly as before.
 *
 * Logging must never throw into the caller, so serialisation is hardened:
 * - `bigint` values are written as decimal strings (JSON has no bigint),
 * - `Error` values are written as `{ name, message, stack }` instead of `{}`,
 * - circular references are replaced with the string "[Circular]",
 * - if serialisation still fails (e.g. a throwing getter or `toJSON`), a
 *   minimal entry carrying a `log_error` field is written instead.
 *
 * @param level Severity of the event.
 * @param msg   Human-readable message.
 * @param ctx   Optional structured fields.
 */
function emit(level: LogLevel, msg: string, ctx: LogContext = {}): void {
  const ts = new Date().toISOString();
  const entry = {
    ts,
    level,
    component: "broker",
    msg,
    ...ctx,
  };

  // Stack of objects on the path from the root to the value being visited.
  // JSON.stringify invokes the replacer with `this` bound to the holder of
  // the current key, which lets us unwind the stack as the walk backs out.
  const ancestors: unknown[] = [];
  function replacer(this: unknown, _key: string, value: unknown): unknown {
    if (typeof value === "bigint") return value.toString();
    if (typeof value !== "object" || value === null) return value;
    while (ancestors.length > 0 && ancestors[ancestors.length - 1] !== this) {
      ancestors.pop();
    }
    if (ancestors.includes(value)) return "[Circular]";
    if (value instanceof Error) {
      // Error fields are non-enumerable, so copy the useful ones explicitly.
      const plain = { name: value.name, message: value.message, stack: value.stack };
      ancestors.push(plain);
      return plain;
    }
    ancestors.push(value);
    return value;
  }

  let line: string;
  try {
    line = JSON.stringify(entry, replacer);
  } catch (err) {
    line = JSON.stringify({
      ts,
      level,
      component: "broker",
      msg,
      log_error: err instanceof Error ? err.message : String(err),
    });
  }

  // Single line, no pretty-printing. stderr so stdout is free for
  // any app-level protocol chatter.
  console.error(line);
}
|
||||||
|
|
||||||
|
/**
 * Public logging facade: one method per severity, each forwarding the
 * message and optional structured context to `emit` with its level fixed.
 */
export const log = {
  debug(msg: string, ctx?: LogContext): void {
    emit("debug", msg, ctx);
  },
  info(msg: string, ctx?: LogContext): void {
    emit("info", msg, ctx);
  },
  warn(msg: string, ctx?: LogContext): void {
    emit("warn", msg, ctx);
  },
  error(msg: string, ctx?: LogContext): void {
    emit("error", msg, ctx);
  },
};
|
||||||
121
apps/broker/src/metrics.ts
Normal file
121
apps/broker/src/metrics.ts
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
/**
|
||||||
|
* Minimal in-process metrics, exposed as Prometheus plaintext.
|
||||||
|
*
|
||||||
|
* Intentionally no external deps — we track a handful of counters
|
||||||
|
* and gauges that matter for broker ops. Scraped by /metrics.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/** Label set attached to a sample; values are stringified when rendered. */
type Labels = Record<string, string | number>;

/**
 * Monotonically increasing counter, one running total per label set.
 *
 * Prometheus treats any decrease of a counter as a process restart, so a
 * negative or non-finite increment would silently corrupt `rate()` /
 * `increase()` results. Such increments are rejected instead of applied.
 */
class Counter {
  private values = new Map<string, number>();

  constructor(
    public name: string,
    public help: string,
  ) {}

  /**
   * Add `by` (default 1) to the series identified by `labels`.
   *
   * @throws RangeError if `by` is negative, NaN or infinite.
   */
  inc(labels: Labels = {}, by = 1): void {
    if (!Number.isFinite(by) || by < 0) {
      throw new RangeError(
        `counter ${this.name}: increment must be a finite number >= 0, got ${by}`,
      );
    }
    const key = labelKey(labels);
    this.values.set(key, (this.values.get(key) ?? 0) + by);
  }

  /**
   * Render HELP/TYPE headers followed by one sample line per label set.
   * A counter that was never incremented renders a single unlabelled 0.
   */
  toText(): string {
    const lines = [`# HELP ${this.name} ${this.help}`, `# TYPE ${this.name} counter`];
    if (this.values.size === 0) {
      lines.push(`${this.name} 0`);
    } else {
      for (const [key, v] of this.values) {
        lines.push(`${this.name}${key} ${v}`);
      }
    }
    return lines.join("\n");
  }
}
|
||||||
|
|
||||||
|
/**
 * Gauge: a value per label set that can be set outright or moved up and
 * down. Rendered in the same plaintext layout as the counters.
 */
class Gauge {
  private values = new Map<string, number>();

  constructor(
    public name: string,
    public help: string,
  ) {}

  /** Overwrite the sample for `labels` with `value`. */
  set(value: number, labels: Labels = {}): void {
    const key = labelKey(labels);
    this.values.set(key, value);
  }

  /** Add `by` (default 1) to the sample for `labels`, starting from 0. */
  inc(labels: Labels = {}, by = 1): void {
    const key = labelKey(labels);
    const current = this.values.get(key) ?? 0;
    this.values.set(key, current + by);
  }

  /** Subtract `by` (default 1); implemented as a negative `inc`. */
  dec(labels: Labels = {}, by = 1): void {
    this.inc(labels, -by);
  }

  /**
   * Render HELP/TYPE headers followed by one sample line per label set.
   * A gauge that was never touched renders a single unlabelled 0.
   */
  toText(): string {
    const header = [`# HELP ${this.name} ${this.help}`, `# TYPE ${this.name} gauge`];
    if (this.values.size === 0) {
      return [...header, `${this.name} 0`].join("\n");
    }
    const samples = Array.from(this.values, ([key, v]) => `${this.name}${key} ${v}`);
    return [...header, ...samples].join("\n");
  }
}
|
||||||
|
|
||||||
|
/**
 * Build the `{k="v",...}` label suffix for a sample line; it also serves as
 * the map key that identifies a label set.
 *
 * Keys are sorted so the same labels always produce the same string
 * regardless of property order. Values are escaped as the Prometheus text
 * format requires: backslash -> `\\`, double quote -> `\"`, line feed ->
 * `\n`. Without the backslash and line-feed escapes a label value could
 * break the sample line or be parsed as a different value.
 *
 * @returns "" for an empty label set, otherwise the braced label list.
 */
function labelKey(labels: Labels): string {
  const entries = Object.entries(labels);
  if (entries.length === 0) return "";
  const parts = entries
    .sort(([a], [b]) => a.localeCompare(b))
    .map(([k, v]) => {
      const escaped = String(v).replace(/[\\"\n]/g, (ch) =>
        ch === "\n" ? "\\n" : `\\${ch}`,
      );
      return `${k}="${escaped}"`;
    })
    .join(",");
  return `{${parts}}`;
}
|
||||||
|
|
||||||
|
/**
 * Registry of every metric the broker exports. Property order is
 * significant: `metricsToText` renders the values in this order.
 */
export const metrics = {
  // WebSocket connections.
  connectionsTotal: new Counter("broker_connections_total", "Total WS connection attempts"),
  connectionsRejected: new Counter(
    "broker_connections_rejected_total",
    "WS connections refused (auth failure, capacity, etc.)",
  ),
  connectionsActive: new Gauge("broker_connections_active", "Currently connected peers"),

  // Message routing.
  messagesRoutedTotal: new Counter(
    "broker_messages_routed_total",
    "Messages successfully queued + routed",
  ),
  messagesRejectedTotal: new Counter(
    "broker_messages_rejected_total",
    "Messages rejected (size, auth, malformed)",
  ),
  queueDepth: new Gauge("broker_queue_depth", "Undelivered messages currently in the queue"),

  // Background sweepers.
  ttlSweepsTotal: new Counter("broker_ttl_sweeps_total", "TTL sweeper runs completed"),

  // Hook endpoint.
  hookRequestsTotal: new Counter(
    "broker_hook_requests_total",
    "POST /hook/set-status requests received",
  ),
  hookRequestsRateLimited: new Counter(
    "broker_hook_requests_rate_limited_total",
    "POST /hook/set-status rejected by rate limit",
  ),

  // Database.
  dbHealthy: new Gauge("broker_db_healthy", "1 if Postgres connection is up, 0 if not"),
};
|
||||||
|
|
||||||
|
/**
 * Render every registered metric, in registry order, as one plaintext
 * document: metric sections joined by a newline, plus a trailing newline.
 */
export function metricsToText(): string {
  const sections: string[] = [];
  for (const metric of Object.values(metrics)) {
    sections.push(metric.toText());
  }
  return `${sections.join("\n")}\n`;
}
|
||||||
61
apps/broker/src/rate-limit.ts
Normal file
61
apps/broker/src/rate-limit.ts
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
/**
|
||||||
|
* Token-bucket rate limiter keyed by an arbitrary string.
|
||||||
|
*
|
||||||
|
* Used to cap POST /hook/set-status at a sane per-session rate
|
||||||
|
* (hook scripts legitimately fire every turn; anything faster is
|
||||||
|
* either a loop or a compromised agent).
|
||||||
|
*
|
||||||
|
* In-process only. If we scale to multiple broker instances this
|
||||||
|
* moves to Redis, but for the single-instance broker it's enough.
|
||||||
|
*/
|
||||||
|
|
||||||
|
interface Bucket {
  /** Tokens currently available; fractional while a token is refilling. */
  tokens: number;
  /** Timestamp (ms) at which `tokens` was last brought up to date. */
  lastRefill: number;
}

/**
 * Per-key token bucket. Each key starts with `capacity` tokens, spends one
 * per allowed call, and regains `refillPerMinute` tokens per minute up to
 * `capacity`.
 */
export class TokenBucket {
  private buckets = new Map<string, Bucket>();
  private readonly refillPerMs: number;

  /**
   * @param capacity        Maximum (and initial) number of tokens per key.
   * @param refillPerMinute Tokens regained per minute of elapsed time.
   */
  constructor(
    private readonly capacity: number,
    refillPerMinute: number,
  ) {
    this.refillPerMs = refillPerMinute / 60_000;
  }

  /** Take one token. Returns true if allowed, false if rate-limited. */
  take(key: string, now = Date.now()): boolean {
    const bucket = this.buckets.get(key) ?? {
      tokens: this.capacity,
      lastRefill: now,
    };
    // Credit the time elapsed since the last update, capped at capacity.
    // A non-positive elapsed time (same tick, clock stepping back) credits
    // nothing and leaves the timestamp alone.
    const elapsed = now - bucket.lastRefill;
    if (elapsed > 0) {
      bucket.tokens = Math.min(
        this.capacity,
        bucket.tokens + elapsed * this.refillPerMs,
      );
      bucket.lastRefill = now;
    }
    if (bucket.tokens < 1) {
      this.buckets.set(key, bucket);
      return false;
    }
    bucket.tokens -= 1;
    this.buckets.set(key, bucket);
    return true;
  }

  /**
   * Periodic GC: drop buckets whose keys haven't been touched in a while.
   *
   * A bucket is only dropped once it would have refilled to full capacity.
   * Dropping a still-depleted bucket would hand the key a brand-new full
   * bucket on its next call, i.e. the sweep itself would lift the limit.
   * With a non-positive refill rate a bucket can never refill, so eviction
   * falls back to age alone to keep the map bounded.
   *
   * @param olderThanMs Minimum idle time before a bucket may be dropped.
   * @param now         Current time in ms (injectable for tests).
   */
  sweep(olderThanMs = 10 * 60 * 1000, now = Date.now()): void {
    for (const [key, bucket] of this.buckets) {
      const idleMs = now - bucket.lastRefill;
      if (idleMs <= olderThanMs) continue;
      const refilled = bucket.tokens + idleMs * this.refillPerMs;
      if (this.refillPerMs > 0 && refilled < this.capacity) continue;
      this.buckets.delete(key);
    }
  }

  /** Number of keys currently tracked. */
  get size(): number {
    return this.buckets.size;
  }
}
|
||||||
Reference in New Issue
Block a user