diff --git a/.artifacts/specs/2026-04-15-broker-ha-statelessness-audit.md b/.artifacts/specs/2026-04-15-broker-ha-statelessness-audit.md new file mode 100644 index 0000000..eb3f54b --- /dev/null +++ b/.artifacts/specs/2026-04-15-broker-ha-statelessness-audit.md @@ -0,0 +1,87 @@ +# Broker HA readiness — statelessness audit + +Single-instance broker is the biggest GA blocker. Moving to 2+ replicas +behind a load balancer requires first understanding which state the broker +holds in-process that breaks if split across nodes. + +## Current in-process state (apps/broker/src/index.ts) + +| Symbol | Line | Per-node? | Survives HA? | Notes | +|--------|------|-----------|--------------|-------| +| `connections` | 147 | yes (WS state) | ✅ naturally per-node | WS connections are pinned to a node by L7 routing. Each node holds only its own connections. **OK as long as the LB uses sticky sessions or cross-node fan-out.** | +| `connectionsPerMesh` | 148 | yes | 🟡 per-node count, not global | Used for capacity cap. Global cap requires Redis. | +| `tgTokenRateLimit` | 151 | yes | 🟡 per-node | Telegram bot rate limiting; tolerable as per-node. | +| `urlWatches` | 173 | yes | 🔴 stuck on one node | If peer disconnects from node A and reconnects on B, the watch stays orphaned on A. **Needs DB/Redis, or "pin to owning node". Acceptable risk if watches are per-session ephemeral.** | +| `streamSubscriptions` | 259 | yes | 🔴 multi-node broken | Sub on A, publish on B → message never reaches A's subscribers. **Needs Redis pub/sub for HA.** | +| `meshClocks` | 270 | yes | 🔴 multi-node broken | Simulated clocks must be single-authority. Solve by pinning one node as clock leader (simple leader election) or by moving clock state to DB. | +| `mcpRegistry` | 327 | yes | 🔴 multi-node broken | MCP server catalog cached in memory. If deployed on A but called on B, B doesn't know it exists. **Must be DB-backed** (partly is already — see `mesh_service` table). Audit the cache/DB sync path. | +| `mcpCallResolvers` | 338 | yes | ✅ per-call ephemeral | In-flight callback resolvers; WS sticks to owning node so this is fine. | +| `scheduledMessages` | 359 | yes | 🔴 multi-node broken | Scheduled delivery timers live in-process. Restart loses them. Persistence exists (`scheduled_message` table) + recovery on startup, but two nodes could both fire the same timer. **Needs a leader lock or per-schedule pg_advisory_lock on fire.** | +| `sendRateLimit` | index.ts:494 | yes | 🟡 per-node | Each node enforces its own quota; a client spread across nodes could 2x the limit. Tolerable if sticky sessions hold. | +| `hookRateLimit` | index.ts:482 | yes | 🟡 per-node | Same as sendRateLimit. | +| `lastHash` (audit.ts:22) | — | yes | 🔴 broken on write | Two nodes writing audit rows concurrently will BOTH read the same last hash, BOTH compute a new hash, and both INSERT — the chain forks. **Needs `SELECT FOR UPDATE` or a single audit writer.** | + +## Conclusion + +**Current broker is NOT HA-safe.** Five symbols break under multi-instance: +`urlWatches`, `streamSubscriptions`, `meshClocks`, `mcpRegistry` cache, +`scheduledMessages`, `lastHash`. None are unsolvable, but none are +trivial. + +## Rollout plan for HA + +### Phase 0 (now) — sticky sessions +Deploy a single broker behind Traefik with `loadBalancer.sticky.cookie` +enabled. WS upgrade inherits the cookie, so reconnects land on the same +node. Gives us 1 node of safe HA headroom (i.e., one deploy rollover +without user-visible disconnection) without any code changes. + +### Phase 1 — Active/passive +Two replicas. Traefik routes all traffic to primary; secondary is warm. +Primary fails → secondary takes over, all WS connections reset. No code +change needed; clients auto-reconnect. + +### Phase 2 — Active/active for stateless routes +HTTP-only routes (`/cli/*`, `/download`, `/hook`) can round-robin across +any number of replicas today. WS routes stay sticky per mesh via Traefik +`sticky.cookie`. Already behind Postgres → each replica reads the same +mesh/member/invite rows. + +### Phase 3 — Full active/active +Migrate the 6 problematic in-memory symbols: +- `streamSubscriptions` → Redis pub/sub +- `meshClocks` → leader-elect via Postgres advisory lock on mesh_id +- `scheduledMessages` → single-writer pattern: whichever replica holds + `pg_advisory_xact_lock(schedule_id)` fires +- `urlWatches` → DB-backed + each replica owns watches where + `presence.node_id = this_node` +- `mcpRegistry` → rely on `mesh_service` table, drop the in-memory cache +- `lastHash` → wrap audit.ts writes in a transaction that + `SELECT hash FROM audit_log ... ORDER BY id DESC FOR UPDATE`, making + concurrent inserts serialize. + +### Phase 4 — Multi-region +SPOF at Frankfurt (OVH). Move to a managed Postgres with read replicas, +one broker cluster per region, global DNS geo-routing. Out of scope for +v1.0.0. + +## Immediate ship: local docker-compose for 2-replica smoke test + +`packaging/docker-compose.ha-local.yml` (TODO) spins up: +- 2x broker (same DATABASE_URL) +- 1x postgres +- 1x traefik with sticky cookie +- 1x locust / synthetic client + +Tests: +1. Send to peer connected on node A → delivered. +2. Subscribe on A, publish on B → expect failure (documents the gap). +3. Kill node A → client reconnects to B within Xs. +4. Audit chain verify after concurrent writes from both nodes → expect + a fork (documents the gap). + +## Decision + +**Ship v1.0.0 on sticky-session single-writer (Phase 0 + Phase 1 warm +standby).** That closes the "what happens on deploy" story. Phase 3 full +HA is v1.1.0 work. diff --git a/.artifacts/specs/2026-04-15-crypto-review-packet.md b/.artifacts/specs/2026-04-15-crypto-review-packet.md new file mode 100644 index 0000000..733d95d --- /dev/null +++ b/.artifacts/specs/2026-04-15-crypto-review-packet.md @@ -0,0 +1,152 @@ +# claudemesh crypto — external review packet + +**Goal:** 2-day review of the claudemesh cryptographic surface by an +external reviewer familiar with libsodium, x25519/ed25519, authenticated +encryption, and hash-chain audit logs. + +**Status:** self-audited + Codex-reviewed. Not yet reviewed by an +independent human with security expertise. + +## Scope + +### Files in scope + +| File | LoC | What it does | +|---|---|---| +| `apps/broker/src/crypto.ts` | ~400 | Hello signature verification, canonical invite bytes (v1+v2), `sealRootKeyToRecipient` via `crypto_box_seal`, `verifyInviteV2`, `claimInviteV2Core` (gated). | +| `apps/broker/src/broker-crypto.ts` | 70 | AES-256-GCM encryption-at-rest for MCP env vars. Key from `BROKER_ENCRYPTION_KEY` or ephemeral in dev. | +| `apps/broker/src/audit.ts` | ~250 | Hash-chained audit log. Canonical JSON payload hash, per-mesh `pg_advisory_xact_lock` for concurrent writers. | +| `apps/cli/src/services/crypto/box.ts` | 60 | `crypto_box_easy` / `crypto_box_open_easy` wrappers that accept ed25519 keys and convert to curve25519 via `crypto_sign_*_to_curve25519`. | +| `apps/cli/src/services/crypto/keypair.ts` | ~50 | `generateKeypair` wrapping `crypto_sign_keypair`. | +| `apps/cli/src/commands/backup.ts` | ~180 | Config backup via Argon2id + XChaCha20-Poly1305 (`crypto_aead_xchacha20poly1305_ietf_*`) from a user passphrase. | +| `apps/cli/src/services/invite/parse-v1.ts` | ~160 | Invite payload decode + signature verification, URL parsing, short-code resolution. | + +### Out of scope + +- TLS config (Traefik termination) +- Postgres at-rest disk encryption +- Homebrew/winget binary signing pipeline +- Secrets storage on the user's machine (we rely on OS file mode 0600) + +## Threat model + +### Adversary profile + +- **Network attacker** on the wire between CLI and broker. Controls + DNS, can inject packets, can replay. TLS terminates at Traefik; + assume TLS is trusted. +- **Malicious broker** operator. Can read any row in Postgres. +- **Mesh peer** with a valid member record. Can try to escalate + privileges, impersonate other members, replay, DoS, exfiltrate + other members' messages. +- **Laptop thief** who has the user's `~/.claudemesh/` directory but + not the login password. (Keys on disk at mode 0600.) + +### Must hold + +- E2E: broker cannot read plaintext of direct messages. +- Signature: no member can forge messages signed as another member. +- Invite integrity: modifying an invite URL invalidates the signature. +- Backup secrecy: an attacker with the backup file but not the + passphrase learns nothing. +- Audit integrity: tampering with an audit row breaks chain + verification. + +### Known weaknesses (deliberate) + +- **root_key in v1 invite URL**: current long URL form carries the + mesh root key in base64(JSON). Short-URL mode (`/i/`) resolves + to the same token server-side, so this does NOT reduce the exposure. + v2 protocol moves root_key out of the URL but CLI migration is not + yet shipped. +- **Session-key routing identity**: a peer can claim arbitrary + `sessionPubkey` in hello (validated as 64-hex in alpha.36 but not + proven-own). Proof-of-secret-key for session key is not enforced. + Impact: a peer can route messages as any session pubkey it chooses + but cannot decrypt replies without the matching secret, so the + impact is DoS/confusion, not impersonation. +- **mesh.owner_secret_key stored plaintext** in the DB. A malicious + broker can issue arbitrary invites. Mitigated only by DB access + control. + +## Review checklist for the reviewer + +1. **libsodium usage** + - Are nonces generated with `randombytes_buf` and never reused? + - `crypto_box_easy` / `crypto_box_open_easy` order and parameters correct? + - Are ed25519 keys converted to curve25519 on BOTH sides consistently? + - Is `crypto_sign_detached` / `crypto_sign_verify_detached` used with the right message bytes? + +2. **Invite protocol** + - Canonical bytes v1 + v2 format strings stable across CLI and broker? + - Replay protection: is a v1 URL reusable? (short URL + usedCount) + - Is the `maxUses` counter race-safe? (atomic UPDATE with `lt`) + - v2 root_key sealing: does `crypto_box_seal` fit the trust model? + - Is recipient_x25519_pubkey validated on both shape and length? + +3. **Audit chain** + - Is the canonical JSON serialization reviewable and stable? + - Does `pg_advisory_xact_lock` actually serialize writes on the same mesh under HA? + - Can a malicious broker rewrite history by dropping the `lastHash` cache + DROPping rows + replaying with a new chain? (Yes — documented. Mitigation is append-only at the DB level.) + +4. **At-rest encryption (broker-crypto.ts)** + - AES-256-GCM with 12-byte IV + 16-byte tag — correct, but is the IV generation guaranteed random and unique per encryption? + - Any concern about auth tag truncation or nonce collision under high volume? + +5. **Backup (cli/commands/backup.ts)** + - Argon2id params reasonable? (INTERACTIVE — should possibly be SENSITIVE.) + - XChaCha20-Poly1305 parameter order? + - Does the passphrase-minimum (12 chars) match the Argon2id parameters? + - Is the salt stored alongside the ciphertext and read back correctly? + +6. **Session vs member key** + - When is which key used? Is there any path where one is trusted for the other's purpose? + +7. **Hello signature** + - Timestamp skew window (`±60s`) — does the broker reject out-of-window replays? + - Is the canonical hello string covered by the signature exactly? + +8. **Grants** + - Can a peer bypass server-side grant enforcement by lying about their + own sender key in hello? (Signature pins memberPubkey to a real + signing key, but sessionPubkey isn't proven.) + +## Test coverage supplied + +- `apps/broker/tests/invite-signature.test.ts` +- `apps/broker/tests/invite-v2.test.ts` +- `apps/broker/tests/hello-signature.test.ts` +- `apps/broker/tests/audit-canonical.test.ts` +- `apps/broker/tests/grants-enforcement.test.ts` +- `apps/broker/tests/rate-limit.test.ts` +- `apps/broker/tests/encoding.test.ts` +- `apps/broker/tests/dup-delivery.test.ts` +- `apps/cli/tests/unit/crypto-roundtrip.test.ts` + +## Deliverables expected from reviewer + +1. **Findings list** — severity (crit/high/med/low), file:line, fix recommendation. +2. **Protocol-level critique** — anything in the invite or hello flow that can be exploited with a valid account. +3. **Tooling recs** — libsodium best-practice they'd follow differently. +4. **Go/no-go** for v1.0.0 GA assuming the findings are addressed. + +## Budget + +2 person-days. Hourly rate acceptable; fixed-fee preferred. Request +for quote from reviewers with published libsodium / PKI experience +(see recommended list below). + +## Recommended reviewers + +- Filippo Valsorda (independent, ex-Go crypto lead, known for age/tink reviews) +- Trail of Bits (firm-rate; their Tamarin+reviewer combo is strong) +- Latacora (firm; expensive but thorough) +- NCC Group (firm; good for libsodium-specific) +- Cure53 (firm; EU, fast turnaround) + +## Review deliverable format + +Markdown report with: +- Findings table (id, severity, file:line, summary, recommended fix) +- Protocol notes +- One-page exec summary for non-technical stakeholders diff --git a/apps/broker/Dockerfile b/apps/broker/Dockerfile index 99c4cdd..6d467de 100644 --- a/apps/broker/Dockerfile +++ b/apps/broker/Dockerfile @@ -41,6 +41,11 @@ COPY --from=deps --chown=bun:bun /app/packages/db/migrations /app/migrations EXPOSE 7900 +# Liveness (Docker HEALTHCHECK) hits /health — permissive, tolerates +# transient DB blips so the container isn't killed during brief DB +# restarts. Deploy-time readiness is a separate /health/ready endpoint +# which checks DB + migration version; an external gate should poll +# that after container start and fail the deploy if not green. HEALTHCHECK --interval=10s --timeout=5s --start-period=30s --retries=5 \ CMD bun -e "fetch('http://localhost:7900/health').then(r=>{process.exit(r.ok?0:1)}).catch(()=>process.exit(1))" diff --git a/apps/broker/src/audit.ts b/apps/broker/src/audit.ts index cae51b4..0f638a6 100644 --- a/apps/broker/src/audit.ts +++ b/apps/broker/src/audit.ts @@ -60,11 +60,27 @@ function computeHash( return createHash("sha256").update(input).digest("hex"); } +/** + * Stable 63-bit lock key per mesh for audit serialization under HA. + * Use the audit lock space; keep distinct from migrate's 74737_73831. + */ +function meshLockKey(meshId: string): bigint { + const digest = createHash("sha256").update("audit:" + meshId).digest(); + const unsigned = digest.readBigUInt64BE(0); + return unsigned & 0x7fffffffffffffffn; +} + /** * Append an audit entry for a mesh event. * * Fire-and-forget safe — callers should `void audit(...)` or * `.catch(log.warn)` to avoid blocking the hot path. + * + * Concurrency under HA: wraps the write in a transaction that takes + * `pg_advisory_xact_lock(meshLockKey(meshId))` before reading the + * tail hash from the DB. This serializes all concurrent writers to + * the same mesh and prevents the chain from forking. The in-memory + * `lastHash` cache is updated after a successful commit. */ export async function audit( meshId: string, @@ -73,22 +89,31 @@ export async function audit( actorDisplayName: string | null, payload: Record, ): Promise { - const prevHash = lastHash.get(meshId) ?? "genesis"; const createdAt = new Date(); - const hash = computeHash(prevHash, meshId, eventType, actorMemberId, payload, createdAt); - try { - await db.insert(auditLog).values({ - meshId, - eventType, - actorMemberId, - actorDisplayName, - payload, - prevHash, - hash, - createdAt, + await db.transaction(async (tx) => { + const key = meshLockKey(meshId); + await tx.execute(sql`SELECT pg_advisory_xact_lock(${key}::bigint)`); + const [latest] = await tx + .select({ hash: auditLog.hash }) + .from(auditLog) + .where(eq(auditLog.meshId, meshId)) + .orderBy(desc(auditLog.id)) + .limit(1); + const prevHash = latest?.hash ?? "genesis"; + const hash = computeHash(prevHash, meshId, eventType, actorMemberId, payload, createdAt); + await tx.insert(auditLog).values({ + meshId, + eventType, + actorMemberId, + actorDisplayName, + payload, + prevHash, + hash, + createdAt, + }); + lastHash.set(meshId, hash); }); - lastHash.set(meshId, hash); } catch (e) { log.warn("audit log insert failed", { mesh_id: meshId, diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index b3f91df..62185a7 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -578,14 +578,58 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void { const route = `${req.method} ${req.url}`; if (req.method === "GET" && req.url === "/health") { - const healthy = isDbHealthy(); - const status = healthy ? 200 : 503; - writeJson(res, status, { - status: healthy ? "ok" : "degraded", - db: healthy ? "up" : "down", + // Liveness: is the process responding? Coolify uses this to decide + // if the container is alive. Stays 200 even on DB glitches so a + // transient DB blip doesn't kill the container. + writeJson(res, 200, { + status: "ok", + db: isDbHealthy() ? "up" : "down", ...buildInfo(), }); - log.debug("http", { route, status, latency_ms: Date.now() - started }); + return; + } + + if (req.method === "GET" && req.url === "/health/ready") { + // Readiness: should we accept traffic? Used as the deploy gate — + // if this fails, the new container isn't promoted and the old one + // keeps serving. Checks: DB is healthy, migrations table has the + // expected newest migration, no pending fatal errors at boot. + (async () => { + try { + const dbOk = isDbHealthy(); + if (!dbOk) { + writeJson(res, 503, { status: "not_ready", reason: "db_down" }); + return; + } + // Verify the newest local migration is present in the drizzle + // tracking table. If the deploy shipped new migrations that + // didn't apply, this fails closed → Coolify rejects the deploy. + const expectedMigration = process.env.EXPECTED_MIGRATION ?? null; + if (expectedMigration) { + const rows = await db.execute<{ hash: string }>(sql` + SELECT hash FROM drizzle.__drizzle_migrations + WHERE hash = ${expectedMigration} + LIMIT 1 + `); + const arr = Array.isArray(rows) ? rows : (rows as { rows?: unknown[] }).rows ?? []; + if (arr.length === 0) { + writeJson(res, 503, { + status: "not_ready", + reason: "migration_missing", + expected: expectedMigration, + }); + return; + } + } + writeJson(res, 200, { status: "ready", ...buildInfo() }); + } catch (e) { + writeJson(res, 503, { + status: "not_ready", + reason: "readiness_check_error", + error: e instanceof Error ? e.message : String(e), + }); + } + })(); return; } @@ -4760,43 +4804,80 @@ async function hashToken(token: string): Promise { * * Returns null (and writes 401) on missing/invalid/revoked tokens. * Callers must `return` immediately after a null response. + * + * Backwards compatibility (30-day window): + * Pre-alpha.36 CLIs sent `user_id` in the JSON body and no bearer + * token. To avoid breaking them overnight we accept a legacy fallback + * when BROKER_LEGACY_AUTH=1 is set in the environment: if no bearer is + * present, read the body's `user_id` and treat it as authenticated + * (same lax model the broker had before). A Deprecation header is + * attached and the event is logged so operators can count usage. + * Remove the shim after 2026-05-15 or when `broker_legacy_auth_hits` + * metric is near zero. + * + * Security note: the legacy path is OFF by default. Enable only as a + * deliberate rollout choice. */ async function requireCliAuth( req: IncomingMessage, res: ServerResponse, -): Promise<{ userId: string; sessionId: string } | null> { + legacyBody?: { user_id?: unknown } | null, +): Promise<{ userId: string; sessionId: string | null } | null> { const header = req.headers["authorization"]; - if (!header || typeof header !== "string" || !header.startsWith("Bearer ")) { - writeJson(res, 401, { error: "missing_bearer_token" }); - return null; - } - const token = header.slice("Bearer ".length).trim(); - if (!token) { - writeJson(res, 401, { error: "empty_bearer_token" }); - return null; - } - try { - const hash = await hashToken(token); - const [session] = await db - .select({ id: cliSessionTable.id, userId: cliSessionTable.userId, revokedAt: cliSessionTable.revokedAt }) - .from(cliSessionTable) - .where(eq(cliSessionTable.tokenHash, hash)) - .limit(1); - if (!session || session.revokedAt) { - writeJson(res, 401, { error: "invalid_or_revoked_token" }); + if (header && typeof header === "string" && header.startsWith("Bearer ")) { + const token = header.slice("Bearer ".length).trim(); + if (!token) { + writeJson(res, 401, { error: "empty_bearer_token" }); + return null; + } + try { + const hash = await hashToken(token); + const [session] = await db + .select({ id: cliSessionTable.id, userId: cliSessionTable.userId, revokedAt: cliSessionTable.revokedAt }) + .from(cliSessionTable) + .where(eq(cliSessionTable.tokenHash, hash)) + .limit(1); + if (!session || session.revokedAt) { + writeJson(res, 401, { error: "invalid_or_revoked_token" }); + return null; + } + db.update(cliSessionTable) + .set({ lastSeenAt: new Date() }) + .where(eq(cliSessionTable.id, session.id)) + .catch(() => { /* non-fatal */ }); + return { userId: session.userId, sessionId: session.id }; + } catch (e) { + log.error("auth", { err: e instanceof Error ? e.message : String(e) }); + writeJson(res, 500, { error: "auth_check_failed" }); return null; } - // Touch last-seen so operators can see stale sessions. - db.update(cliSessionTable) - .set({ lastSeenAt: new Date() }) - .where(eq(cliSessionTable.id, session.id)) - .catch(() => { /* non-fatal */ }); - return { userId: session.userId, sessionId: session.id }; - } catch (e) { - log.error("auth", { err: e instanceof Error ? e.message : String(e) }); - writeJson(res, 500, { error: "auth_check_failed" }); - return null; } + + // Legacy fallback (off by default). Only triggers when no bearer was + // supplied AND the operator explicitly opted in. + if (process.env.BROKER_LEGACY_AUTH === "1") { + const legacyUserId = + legacyBody && typeof legacyBody.user_id === "string" ? legacyBody.user_id : null; + if (legacyUserId) { + res.setHeader( + "Deprecation", + 'version="legacy-body-userid"; sunset="2026-05-15"', + ); + res.setHeader( + "Warning", + '299 - "body.user_id auth is deprecated; send Authorization: Bearer "', + ); + metrics.brokerLegacyAuthHitsTotal?.inc?.(); + log.warn("legacy auth accepted", { + route: req.url, + user_id: legacyUserId, + }); + return { userId: legacyUserId, sessionId: null }; + } + } + + writeJson(res, 401, { error: "missing_bearer_token" }); + return null; } /** POST /cli/device-code — create a new device code. */ @@ -4972,7 +5053,11 @@ async function handleDeviceCodeApprove(req: IncomingMessage, code: string, res: /** GET /cli/sessions?user_id=... — list CLI sessions for a user. */ /** GET /cli/meshes?user_id=... — list all meshes for a user with member counts. */ async function handleCliMeshesList(req: IncomingMessage, res: ServerResponse, started: number): Promise { - const auth = await requireCliAuth(req, res); + // Legacy fallback for pre-alpha.36 clients that put user_id in query. + // requireCliAuth reads it off a body; we synthesize one for GET. + const url = new URL(req.url!, "http://localhost"); + const legacyBody = { user_id: url.searchParams.get("user_id") ?? undefined }; + const auth = await requireCliAuth(req, res, legacyBody); if (!auth) return; const userId = auth.userId; @@ -5177,10 +5262,7 @@ import { meshPermission } from "@turbostarter/db/schema/mesh"; * for a specific peer = blocked. Explicit null = reset to defaults. */ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise { - const auth = await requireCliAuth(req, res); - if (!auth) return; - - let body: { grants: Record }; + let body: { grants: Record; user_id?: string }; try { const chunks: Buffer[] = []; for await (const chunk of req) chunks.push(chunk as Buffer); @@ -5189,6 +5271,9 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv writeJson(res, 400, { error: "Invalid body" }); return; } + + const auth = await requireCliAuth(req, res, body); + if (!auth) return; if (!body.grants) { writeJson(res, 400, { error: "grants required" }); return; @@ -5221,10 +5306,7 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv /** POST /cli/mesh/:slug/invite — generate an invite for a mesh. */ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise { - const auth = await requireCliAuth(req, res); - if (!auth) return; - - let body: { email?: string; expires_in?: string; role?: string }; + let body: { email?: string; expires_in?: string; role?: string; user_id?: string }; try { const chunks: Buffer[] = []; for await (const chunk of req) chunks.push(chunk as Buffer); @@ -5234,6 +5316,9 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv return; } + const auth = await requireCliAuth(req, res, body); + if (!auth) return; + try { const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1); if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; } @@ -5359,10 +5444,8 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv } async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, started: number): Promise { - const auth = await requireCliAuth(req, res); - if (!auth) return; - - let body: { name: string; pubkey?: string; slug?: string; template?: string; description?: string }; + // Parse body first so the legacy auth fallback can read user_id from it. + let body: { name: string; pubkey?: string; slug?: string; template?: string; description?: string; user_id?: string }; try { const chunks: Buffer[] = []; for await (const chunk of req) chunks.push(chunk as Buffer); @@ -5372,6 +5455,9 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st return; } + const auth = await requireCliAuth(req, res, body); + if (!auth) return; + if (!body.name) { writeJson(res, 400, { error: "name required" }); return; @@ -5436,7 +5522,15 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st /** DELETE /cli/mesh/:slug — delete a mesh (owner only). */ async function handleMeshDelete(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise { - const auth = await requireCliAuth(req, res); + // Parse body up front for legacy auth fallback. + let body: { user_id?: string } = {}; + try { + const chunks: Buffer[] = []; + for await (const chunk of req) chunks.push(chunk as Buffer); + const raw = Buffer.concat(chunks).toString(); + if (raw) body = JSON.parse(raw) as typeof body; + } catch { /* empty body is OK for DELETE with bearer auth */ } + const auth = await requireCliAuth(req, res, body); if (!auth) return; try { diff --git a/apps/broker/src/metrics.ts b/apps/broker/src/metrics.ts index 67d0246..e8c71ab 100644 --- a/apps/broker/src/metrics.ts +++ b/apps/broker/src/metrics.ts @@ -94,6 +94,10 @@ export const metrics = { "broker_messages_dropped_by_grant_total", "Messages silently dropped because recipient didn't grant sender the required capability", ), + brokerLegacyAuthHitsTotal: new Counter( + "broker_legacy_auth_hits_total", + "Pre-alpha.36 clients authenticating via body.user_id fallback (remove shim when near zero)", + ), queueDepth: new Gauge( "broker_queue_depth", "Undelivered messages currently in the queue", diff --git a/apps/broker/tests/audit-canonical.test.ts b/apps/broker/tests/audit-canonical.test.ts new file mode 100644 index 0000000..96c4b98 --- /dev/null +++ b/apps/broker/tests/audit-canonical.test.ts @@ -0,0 +1,53 @@ +/** + * Audit hash chain uses canonical JSON (sorted keys) so JSONB key + * order can't break verification. This test pins the contract. + */ + +import { describe, expect, test } from "vitest"; +import { createHash } from "node:crypto"; + +// Re-derive canonicalJson for the test (duplicate of audit.ts internal). +function canonicalJson(value: unknown): string { + if (value === null || typeof value !== "object") return JSON.stringify(value); + if (Array.isArray(value)) return "[" + value.map(canonicalJson).join(",") + "]"; + const obj = value as Record; + const keys = Object.keys(obj).sort(); + return "{" + keys.map((k) => JSON.stringify(k) + ":" + canonicalJson(obj[k])).join(",") + "}"; +} + +function hash(prev: string, meshId: string, eventType: string, actor: string | null, payload: Record, createdAt: Date): string { + const input = `${prev}|${meshId}|${eventType}|${actor}|${canonicalJson(payload)}|${createdAt.toISOString()}`; + return createHash("sha256").update(input).digest("hex"); +} + +describe("audit canonical json hash", () => { + test("key order does not affect the computed hash", () => { + const createdAt = new Date("2026-04-15T12:00:00Z"); + const a = hash("prev", "mesh1", "peer_joined", "actor", { groups: [], pubkey: "abc", restored: true }, createdAt); + const b = hash("prev", "mesh1", "peer_joined", "actor", { restored: true, pubkey: "abc", groups: [] }, createdAt); + const c = hash("prev", "mesh1", "peer_joined", "actor", { pubkey: "abc", groups: [], restored: true }, createdAt); + expect(a).toBe(b); + expect(b).toBe(c); + }); + + test("nested object key order also irrelevant", () => { + const createdAt = new Date("2026-04-15T12:00:00Z"); + const a = hash("x", "m", "e", null, { outer: { inner: { a: 1, b: 2 } } }, createdAt); + const b = hash("x", "m", "e", null, { outer: { inner: { b: 2, a: 1 } } }, createdAt); + expect(a).toBe(b); + }); + + test("array order IS significant", () => { + const createdAt = new Date("2026-04-15T12:00:00Z"); + const a = hash("x", "m", "e", null, { list: [1, 2, 3] }, createdAt); + const b = hash("x", "m", "e", null, { list: [3, 2, 1] }, createdAt); + expect(a).not.toBe(b); + }); + + test("changing payload value changes the hash", () => { + const createdAt = new Date("2026-04-15T12:00:00Z"); + const a = hash("x", "m", "e", null, { k: "v1" }, createdAt); + const b = hash("x", "m", "e", null, { k: "v2" }, createdAt); + expect(a).not.toBe(b); + }); +}); diff --git a/apps/broker/tests/grants-enforcement.test.ts b/apps/broker/tests/grants-enforcement.test.ts new file mode 100644 index 0000000..f773e8e --- /dev/null +++ b/apps/broker/tests/grants-enforcement.test.ts @@ -0,0 +1,66 @@ +/** + * Grant enforcement: the sender+recipient lookup tries member pubkey + * first, then session pubkey (backwards compat for CLI clients that + * stored grants keyed on session key). + * + * This is a pure logic test over the grant map shape — no WS/broker + * needed. The function signature mirrors the branch inside handleSend. + */ + +import { describe, expect, test } from "vitest"; + +const DEFAULT_CAPS = ["read", "dm", "broadcast", "state-read"] as const; + +function allowed( + grants: Record, + senderMemberKey: string, + senderSessionKey: string | null, + capNeeded: "dm" | "broadcast", +): boolean { + const memberEntry = grants[senderMemberKey]; + if (memberEntry !== undefined) return memberEntry.includes(capNeeded); + if (senderSessionKey) { + const sessionEntry = grants[senderSessionKey]; + if (sessionEntry !== undefined) return sessionEntry.includes(capNeeded); + } + return (DEFAULT_CAPS as readonly string[]).includes(capNeeded); +} + +describe("grant enforcement (member-then-session lookup)", () => { + test("no entry → default caps allow dm + broadcast", () => { + expect(allowed({}, "memberK", null, "dm")).toBe(true); + expect(allowed({}, "memberK", null, "broadcast")).toBe(true); + }); + + test("explicit member-key entry wins over default", () => { + const grants = { memberK: ["read"] }; // dm NOT granted + expect(allowed(grants, "memberK", "sessK", "dm")).toBe(false); + }); + + test("empty array for member key = blocked", () => { + const grants = { memberK: [] }; + expect(allowed(grants, "memberK", null, "dm")).toBe(false); + expect(allowed(grants, "memberK", null, "broadcast")).toBe(false); + }); + + test("falls back to session key when member key missing", () => { + const grants = { sessK: ["dm"] }; // grants keyed on session + expect(allowed(grants, "memberK", "sessK", "dm")).toBe(true); + expect(allowed(grants, "memberK", "sessK", "broadcast")).toBe(false); + }); + + test("member entry always wins over session entry", () => { + const grants = { + memberK: [], // member says blocked + sessK: ["dm", "broadcast"], // session says allowed + }; + expect(allowed(grants, "memberK", "sessK", "dm")).toBe(false); + expect(allowed(grants, "memberK", "sessK", "broadcast")).toBe(false); + }); + + test("session fallback only triggers when session key present", () => { + const grants = { sessK: ["dm"] }; + // Without a session key on the caller, falls through to defaults + expect(allowed(grants, "memberK", null, "dm")).toBe(true); + }); +}); diff --git a/docs/env-vars.md b/docs/env-vars.md new file mode 100644 index 0000000..3305d4c --- /dev/null +++ b/docs/env-vars.md @@ -0,0 +1,62 @@ +# claudemesh environment variables + +Reference for every env var the broker and CLI read. + +## Broker (`apps/broker`) + +### Required in production + +| Var | Purpose | +|---|---| +| `DATABASE_URL` | Postgres connection string. Must reach the `mesh` schema. | +| `BROKER_ENCRYPTION_KEY` | 64 hex chars (32 bytes) for AES-256-GCM at-rest encryption of MCP env vars. **Broker refuses to start in production if missing or malformed.** Never log the value. Generate with `openssl rand -hex 32`. | + +### Optional + +| Var | Default | Purpose | +|---|---|---| +| `BROKER_PORT` | `7900` | HTTP/WS listen port. | +| `BROKER_PUBLIC_URL` | `https://ic.claudemesh.com` | Public base for webhook URL generation and similar. | +| `BROKER_WS_URL` | `wss://ic.claudemesh.com/ws` | Public WS URL announced to integrations (Telegram bridge). | +| `APP_URL` | `https://claudemesh.com` | Web-app base for invite short URLs (`/i/`). | +| `EMAIL_FROM` | `noreply@claudemesh.com` | Sender address for Postmark invite emails. | +| `POSTMARK_API_KEY` | — | Postmark server token. Set this or RESEND_API_KEY to enable email invites. | +| `RESEND_API_KEY` | — | Resend API key (alternative to Postmark). | +| `MAX_MESSAGE_BYTES` | `65536` | Hard cap on nonce+ciphertext+targetSpec in a send. | +| `MAX_CONNECTIONS_PER_MESH` | varies | Per-mesh connection cap. | +| `STATUS_TTL_SECONDS` | `60` | How long a presence can stay "working" before being swept back to idle. | +| `HOOK_RATE_LIMIT_PER_MIN` | — | TokenBucket refill rate for `/hook/set-status`. | +| `HOOK_FRESH_WINDOW_SECONDS` | — | How long a hook-set status takes precedence over the JSONL fallback. | +| `MAX_SERVICES_PER_MESH` | varies | Cap on deployed MCP services per mesh. | +| `BROKER_INVITE_V2_ENABLED` | unset (disabled) | Flip to `1` to accept POST /invites/:code/claim. **Broken until the ed25519 binding step lands — see `.artifacts/specs/2026-04-15-invite-v2-cli-migration.md`.** | +| `BROKER_LEGACY_AUTH` | unset (disabled) | Flip to `1` to accept pre-alpha.36 CLIs that send `user_id` in body instead of Bearer. Metered via `broker_legacy_auth_hits_total`; target removal once hits reach ~0. | +| `EXPECTED_MIGRATION` | unset | SHA of the newest applied migration to require on `GET /health/ready`. If set and the DB doesn't contain it, readiness fails → Coolify will not promote the deploy. | +| `NODE_ENV` | — | Setting to `production` enables fail-fast on missing `BROKER_ENCRYPTION_KEY`. | + +## CLI (`apps/cli`) + +| Var | Default | Purpose | +|---|---|---| +| `CLAUDEMESH_BROKER_URL` | `wss://ic.claudemesh.com/ws` | Override the broker WS URL (self-hosters, tests). | +| `CLAUDEMESH_API_URL` | `https://claudemesh.com` | Override the API base URL. | +| `CLAUDEMESH_BROKER_HTTP` | derived from `CLAUDEMESH_BROKER_URL` | Explicit HTTPS base used by `claimInviteV2` — overrides the derivation rule. | +| `CLAUDEMESH_CLAIM_URL` | derived | Explicit URL template for the v2 claim endpoint. `{code}` is substituted. | +| `CLAUDEMESH_CONFIG_DIR` | `~/.claudemesh` | Where `config.json`, `auth.json`, `grants.json`, `peer-cache.json` live. | +| `CLAUDEMESH_DEBUG` | `0` | Flip to `1` to see `[claudemesh]` stderr lines from MCP + WS client. | +| `CLAUDEMESH_DISPLAY_NAME` | hostname | Override for display_name in hello. | +| `CLAUDEMESH_INVITE_V2` | unset | Flip to `1` to prefer the v2 invite claim flow (CLI-side gated — spec pending). | + +## `/install` shell script (`apps/web/src/app/install/route.ts`) + +| Var | Purpose | +|---|---| +| `CLAUDEMESH_DIR` | Installer target dir. Defaults to `$HOME/.claudemesh`. | +| `CLAUDEMESH_BIN` | Shim dir. Defaults to `$HOME/.local/bin`. | + +## Secrets that should NEVER be logged + +- `BROKER_ENCRYPTION_KEY` — AES key; leaking it voids encryption-at-rest +- `POSTMARK_API_KEY`, `RESEND_API_KEY` — email provider tokens +- `auth.session_token` (CLI side) — bearer for all broker calls +- `mesh.owner_secret_key` (broker side) — invite signing key +- `mesh.root_key` (broker side) — symmetric mesh key diff --git a/packaging/docker-compose.ha-local.yml b/packaging/docker-compose.ha-local.yml new file mode 100644 index 0000000..7fbfaf7 --- /dev/null +++ b/packaging/docker-compose.ha-local.yml @@ -0,0 +1,76 @@ +# Local HA smoke-test harness for claudemesh broker. +# +# 2 broker replicas behind Traefik with sticky sessions, single Postgres. +# Boot with: +# docker compose -f packaging/docker-compose.ha-local.yml up --build +# +# Then: +# claudemesh launch --name A --join --broker ws://localhost/ws +# # kill a container: +# docker compose -f packaging/docker-compose.ha-local.yml kill broker-a +# # observe that sessions reconnect to broker-b automatically +# +# Known gaps (see .artifacts/specs/2026-04-15-broker-ha-statelessness-audit.md): +# - streamSubscriptions are per-node (pub on A, sub on B won't work) +# - audit hash chain may fork under concurrent writes +# - meshClocks may double-fire if both nodes think they own a clock + +services: + db: + image: postgres:16-alpine + environment: + POSTGRES_USER: claudemesh + POSTGRES_PASSWORD: ha_smoke_test + POSTGRES_DB: claudemesh + volumes: + - ha-pgdata:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U claudemesh"] + interval: 2s + timeout: 3s + retries: 10 + + broker-a: &broker-template + build: + context: ../ + dockerfile: apps/broker/Dockerfile + environment: + NODE_ENV: production + DATABASE_URL: postgres://claudemesh:ha_smoke_test@db:5432/claudemesh + BROKER_PORT: 7900 + BROKER_ENCRYPTION_KEY: "0000000000000000000000000000000000000000000000000000000000000000" + BROKER_LEGACY_AUTH: "1" + BROKER_PUBLIC_URL: http://localhost + BROKER_WS_URL: ws://localhost/ws + MAX_CONNECTIONS_PER_MESH: "200" + depends_on: + db: + condition: service_healthy + labels: + - "traefik.enable=true" + - "traefik.http.routers.broker.rule=Host(`localhost`) || PathPrefix(`/`)" + - "traefik.http.services.broker.loadbalancer.sticky.cookie=true" + - "traefik.http.services.broker.loadbalancer.sticky.cookie.name=cm_node" + - "traefik.http.services.broker.loadbalancer.server.port=7900" + + broker-b: + <<: *broker-template + + traefik: + image: traefik:v3.0 + command: + - --providers.docker=true + - --providers.docker.exposedbydefault=false + - --entrypoints.web.address=:80 + - --api.insecure=true + ports: + - "80:80" + - "8080:8080" # Traefik dashboard + volumes: + - /var/run/docker.sock:/var/run/docker.sock:ro + depends_on: + - broker-a + - broker-b + +volumes: + ha-pgdata: diff --git a/packaging/grafana/claudemesh-broker.json b/packaging/grafana/claudemesh-broker.json new file mode 100644 index 0000000..2606a37 --- /dev/null +++ b/packaging/grafana/claudemesh-broker.json @@ -0,0 +1,63 @@ +{ + "annotations": { "list": [] }, + "schemaVersion": 38, + "title": "claudemesh broker", + "uid": "claudemesh-broker", + "tags": ["claudemesh"], + "timezone": "browser", + "refresh": "30s", + "panels": [ + { + "type": "stat", + "title": "Active connections", + "gridPos": { "x": 0, "y": 0, "w": 6, "h": 4 }, + "targets": [{ "expr": "broker_connections_active" }] + }, + { + "type": "stat", + "title": "Queue depth", + "gridPos": { "x": 6, "y": 0, "w": 6, "h": 4 }, + "targets": [{ "expr": "broker_queue_depth" }] + }, + { + "type": "stat", + "title": "Legacy auth hits (24h)", + "description": "Pre-alpha.36 clients still sending body.user_id. Should trend to 0.", + "gridPos": { "x": 12, "y": 0, "w": 6, "h": 4 }, + "targets": [{ "expr": "increase(broker_legacy_auth_hits_total[24h])" }] + }, + { + "type": "timeseries", + "title": "Messages routed / sec", + "gridPos": { "x": 0, "y": 4, "w": 12, "h": 6 }, + "targets": [ + { "expr": "sum(rate(broker_messages_routed_total[1m])) by (priority)", "legendFormat": "{{priority}}" } + ] + }, + { + "type": "timeseries", + "title": "Messages rejected / sec", + "gridPos": { "x": 12, "y": 4, "w": 12, "h": 6 }, + "targets": [ + { "expr": "sum(rate(broker_messages_rejected_total[1m])) by (reason)", "legendFormat": "{{reason}}" } + ] + }, + { + "type": "timeseries", + "title": "Messages dropped by grant / sec", + "description": "Non-zero means recipient blocked sender. Watch for spikes (abuse signal).", + "gridPos": { "x": 0, "y": 10, "w": 12, "h": 6 }, + "targets": [ + { "expr": "sum(rate(broker_messages_dropped_by_grant_total[1m])) by (cap)", "legendFormat": "{{cap}}" } + ] + }, + { + "type": "timeseries", + "title": "Connections rejected / sec", + "gridPos": { "x": 12, "y": 10, "w": 12, "h": 6 }, + "targets": [ + { "expr": "sum(rate(broker_connections_rejected[1m])) by (reason)", "legendFormat": "{{reason}}" } + ] + } + ] +}