Backwards compat shim (task 27) - requireCliAuth() falls back to body.user_id when BROKER_LEGACY_AUTH=1 and no bearer present. Sets Deprecation + Warning headers + bumps a broker_legacy_auth_hits_total metric so operators can watch the legacy traffic drain to 0 before removing the shim. - All handlers parse body BEFORE requireCliAuth so the fallback can read user_id out of it. HA readiness (task 29) - .artifacts/specs/2026-04-15-broker-ha-statelessness-audit.md documents every in-memory symbol and rollout plan (phase 0-4). - packaging/docker-compose.ha-local.yml spins up 2 broker replicas behind Traefik sticky sessions for local smoke testing. - apps/broker/src/audit.ts now wraps writes in a transaction that takes pg_advisory_xact_lock on a key derived from meshId and re-reads the tail hash inside the txn. Concurrent broker replicas can no longer fork the audit chain. Deploy gate (task 30) - /health stays permissive (200 even on transient DB blips) so Docker doesn't kill the container on a glitch. - New /health/ready checks DB + optional EXPECTED_MIGRATION pin, returns 503 if either fails. External deploy gate can poll this and refuse to promote a broken deploy. Metrics dashboard (task 32) - packaging/grafana/claudemesh-broker.json: ready-to-import Grafana dashboard covering active conns, queue depth, routed/rejected rates, grant drops, legacy-auth hits, conn rejects. Tests (task 28) - audit-canonical.test.ts (4 tests) pins canonical JSON semantics. - grants-enforcement.test.ts (6 tests) covers the member-then-session-pubkey lookup with default/explicit/blocked branches. Docs (task 34) - docs/env-vars.md catalogues every env var the broker + CLI read. Crypto review prep (task 35) - .artifacts/specs/2026-04-15-crypto-review-packet.md: reviewer brief, threat model, scope, test coverage list, deliverables. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
264 lines
8.5 KiB
TypeScript
264 lines
8.5 KiB
TypeScript
/**
|
|
* Audit log with hash-chain integrity (unkeyed SHA-256; entries are not
* cryptographically signed).
|
|
*
|
|
* Every significant mesh event is recorded as an append-only entry.
|
|
* Each entry's SHA-256 hash includes the previous entry's hash,
|
|
* forming a tamper-evident chain per mesh. If any row is modified
|
|
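* or deleted, all subsequent hashes will fail verification. Two limits
* apply: the hash is unkeyed, so a writer with database access can
* recompute every later hash after editing a row; and deleting the
* newest entries (tail truncation) leaves no later entry to fail, so it
* is not detected by the chain alone.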
|
|
*
|
|
* NEVER logs message content (ciphertext or plaintext) — only metadata.
|
|
*/
|
|
|
|
import { createHash } from "node:crypto";
|
|
import { asc, desc, eq, sql, and } from "drizzle-orm";
|
|
import { db } from "./db";
|
|
import { auditLog } from "@turbostarter/db/schema/mesh";
|
|
import { log } from "./logger";
|
|
|
|
// ---------------------------------------------------------------------------
// In-memory last-hash cache (one entry per mesh, loaded from DB on startup)
// ---------------------------------------------------------------------------

/**
 * Newest known audit hash per mesh id, held by this process only.
 *
 * Filled by loadLastHashes() at startup and written by audit() when it
 * appends an entry. It is NOT the source of truth for chaining: audit()
 * re-reads the chain tail from the database while holding the mesh's
 * advisory lock, so entries written by other broker replicas are not
 * reflected here.
 */
const lastHash: Map<string, string> = new Map();
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Core audit logging
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Deterministic JSON serialization: object keys sorted recursively.
 *
 * The payload column is JSONB, which does NOT preserve key order, so
 * hashing a naive JSON.stringify(row.payload) on verify could yield a
 * different string than at insert time — a false tamper report.
 * Canonical form makes both sides agree.
 *
 * Apart from key order, values are encoded exactly as JSON.stringify
 * encodes them, because that is (presumably — confirm for the DB driver
 * in use) the form that reaches the JSONB column and is read back on
 * verify:
 *   - `toJSON()` is honoured, so a Date becomes its ISO string;
 *   - object properties whose value is undefined, a function or a
 *     symbol are omitted;
 *   - such values inside arrays become `null`.
 * Without this, insert-time hashing produced e.g. `{"a":undefined}` or
 * `{}` for a Date, while verify saw the stored `{}` / ISO string.
 * For plain JSON data (what verify reads back) the output is unchanged.
 *
 * @param value - Value to serialize.
 * @returns Canonical JSON text. Returns "null" when the value itself is
 *   not representable (undefined, function, symbol).
 * @throws TypeError for BigInt values, as JSON.stringify does.
 */
function canonicalJson(value: unknown): string {
  return encodeCanonical(value, "") ?? "null";
}

/**
 * Recursive worker for canonicalJson. Returns undefined for values that
 * JSON.stringify would skip (undefined, functions, symbols), so the
 * caller can omit them (object members) or null them (array items).
 */
function encodeCanonical(value: unknown, key: string): string | undefined {
  let current = value;

  // Mirror JSON.stringify: an object with a callable toJSON is replaced
  // by its result before encoding (Date -> ISO string).
  if (
    current !== null &&
    typeof current === "object" &&
    typeof (current as { toJSON?: unknown }).toJSON === "function"
  ) {
    current = (current as { toJSON: (k: string) => unknown }).toJSON(key);
  }

  // Primitives; JSON.stringify yields undefined for undefined/function/symbol.
  if (current === null || typeof current !== "object") {
    return JSON.stringify(current) as string | undefined;
  }

  if (Array.isArray(current)) {
    const items = current.map(
      (item, index) => encodeCanonical(item, String(index)) ?? "null",
    );
    return "[" + items.join(",") + "]";
  }

  const obj = current as Record<string, unknown>;
  const members: string[] = [];
  for (const k of Object.keys(obj).sort()) {
    const encoded = encodeCanonical(obj[k], k);
    // Skipped values drop the whole member, exactly like JSON.stringify.
    if (encoded !== undefined) {
      members.push(JSON.stringify(k) + ":" + encoded);
    }
  }
  return "{" + members.join(",") + "}";
}
|
|
|
|
/**
 * Hash one audit entry, linking it to its predecessor.
 *
 * The hashed text is six fields joined with "|": prevHash, meshId,
 * eventType, actorMemberId (a null id is rendered as the text "null"),
 * the canonical JSON payload, and createdAt as an ISO-8601 string.
 * Returns the lowercase hex SHA-256 of that text.
 *
 * The format is frozen: stored rows were hashed with it, so any change
 * (e.g. escaping "|" or distinguishing null from the string "null")
 * would make every existing chain fail verification.
 */
function computeHash(
  prevHash: string,
  meshId: string,
  eventType: string,
  actorMemberId: string | null,
  payload: Record<string, unknown>,
  createdAt: Date,
): string {
  const body = canonicalJson(payload);
  const stamp = createdAt.toISOString();
  const material = `${prevHash}|${meshId}|${eventType}|${actorMemberId}|${body}|${stamp}`;

  const hasher = createHash("sha256");
  hasher.update(material);
  return hasher.digest("hex");
}
|
|
|
|
/**
 * Derive the Postgres advisory-lock key that serializes audit writes for
 * one mesh across broker replicas.
 *
 * The key is the first 8 bytes of SHA-256("audit:" + meshId), read
 * big-endian, reduced to 63 bits so it is always a non-negative bigint.
 * The "audit:" prefix gives these keys their own namespace. A collision
 * with other lock keys (e.g. migrate's 74737_73831) is not structurally
 * prevented, only astronomically unlikely.
 */
function meshLockKey(meshId: string): bigint {
  const first8 = createHash("sha256").update(`audit:${meshId}`).digest().readBigUInt64BE(0);
  // asUintN(63, x) keeps the low 63 bits, i.e. clears the sign bit.
  return BigInt.asUintN(63, first8);
}
|
|
|
|
/**
 * Append an audit entry for a mesh event.
 *
 * Errors from the database write are caught and logged via `log.warn`,
 * so callers may `void audit(...)` without blocking the hot path. A
 * failed write records nothing: that event is simply absent from the
 * chain.
 *
 * Concurrency under HA: the write runs in a transaction that first takes
 * `pg_advisory_xact_lock(meshLockKey(meshId))` and only then reads the
 * tail hash from the DB. This serializes all writers to the same mesh
 * and prevents the chain from forking. The lock is transaction-scoped,
 * so it is released on commit or rollback.
 *
 * The in-memory `lastHash` cache is updated only after the transaction
 * has committed. A failed commit therefore cannot leave the cache
 * pointing at a hash that does not exist in the database.
 *
 * @param meshId - Mesh whose chain the entry is appended to.
 * @param eventType - Event name; part of the hash.
 * @param actorMemberId - Acting member id, or null when there is none; part of the hash.
 * @param actorDisplayName - Stored for display only; NOT part of the hash.
 * @param payload - Event metadata (must not contain message content); hashed in canonical form.
 */
export async function audit(
  meshId: string,
  eventType: string,
  actorMemberId: string | null,
  actorDisplayName: string | null,
  payload: Record<string, unknown>,
): Promise<void> {
  const createdAt = new Date();
  try {
    const committedHash = await db.transaction(async (tx) => {
      const lockKey = meshLockKey(meshId);
      await tx.execute(sql`SELECT pg_advisory_xact_lock(${lockKey}::bigint)`);

      // Read the tail only after the lock is held. Under the default
      // READ COMMITTED isolation this statement sees the previous
      // writer's committed row. NOTE(review): under REPEATABLE READ the
      // snapshot would predate the lock wait.
      const [latest] = await tx
        .select({ hash: auditLog.hash })
        .from(auditLog)
        .where(eq(auditLog.meshId, meshId))
        .orderBy(desc(auditLog.id))
        .limit(1);

      const prevHash = latest?.hash ?? "genesis";
      const hash = computeHash(prevHash, meshId, eventType, actorMemberId, payload, createdAt);

      await tx.insert(auditLog).values({
        meshId,
        eventType,
        actorMemberId,
        actorDisplayName,
        payload,
        prevHash,
        hash,
        createdAt,
      });

      return hash;
    });

    // Reached only once the transaction has committed.
    lastHash.set(meshId, committedHash);
  } catch (e) {
    log.warn("audit log insert failed", {
      mesh_id: meshId,
      event_type: eventType,
      error: e instanceof Error ? e.message : String(e),
    });
  }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Startup: load last hash per mesh from DB
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Seed the in-memory `lastHash` cache with the newest audit hash of
 * every mesh. Failures are logged and otherwise ignored; the expected
 * case is first boot, when the table does not exist yet.
 */
export async function loadLastHashes(): Promise<void> {
  try {
    // DISTINCT ON (mesh_id) keeps the first row of each mesh_id group;
    // ordering each group by id DESC makes that the newest entry.
    const tails = await db.execute<{ mesh_id: string; hash: string }>(sql`
      SELECT DISTINCT ON (mesh_id) mesh_id, hash
      FROM mesh.audit_log
      ORDER BY mesh_id, id DESC
    `);

    for (const { mesh_id: tailMeshId, hash: tailHash } of tails) {
      lastHash.set(tailMeshId, tailHash);
    }
    log.info("audit: loaded last hashes", { meshes: lastHash.size });
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    log.warn("audit: loadLastHashes failed (table may not exist yet)", {
      error: message,
    });
  }
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Chain verification
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Re-verify the hash chain of one mesh, oldest entry first (by id).
 *
 * Each entry must link to its predecessor's hash ("genesis" for the
 * first), and its stored hash must equal the recomputed one. The result
 * reports the number of entries. On failure, `brokenAt` is the id of the
 * first entry that does not check out.
 *
 * All entries of the mesh are loaded in a single query. A chain whose
 * newest entries were deleted still verifies, because only the links
 * between the remaining rows are checked.
 */
export async function verifyChain(
  meshId: string,
): Promise<{ valid: boolean; entries: number; brokenAt?: number }> {
  const chain = await db
    .select()
    .from(auditLog)
    .where(eq(auditLog.meshId, meshId))
    .orderBy(asc(auditLog.id));

  const total = chain.length;
  let expectedPrev = "genesis";

  for (const entry of chain) {
    const linked = entry.prevHash === expectedPrev;
    // Short-circuit: skip the recompute when the link is already broken.
    if (
      !linked ||
      computeHash(
        entry.prevHash,
        entry.meshId,
        entry.eventType,
        entry.actorMemberId,
        entry.payload as Record<string, unknown>,
        entry.createdAt,
      ) !== entry.hash
    ) {
      return { valid: false, entries: total, brokenAt: entry.id };
    }
    expectedPrev = entry.hash;
  }

  return { valid: true, entries: total };
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Query: paginated audit entries
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Coerce a caller-supplied paging value to a non-negative integer.
 * Missing, non-finite or negative values yield `fallback`; fractions are truncated.
 */
function auditPageParam(value: number | undefined, fallback: number): number {
  if (value === undefined || !Number.isFinite(value) || value < 0) return fallback;
  return Math.trunc(value);
}

/**
 * Page through a mesh's audit entries, newest first (by id).
 *
 * @param meshId - Mesh whose entries are listed.
 * @param options.limit - Page size. Defaults to 50, which is also used
 *   for negative or non-finite input; fractions are truncated.
 * @param options.offset - Entries to skip. Defaults to 0, which is also
 *   used for negative or non-finite input; fractions are truncated.
 * @param options.eventType - When non-empty, only entries of this event type.
 * @returns The page of entries plus `total`, the count of all entries
 *   matching the same filter. `actor` is the display name, else the
 *   member id, else "system". `createdAt` is an ISO-8601 string.
 */
export async function queryAuditLog(
  meshId: string,
  options?: { limit?: number; offset?: number; eventType?: string },
): Promise<{ entries: Array<{ id: number; eventType: string; actor: string; payload: Record<string, unknown>; hash: string; createdAt: string }>; total: number }> {
  // Sanitize before the values reach LIMIT/OFFSET: Postgres rejects
  // negative values, and NaN is not a valid bigint parameter.
  const limit = auditPageParam(options?.limit, 50);
  const offset = auditPageParam(options?.offset, 0);

  const conditions = [eq(auditLog.meshId, meshId)];
  if (options?.eventType) {
    conditions.push(eq(auditLog.eventType, options.eventType));
  }
  // and() of a single condition renders just that condition.
  const where = and(...conditions);

  // The page and the total are fetched concurrently as two separate queries.
  const [rows, countResult] = await Promise.all([
    db
      .select()
      .from(auditLog)
      .where(where)
      .orderBy(desc(auditLog.id))
      .limit(limit)
      .offset(offset),
    db
      .select({ count: sql<number>`count(*)` })
      .from(auditLog)
      .where(where),
  ]);

  return {
    entries: rows.map((r) => ({
      id: r.id,
      eventType: r.eventType,
      actor: r.actorDisplayName ?? r.actorMemberId ?? "system",
      payload: r.payload as Record<string, unknown>,
      hash: r.hash,
      createdAt: r.createdAt.toISOString(),
    })),
    // count(*) may arrive as a string (bigint) from the driver; normalize.
    total: Number(countResult[0]?.count ?? 0),
  };
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Ensure table exists (raw DDL for first-boot before migrations run)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Best-effort first-boot DDL: create `mesh.audit_log` if it is missing.
 *
 * Any failure is logged and swallowed. For example, the REFERENCES clause
 * requires `mesh.mesh` to exist. Only the table is created, no extra
 * indexes. NOTE(review): the columns must stay in sync with the
 * migration-managed `auditLog` schema — confirm when either changes.
 */
export async function ensureAuditLogTable(): Promise<void> {
  try {
    const ddl = sql`
      CREATE TABLE IF NOT EXISTS mesh.audit_log (
        id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
        mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE ON UPDATE CASCADE,
        event_type TEXT NOT NULL,
        actor_member_id TEXT,
        actor_display_name TEXT,
        payload JSONB NOT NULL DEFAULT '{}',
        prev_hash TEXT NOT NULL,
        hash TEXT NOT NULL,
        created_at TIMESTAMP NOT NULL DEFAULT now()
      )
    `;
    await db.execute(ddl);
  } catch (err) {
    const error = err instanceof Error ? err.message : String(err);
    log.warn("audit: ensureAuditLogTable failed", { error });
  }
}
|