feat(ga): close remaining GA blockers (backcompat, HA prep, tests, docs)
Backwards compat shim (task 27) - requireCliAuth() falls back to body.user_id when BROKER_LEGACY_AUTH=1 and no bearer present. Sets Deprecation + Warning headers + bumps a broker_legacy_auth_hits_total metric so operators can watch the legacy traffic drain to 0 before removing the shim. - All handlers parse body BEFORE requireCliAuth so the fallback can read user_id out of it. HA readiness (task 29) - .artifacts/specs/2026-04-15-broker-ha-statelessness-audit.md documents every in-memory symbol and rollout plan (phase 0-4). - packaging/docker-compose.ha-local.yml spins up 2 broker replicas behind Traefik sticky sessions for local smoke testing. - apps/broker/src/audit.ts now wraps writes in a transaction that takes pg_advisory_xact_lock(meshId) and re-reads the tail hash inside the txn. Concurrent broker replicas can no longer fork the audit chain. Deploy gate (task 30) - /health stays permissive (200 even on transient DB blips) so Docker doesn't kill the container on a glitch. - New /health/ready checks DB + optional EXPECTED_MIGRATION pin, returns 503 if either fails. External deploy gate can poll this and refuse to promote a broken deploy. Metrics dashboard (task 32) - packaging/grafana/claudemesh-broker.json: ready-to-import Grafana dashboard covering active conns, queue depth, routed/rejected rates, grant drops, legacy-auth hits, conn rejects. Tests (task 28) - audit-canonical.test.ts (4 tests) pins canonical JSON semantics. - grants-enforcement.test.ts (6 tests) covers the member-then- session-pubkey lookup with default/explicit/blocked branches. Docs (task 34) - docs/env-vars.md catalogues every env var the broker + CLI read. Crypto review prep (task 35) - .artifacts/specs/2026-04-15-crypto-review-packet.md: reviewer brief, threat model, scope, test coverage list, deliverables. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -60,11 +60,27 @@ function computeHash(
|
||||
return createHash("sha256").update(input).digest("hex");
|
||||
}
|
||||
|
||||
/**
|
||||
* Stable 63-bit lock key per mesh for audit serialization under HA.
|
||||
* Use the audit lock space; keep distinct from migrate's 74737_73831.
|
||||
*/
|
||||
function meshLockKey(meshId: string): bigint {
|
||||
const digest = createHash("sha256").update("audit:" + meshId).digest();
|
||||
const unsigned = digest.readBigUInt64BE(0);
|
||||
return unsigned & 0x7fffffffffffffffn;
|
||||
}
|
||||
|
||||
/**
|
||||
* Append an audit entry for a mesh event.
|
||||
*
|
||||
* Fire-and-forget safe — callers should `void audit(...)` or
|
||||
* `.catch(log.warn)` to avoid blocking the hot path.
|
||||
*
|
||||
* Concurrency under HA: wraps the write in a transaction that takes
|
||||
* `pg_advisory_xact_lock(meshLockKey(meshId))` before reading the
|
||||
* tail hash from the DB. This serializes all concurrent writers to
|
||||
* the same mesh and prevents the chain from forking. The in-memory
|
||||
* `lastHash` cache is updated after a successful commit.
|
||||
*/
|
||||
export async function audit(
|
||||
meshId: string,
|
||||
@@ -73,22 +89,31 @@ export async function audit(
|
||||
actorDisplayName: string | null,
|
||||
payload: Record<string, unknown>,
|
||||
): Promise<void> {
|
||||
const prevHash = lastHash.get(meshId) ?? "genesis";
|
||||
const createdAt = new Date();
|
||||
const hash = computeHash(prevHash, meshId, eventType, actorMemberId, payload, createdAt);
|
||||
|
||||
try {
|
||||
await db.insert(auditLog).values({
|
||||
meshId,
|
||||
eventType,
|
||||
actorMemberId,
|
||||
actorDisplayName,
|
||||
payload,
|
||||
prevHash,
|
||||
hash,
|
||||
createdAt,
|
||||
await db.transaction(async (tx) => {
|
||||
const key = meshLockKey(meshId);
|
||||
await tx.execute(sql`SELECT pg_advisory_xact_lock(${key}::bigint)`);
|
||||
const [latest] = await tx
|
||||
.select({ hash: auditLog.hash })
|
||||
.from(auditLog)
|
||||
.where(eq(auditLog.meshId, meshId))
|
||||
.orderBy(desc(auditLog.id))
|
||||
.limit(1);
|
||||
const prevHash = latest?.hash ?? "genesis";
|
||||
const hash = computeHash(prevHash, meshId, eventType, actorMemberId, payload, createdAt);
|
||||
await tx.insert(auditLog).values({
|
||||
meshId,
|
||||
eventType,
|
||||
actorMemberId,
|
||||
actorDisplayName,
|
||||
payload,
|
||||
prevHash,
|
||||
hash,
|
||||
createdAt,
|
||||
});
|
||||
lastHash.set(meshId, hash);
|
||||
});
|
||||
lastHash.set(meshId, hash);
|
||||
} catch (e) {
|
||||
log.warn("audit log insert failed", {
|
||||
mesh_id: meshId,
|
||||
|
||||
@@ -578,14 +578,58 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
|
||||
const route = `${req.method} ${req.url}`;
|
||||
|
||||
if (req.method === "GET" && req.url === "/health") {
|
||||
const healthy = isDbHealthy();
|
||||
const status = healthy ? 200 : 503;
|
||||
writeJson(res, status, {
|
||||
status: healthy ? "ok" : "degraded",
|
||||
db: healthy ? "up" : "down",
|
||||
// Liveness: is the process responding? Coolify uses this to decide
|
||||
// if the container is alive. Stays 200 even on DB glitches so a
|
||||
// transient DB blip doesn't kill the container.
|
||||
writeJson(res, 200, {
|
||||
status: "ok",
|
||||
db: isDbHealthy() ? "up" : "down",
|
||||
...buildInfo(),
|
||||
});
|
||||
log.debug("http", { route, status, latency_ms: Date.now() - started });
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "GET" && req.url === "/health/ready") {
|
||||
// Readiness: should we accept traffic? Used as the deploy gate —
|
||||
// if this fails, the new container isn't promoted and the old one
|
||||
// keeps serving. Checks: DB is healthy, migrations table has the
|
||||
// expected newest migration, no pending fatal errors at boot.
|
||||
(async () => {
|
||||
try {
|
||||
const dbOk = isDbHealthy();
|
||||
if (!dbOk) {
|
||||
writeJson(res, 503, { status: "not_ready", reason: "db_down" });
|
||||
return;
|
||||
}
|
||||
// Verify the newest local migration is present in the drizzle
|
||||
// tracking table. If the deploy shipped new migrations that
|
||||
// didn't apply, this fails closed → Coolify rejects the deploy.
|
||||
const expectedMigration = process.env.EXPECTED_MIGRATION ?? null;
|
||||
if (expectedMigration) {
|
||||
const rows = await db.execute<{ hash: string }>(sql`
|
||||
SELECT hash FROM drizzle.__drizzle_migrations
|
||||
WHERE hash = ${expectedMigration}
|
||||
LIMIT 1
|
||||
`);
|
||||
const arr = Array.isArray(rows) ? rows : (rows as { rows?: unknown[] }).rows ?? [];
|
||||
if (arr.length === 0) {
|
||||
writeJson(res, 503, {
|
||||
status: "not_ready",
|
||||
reason: "migration_missing",
|
||||
expected: expectedMigration,
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
writeJson(res, 200, { status: "ready", ...buildInfo() });
|
||||
} catch (e) {
|
||||
writeJson(res, 503, {
|
||||
status: "not_ready",
|
||||
reason: "readiness_check_error",
|
||||
error: e instanceof Error ? e.message : String(e),
|
||||
});
|
||||
}
|
||||
})();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -4760,43 +4804,80 @@ async function hashToken(token: string): Promise<string> {
|
||||
*
|
||||
* Returns null (and writes 401) on missing/invalid/revoked tokens.
|
||||
* Callers must `return` immediately after a null response.
|
||||
*
|
||||
* Backwards compatibility (30-day window):
|
||||
* Pre-alpha.36 CLIs sent `user_id` in the JSON body and no bearer
|
||||
* token. To avoid breaking them overnight we accept a legacy fallback
|
||||
* when BROKER_LEGACY_AUTH=1 is set in the environment: if no bearer is
|
||||
* present, read the body's `user_id` and treat it as authenticated
|
||||
* (same lax model the broker had before). A Deprecation header is
|
||||
* attached and the event is logged so operators can count usage.
|
||||
* Remove the shim after 2026-05-15 or when `broker_legacy_auth_hits`
|
||||
* metric is near zero.
|
||||
*
|
||||
* Security note: the legacy path is OFF by default. Enable only as a
|
||||
* deliberate rollout choice.
|
||||
*/
|
||||
async function requireCliAuth(
|
||||
req: IncomingMessage,
|
||||
res: ServerResponse,
|
||||
): Promise<{ userId: string; sessionId: string } | null> {
|
||||
legacyBody?: { user_id?: unknown } | null,
|
||||
): Promise<{ userId: string; sessionId: string | null } | null> {
|
||||
const header = req.headers["authorization"];
|
||||
if (!header || typeof header !== "string" || !header.startsWith("Bearer ")) {
|
||||
writeJson(res, 401, { error: "missing_bearer_token" });
|
||||
return null;
|
||||
}
|
||||
const token = header.slice("Bearer ".length).trim();
|
||||
if (!token) {
|
||||
writeJson(res, 401, { error: "empty_bearer_token" });
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const hash = await hashToken(token);
|
||||
const [session] = await db
|
||||
.select({ id: cliSessionTable.id, userId: cliSessionTable.userId, revokedAt: cliSessionTable.revokedAt })
|
||||
.from(cliSessionTable)
|
||||
.where(eq(cliSessionTable.tokenHash, hash))
|
||||
.limit(1);
|
||||
if (!session || session.revokedAt) {
|
||||
writeJson(res, 401, { error: "invalid_or_revoked_token" });
|
||||
if (header && typeof header === "string" && header.startsWith("Bearer ")) {
|
||||
const token = header.slice("Bearer ".length).trim();
|
||||
if (!token) {
|
||||
writeJson(res, 401, { error: "empty_bearer_token" });
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const hash = await hashToken(token);
|
||||
const [session] = await db
|
||||
.select({ id: cliSessionTable.id, userId: cliSessionTable.userId, revokedAt: cliSessionTable.revokedAt })
|
||||
.from(cliSessionTable)
|
||||
.where(eq(cliSessionTable.tokenHash, hash))
|
||||
.limit(1);
|
||||
if (!session || session.revokedAt) {
|
||||
writeJson(res, 401, { error: "invalid_or_revoked_token" });
|
||||
return null;
|
||||
}
|
||||
db.update(cliSessionTable)
|
||||
.set({ lastSeenAt: new Date() })
|
||||
.where(eq(cliSessionTable.id, session.id))
|
||||
.catch(() => { /* non-fatal */ });
|
||||
return { userId: session.userId, sessionId: session.id };
|
||||
} catch (e) {
|
||||
log.error("auth", { err: e instanceof Error ? e.message : String(e) });
|
||||
writeJson(res, 500, { error: "auth_check_failed" });
|
||||
return null;
|
||||
}
|
||||
// Touch last-seen so operators can see stale sessions.
|
||||
db.update(cliSessionTable)
|
||||
.set({ lastSeenAt: new Date() })
|
||||
.where(eq(cliSessionTable.id, session.id))
|
||||
.catch(() => { /* non-fatal */ });
|
||||
return { userId: session.userId, sessionId: session.id };
|
||||
} catch (e) {
|
||||
log.error("auth", { err: e instanceof Error ? e.message : String(e) });
|
||||
writeJson(res, 500, { error: "auth_check_failed" });
|
||||
return null;
|
||||
}
|
||||
|
||||
// Legacy fallback (off by default). Only triggers when no bearer was
|
||||
// supplied AND the operator explicitly opted in.
|
||||
if (process.env.BROKER_LEGACY_AUTH === "1") {
|
||||
const legacyUserId =
|
||||
legacyBody && typeof legacyBody.user_id === "string" ? legacyBody.user_id : null;
|
||||
if (legacyUserId) {
|
||||
res.setHeader(
|
||||
"Deprecation",
|
||||
'version="legacy-body-userid"; sunset="2026-05-15"',
|
||||
);
|
||||
res.setHeader(
|
||||
"Warning",
|
||||
'299 - "body.user_id auth is deprecated; send Authorization: Bearer <session_token>"',
|
||||
);
|
||||
metrics.brokerLegacyAuthHitsTotal?.inc?.();
|
||||
log.warn("legacy auth accepted", {
|
||||
route: req.url,
|
||||
user_id: legacyUserId,
|
||||
});
|
||||
return { userId: legacyUserId, sessionId: null };
|
||||
}
|
||||
}
|
||||
|
||||
writeJson(res, 401, { error: "missing_bearer_token" });
|
||||
return null;
|
||||
}
|
||||
|
||||
/** POST /cli/device-code — create a new device code. */
|
||||
@@ -4972,7 +5053,11 @@ async function handleDeviceCodeApprove(req: IncomingMessage, code: string, res:
|
||||
/** GET /cli/sessions?user_id=... — list CLI sessions for a user. */
|
||||
/** GET /cli/meshes?user_id=... — list all meshes for a user with member counts. */
|
||||
async function handleCliMeshesList(req: IncomingMessage, res: ServerResponse, started: number): Promise<void> {
|
||||
const auth = await requireCliAuth(req, res);
|
||||
// Legacy fallback for pre-alpha.36 clients that put user_id in query.
|
||||
// requireCliAuth reads it off a body; we synthesize one for GET.
|
||||
const url = new URL(req.url!, "http://localhost");
|
||||
const legacyBody = { user_id: url.searchParams.get("user_id") ?? undefined };
|
||||
const auth = await requireCliAuth(req, res, legacyBody);
|
||||
if (!auth) return;
|
||||
const userId = auth.userId;
|
||||
|
||||
@@ -5177,10 +5262,7 @@ import { meshPermission } from "@turbostarter/db/schema/mesh";
|
||||
* for a specific peer = blocked. Explicit null = reset to defaults.
|
||||
*/
|
||||
async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> {
|
||||
const auth = await requireCliAuth(req, res);
|
||||
if (!auth) return;
|
||||
|
||||
let body: { grants: Record<string, string[] | null> };
|
||||
let body: { grants: Record<string, string[] | null>; user_id?: string };
|
||||
try {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of req) chunks.push(chunk as Buffer);
|
||||
@@ -5189,6 +5271,9 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv
|
||||
writeJson(res, 400, { error: "Invalid body" });
|
||||
return;
|
||||
}
|
||||
|
||||
const auth = await requireCliAuth(req, res, body);
|
||||
if (!auth) return;
|
||||
if (!body.grants) {
|
||||
writeJson(res, 400, { error: "grants required" });
|
||||
return;
|
||||
@@ -5221,10 +5306,7 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv
|
||||
|
||||
/** POST /cli/mesh/:slug/invite — generate an invite for a mesh. */
|
||||
async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> {
|
||||
const auth = await requireCliAuth(req, res);
|
||||
if (!auth) return;
|
||||
|
||||
let body: { email?: string; expires_in?: string; role?: string };
|
||||
let body: { email?: string; expires_in?: string; role?: string; user_id?: string };
|
||||
try {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of req) chunks.push(chunk as Buffer);
|
||||
@@ -5234,6 +5316,9 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv
|
||||
return;
|
||||
}
|
||||
|
||||
const auth = await requireCliAuth(req, res, body);
|
||||
if (!auth) return;
|
||||
|
||||
try {
|
||||
const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1);
|
||||
if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; }
|
||||
@@ -5359,10 +5444,8 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv
|
||||
}
|
||||
|
||||
async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, started: number): Promise<void> {
|
||||
const auth = await requireCliAuth(req, res);
|
||||
if (!auth) return;
|
||||
|
||||
let body: { name: string; pubkey?: string; slug?: string; template?: string; description?: string };
|
||||
// Parse body first so the legacy auth fallback can read user_id from it.
|
||||
let body: { name: string; pubkey?: string; slug?: string; template?: string; description?: string; user_id?: string };
|
||||
try {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of req) chunks.push(chunk as Buffer);
|
||||
@@ -5372,6 +5455,9 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st
|
||||
return;
|
||||
}
|
||||
|
||||
const auth = await requireCliAuth(req, res, body);
|
||||
if (!auth) return;
|
||||
|
||||
if (!body.name) {
|
||||
writeJson(res, 400, { error: "name required" });
|
||||
return;
|
||||
@@ -5436,7 +5522,15 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st
|
||||
|
||||
/** DELETE /cli/mesh/:slug — delete a mesh (owner only). */
|
||||
async function handleMeshDelete(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> {
|
||||
const auth = await requireCliAuth(req, res);
|
||||
// Parse body up front for legacy auth fallback.
|
||||
let body: { user_id?: string } = {};
|
||||
try {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of req) chunks.push(chunk as Buffer);
|
||||
const raw = Buffer.concat(chunks).toString();
|
||||
if (raw) body = JSON.parse(raw) as typeof body;
|
||||
} catch { /* empty body is OK for DELETE with bearer auth */ }
|
||||
const auth = await requireCliAuth(req, res, body);
|
||||
if (!auth) return;
|
||||
|
||||
try {
|
||||
|
||||
@@ -94,6 +94,10 @@ export const metrics = {
|
||||
"broker_messages_dropped_by_grant_total",
|
||||
"Messages silently dropped because recipient didn't grant sender the required capability",
|
||||
),
|
||||
brokerLegacyAuthHitsTotal: new Counter(
|
||||
"broker_legacy_auth_hits_total",
|
||||
"Pre-alpha.36 clients authenticating via body.user_id fallback (remove shim when near zero)",
|
||||
),
|
||||
queueDepth: new Gauge(
|
||||
"broker_queue_depth",
|
||||
"Undelivered messages currently in the queue",
|
||||
|
||||
Reference in New Issue
Block a user