Files
claudemesh/apps/broker/src/metrics.ts
Alejandro Gutiérrez 05729ad8a4
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
feat(ga): close remaining GA blockers (backcompat, HA prep, tests, docs)
Backwards compat shim (task 27)
- requireCliAuth() falls back to body.user_id when BROKER_LEGACY_AUTH=1
  and no bearer present. Sets Deprecation + Warning headers + bumps a
  broker_legacy_auth_hits_total metric so operators can watch the
  legacy traffic drain to 0 before removing the shim.
- All handlers parse body BEFORE requireCliAuth so the fallback can
  read user_id out of it.

HA readiness (task 29)
- .artifacts/specs/2026-04-15-broker-ha-statelessness-audit.md
  documents every in-memory symbol and rollout plan (phase 0-4).
- packaging/docker-compose.ha-local.yml spins up 2 broker replicas
  behind Traefik sticky sessions for local smoke testing.
- apps/broker/src/audit.ts now wraps writes in a transaction that
  takes pg_advisory_xact_lock(meshId) and re-reads the tail hash
  inside the txn. Concurrent broker replicas can no longer fork the
  audit chain.

Deploy gate (task 30)
- /health stays permissive (200 even on transient DB blips) so
  Docker doesn't kill the container on a glitch.
- New /health/ready checks DB + optional EXPECTED_MIGRATION pin,
  returns 503 if either fails. External deploy gate can poll this
  and refuse to promote a broken deploy.

Metrics dashboard (task 32)
- packaging/grafana/claudemesh-broker.json: ready-to-import Grafana
  dashboard covering active conns, queue depth, routed/rejected
  rates, grant drops, legacy-auth hits, conn rejects.

Tests (task 28)
- audit-canonical.test.ts (4 tests) pins canonical JSON semantics.
- grants-enforcement.test.ts (6 tests) covers the member-then-
  session-pubkey lookup with default/explicit/blocked branches.

Docs (task 34)
- docs/env-vars.md catalogues every env var the broker + CLI read.

Crypto review prep (task 35)
- .artifacts/specs/2026-04-15-crypto-review-packet.md: reviewer
  brief, threat model, scope, test coverage list, deliverables.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 23:51:28 +01:00

130 lines
3.6 KiB
TypeScript

/**
* Minimal in-process metrics, exposed as Prometheus plaintext.
*
* Intentionally no external deps — we track a handful of counters
* and gauges that matter for broker ops. Scraped by /metrics.
*/
/**
 * Label set attached to a sample: label name mapped to its value.
 * Numeric values are converted to strings when the labels are serialised.
 */
type Labels = Record<string, string | number>;
/**
 * Monotonically increasing metric rendered in Prometheus text format.
 *
 * One running total is kept per distinct label set; the serialised label
 * string returned by `labelKey` is used as the map key.
 */
class Counter {
  private readonly values = new Map<string, number>();

  /**
   * @param name - Metric name as it appears in the exposition output.
   * @param help - Text emitted on the `# HELP` line.
   */
  constructor(
    public name: string,
    public help: string,
  ) {}

  /**
   * Adds `by` to the series identified by `labels`, starting it at 0 if it
   * does not exist yet.
   *
   * @param labels - Label set selecting the series (defaults to no labels).
   * @param by - Amount to add (defaults to 1); must be finite and >= 0.
   * @throws RangeError if `by` is negative, `NaN` or infinite. A counter may
   *   only go up, and a single `NaN` would otherwise stick to the series for
   *   good because every later addition would also yield `NaN`.
   */
  inc(labels: Labels = {}, by = 1): void {
    if (!Number.isFinite(by) || by < 0) {
      throw new RangeError(
        `${this.name}: counter increment must be a finite, non-negative number (got ${by})`,
      );
    }
    const key = labelKey(labels);
    this.values.set(key, (this.values.get(key) ?? 0) + by);
  }

  /**
   * Renders the `# HELP` / `# TYPE` header followed by one sample line per
   * label set, in first-seen order.
   *
   * @returns The block joined with `\n`, without a trailing newline. A
   *   counter that has never been incremented renders a single unlabelled
   *   `0` sample.
   */
  toText(): string {
    const lines = [`# HELP ${this.name} ${this.help}`, `# TYPE ${this.name} counter`];
    if (this.values.size === 0) {
      lines.push(`${this.name} 0`);
    } else {
      for (const [key, v] of this.values) {
        lines.push(`${this.name}${key} ${v}`);
      }
    }
    return lines.join("\n");
  }
}
/**
 * Metric whose value can be set directly or moved up and down, rendered in
 * Prometheus text format.
 *
 * One current value is kept per distinct label set, keyed by the serialised
 * label string returned by `labelKey`.
 */
class Gauge {
  private values = new Map<string, number>();

  /**
   * @param name - Metric name as it appears in the exposition output.
   * @param help - Text emitted on the `# HELP` line.
   */
  constructor(
    public name: string,
    public help: string,
  ) {}

  /** Overwrites the value of the series selected by `labels`. */
  set(value: number, labels: Labels = {}): void {
    const series = labelKey(labels);
    this.values.set(series, value);
  }

  /** Adds `by` (default 1) to the selected series; a missing series counts as 0. */
  inc(labels: Labels = {}, by = 1): void {
    const series = labelKey(labels);
    const previous = this.values.get(series) ?? 0;
    this.values.set(series, previous + by);
  }

  /** Subtracts `by` (default 1) from the selected series by incrementing with `-by`. */
  dec(labels: Labels = {}, by = 1): void {
    this.inc(labels, -by);
  }

  /**
   * Renders the `# HELP` / `# TYPE` header followed by one sample line per
   * label set, in first-seen order.
   *
   * @returns The block joined with `\n`, without a trailing newline. A gauge
   *   that was never touched renders a single unlabelled `0` sample.
   */
  toText(): string {
    const { name, help, values } = this;
    const samples =
      values.size > 0
        ? Array.from(values, ([series, current]) => `${name}${series} ${current}`)
        : [`${name} 0`];
    return [`# HELP ${name} ${help}`, `# TYPE ${name} gauge`, ...samples].join("\n");
  }
}
/**
 * Serialises a label set into the `{name="value",...}` suffix of a
 * Prometheus sample line.
 *
 * Labels are emitted sorted by name so the same label set always yields the
 * same string; the metric classes in this file use that string as a map key.
 * Values are escaped the way the text exposition format requires: backslash,
 * double quote and line feed become `\\`, `\"` and `\n` respectively.
 *
 * @param labels - Label names mapped to their values; numbers are stringified.
 * @returns An empty string for an empty label set, otherwise the braced,
 *   comma-separated list.
 */
function labelKey(labels: Labels): string {
  const entries = Object.entries(labels);
  if (entries.length === 0) return "";
  const parts = entries
    .sort(([a], [b]) => a.localeCompare(b))
    .map(([k, v]) => {
      // Backslashes go first so the ones introduced by the later
      // replacements are not escaped a second time.
      const escaped = String(v)
        .replace(/\\/g, "\\\\")
        .replace(/"/g, '\\"')
        .replace(/\n/g, "\\n");
      return `${k}="${escaped}"`;
    })
    .join(",");
  return `{${parts}}`;
}
/**
 * Registry of every metric this module exposes.
 *
 * Property order matters: `metricsToText` walks the object's values in
 * declaration order, so this is also the order of the exposition output.
 */
export const metrics = (() => {
  // Local shorthands so each registry entry reads as "kind(name, help)".
  const counter = (name: string, help: string): Counter => new Counter(name, help);
  const gauge = (name: string, help: string): Gauge => new Gauge(name, help);

  return {
    // WebSocket connections
    connectionsTotal: counter("broker_connections_total", "Total WS connection attempts"),
    connectionsRejected: counter(
      "broker_connections_rejected_total",
      "WS connections refused (auth failure, capacity, etc.)",
    ),
    connectionsActive: gauge("broker_connections_active", "Currently connected peers"),

    // Message routing
    messagesRoutedTotal: counter(
      "broker_messages_routed_total",
      "Messages successfully queued + routed",
    ),
    messagesRejectedTotal: counter(
      "broker_messages_rejected_total",
      "Messages rejected (size, auth, malformed)",
    ),
    messagesDroppedByGrantTotal: counter(
      "broker_messages_dropped_by_grant_total",
      "Messages silently dropped because recipient didn't grant sender the required capability",
    ),

    // Legacy auth shim
    brokerLegacyAuthHitsTotal: counter(
      "broker_legacy_auth_hits_total",
      "Pre-alpha.36 clients authenticating via body.user_id fallback (remove shim when near zero)",
    ),

    // Queue and TTL sweeper
    queueDepth: gauge("broker_queue_depth", "Undelivered messages currently in the queue"),
    ttlSweepsTotal: counter("broker_ttl_sweeps_total", "TTL sweeper runs completed"),

    // Status hook endpoint
    hookRequestsTotal: counter(
      "broker_hook_requests_total",
      "POST /hook/set-status requests received",
    ),
    hookRequestsRateLimited: counter(
      "broker_hook_requests_rate_limited_total",
      "POST /hook/set-status rejected by rate limit",
    ),

    // Database
    dbHealthy: gauge("broker_db_healthy", "1 if Postgres connection is up, 0 if not"),
  };
})();
/**
 * Renders every metric in the `metrics` registry as one Prometheus plaintext
 * document.
 *
 * @returns Each metric's `toText()` block in registry order, separated by
 *   `\n` and terminated by a final `\n`.
 */
export function metricsToText(): string {
  const sections: string[] = [];
  for (const metric of Object.values(metrics)) {
    sections.push(metric.toText());
  }
  return `${sections.join("\n")}\n`;
}