diff --git a/apps/broker/src/audit.ts b/apps/broker/src/audit.ts index e60e023..cae51b4 100644 --- a/apps/broker/src/audit.ts +++ b/apps/broker/src/audit.ts @@ -25,6 +25,29 @@ const lastHash = new Map(); // Core audit logging // --------------------------------------------------------------------------- +/** + * Deterministic JSON serialization: keys sorted recursively. The store + * is JSONB, which does NOT preserve key order, so hashing a naive + * JSON.stringify(row.payload) on verify can yield a different string + * from insert-time — false tamper reports. Canonical form guarantees + * both sides agree. + */ +function canonicalJson(value: unknown): string { + if (value === null || typeof value !== "object") return JSON.stringify(value); + if (Array.isArray(value)) { + return "[" + value.map(canonicalJson).join(",") + "]"; + } + const obj = value as Record; + const keys = Object.keys(obj).sort(); + return ( + "{" + + keys + .map((k) => JSON.stringify(k) + ":" + canonicalJson(obj[k])) + .join(",") + + "}" + ); +} + function computeHash( prevHash: string, meshId: string, @@ -33,7 +56,7 @@ function computeHash( payload: Record, createdAt: Date, ): string { - const input = `${prevHash}|${meshId}|${eventType}|${actorMemberId}|${JSON.stringify(payload)}|${createdAt.toISOString()}`; + const input = `${prevHash}|${meshId}|${eventType}|${actorMemberId}|${canonicalJson(payload)}|${createdAt.toISOString()}`; return createHash("sha256").update(input).digest("hex"); } diff --git a/apps/broker/src/broker-crypto.ts b/apps/broker/src/broker-crypto.ts index 8ff1fd7..41adab9 100644 --- a/apps/broker/src/broker-crypto.ts +++ b/apps/broker/src/broker-crypto.ts @@ -23,13 +23,23 @@ let _key: Buffer | null = null; function getKey(): Buffer { if (_key) return _key; - if (env.BROKER_ENCRYPTION_KEY && env.BROKER_ENCRYPTION_KEY.length === 64) { + if (env.BROKER_ENCRYPTION_KEY && /^[0-9a-f]{64}$/i.test(env.BROKER_ENCRYPTION_KEY)) { _key = Buffer.from(env.BROKER_ENCRYPTION_KEY, "hex"); - } else { - _key = randomBytes(32); - log.warn("BROKER_ENCRYPTION_KEY not set — generated ephemeral key. " + - "Set BROKER_ENCRYPTION_KEY=" + _key.toString("hex") + " to persist across restarts."); + return _key; } + + // In production, refuse to start without a persistent key. Silently + // generating a random one meant every restart invalidated all encrypted + // rows on disk — and the ephemeral key was logged in clear, which is + // itself a leak. + if (process.env.NODE_ENV === "production") { + log.error("BROKER_ENCRYPTION_KEY is missing or malformed (need 64 hex chars) — refusing to start in production"); + process.exit(1); + } + + // Dev only: generate a stable per-process key. Never log the value. + _key = randomBytes(32); + log.warn("BROKER_ENCRYPTION_KEY not set — using ephemeral key for this dev process (encrypted data WILL NOT survive restarts). Set BROKER_ENCRYPTION_KEY to a 64-hex-char value for persistence."); return _key; } @@ -62,7 +72,11 @@ export function decryptFromStorage(packed: string): string | null { decipher.setAuthTag(tag); const decrypted = Buffer.concat([decipher.update(ciphertext), decipher.final()]); return decrypted.toString("utf8"); - } catch { + } catch (e) { + // Loud failure: if a stored row fails to decrypt the key changed or + // data is corrupt — don't silently return null and let downstream + // code assume "no value". + log.error("decryptFromStorage failed", { err: e instanceof Error ? e.message : String(e) }); return null; } } diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 7e2e7d9..9d9d63b 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -417,6 +417,7 @@ export async function listPeersInMesh( ): Promise< Array<{ pubkey: string; + memberPubkey: string; displayName: string; status: string; summary: string | null; @@ -449,8 +450,12 @@ export async function listPeersInMesh( ) .orderBy(asc(presence.connectedAt)); // Prefer session pubkey for routing, session displayName for display. + // memberPubkey is also surfaced so callers (grants, audit, safety-number + // verify) can operate on the stable identity key rather than the + // per-connection ephemeral one. return rows.map((r) => ({ pubkey: r.sessionPubkey || r.memberPubkey, + memberPubkey: r.memberPubkey, displayName: r.presenceDisplayName || r.memberDisplayName, status: r.status, summary: r.summary, diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 94d06c0..b3f91df 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -15,7 +15,7 @@ import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; import type { Duplex } from "node:stream"; import { WebSocketServer, type WebSocket } from "ws"; -import { and, eq, isNull, lt, sql } from "drizzle-orm"; +import { and, eq, inArray, isNull, lt, sql } from "drizzle-orm"; import { env } from "./env"; import { db } from "./db"; import { invite as inviteTable, mesh, meshMember, messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh"; @@ -103,7 +103,10 @@ import { metrics, metricsToText } from "./metrics"; import { TokenBucket } from "./rate-limit"; import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health"; import { buildInfo } from "./build-info"; -import { canonicalInvite, canonicalInviteV2, sealRootKeyToRecipient, verifyHelloSignature, verifyInviteV2 } from "./crypto"; +import { canonicalInvite, canonicalInviteV2, claimInviteV2Core as _claimInviteV2Core, sealRootKeyToRecipient, verifyHelloSignature, verifyInviteV2 } from "./crypto"; +// Alias for in-module callers; the public re-export below surfaces the +// same symbol without colliding with tests that import from index.ts. +const claimInviteV2Core = _claimInviteV2Core; import { handleWebhook } from "./webhooks"; import { audit, loadLastHashes, ensureAuditLogTable, verifyChain, queryAuditLog } from "./audit"; @@ -485,11 +488,14 @@ const hookRateLimit = new TokenBucket( /** * Per-member send rate limit. Protects the mesh from a runaway peer - * dumping messages. Bucket is 60 msgs/min with a burst of 10 — generous - * for conversational use, tight enough that a loop bug surfaces in seconds. - * Configurable via env in a later pass. + * dumping messages. Burst of 10, refill 60/min — generous for + * conversational use, tight enough that a loop bug surfaces in seconds. + * + * NOTE: TokenBucket signature is `(capacity, refillPerMinute)`, so the + * args ARE (burst, per-minute). Swept periodically below so old keys + * don't leak. */ -const sendRateLimit = new TokenBucket(60, 10); +const sendRateLimit = new TokenBucket(10, 60); function sendToPeer(presenceId: string, msg: WSServerMessage): void { const conn = connections.get(presenceId); @@ -616,35 +622,59 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void { // File download proxy: streams from MinIO so clients don't need internal URLs. // GET /download/{fileId}?mesh={meshId} + // Auth: Bearer token + mesh membership. Previously wide open — anyone + // who knew a fileId could exfiltrate. if (req.method === "GET" && req.url?.startsWith("/download/")) { - const parts = req.url.split("?"); - const fileId = parts[0]!.replace("/download/", ""); - const params = new URLSearchParams(parts[1] ?? ""); - const meshId = params.get("mesh"); - if (!fileId || !meshId) { - writeJson(res, 400, { error: "fileId and ?mesh= required" }); - log.info("download", { route: "GET /download", status: 400, latency_ms: Date.now() - started }); - return; - } - getFile(meshId, fileId).then(async (file) => { - if (!file) { - writeJson(res, 404, { error: "file not found" }); - log.info("download", { route: "GET /download", status: 404, file_id: fileId, latency_ms: Date.now() - started }); + (async () => { + const auth = await requireCliAuth(req, res); + if (!auth) return; + const parts = req.url!.split("?"); + const fileId = parts[0]!.replace("/download/", ""); + const params = new URLSearchParams(parts[1] ?? ""); + const meshId = params.get("mesh"); + if (!fileId || !meshId) { + writeJson(res, 400, { error: "fileId and ?mesh= required" }); + log.info("download", { route: "GET /download", status: 400, latency_ms: Date.now() - started }); return; } - const bucket = meshBucketName(meshId); - const stream = await minioClient.getObject(bucket, file.minioKey); - res.writeHead(200, { - "Content-Type": file.mimeType ?? "application/octet-stream", - "Content-Disposition": `attachment; filename="${file.name}"`, - "Cache-Control": "private, max-age=60", - }); - stream.pipe(res); - log.info("download", { route: "GET /download", file_id: fileId, name: file.name, latency_ms: Date.now() - started }); - }).catch((e) => { - writeJson(res, 500, { error: "download failed" }); - log.error("download error", { file_id: fileId, error: e instanceof Error ? e.message : String(e) }); - }); + // Membership check: the authenticated user must have a live member + // row in the requested mesh. + try { + const [m] = await db + .select({ id: meshMember.id }) + .from(meshMember) + .where(and(eq(meshMember.meshId, meshId), eq(meshMember.userId, auth.userId), isNull(meshMember.revokedAt))) + .limit(1); + if (!m) { + writeJson(res, 403, { error: "not a member of this mesh" }); + return; + } + } catch (e) { + writeJson(res, 500, { error: "membership check failed" }); + log.error("download-auth", { err: e instanceof Error ? e.message : String(e) }); + return; + } + try { + const file = await getFile(meshId, fileId); + if (!file) { + writeJson(res, 404, { error: "file not found" }); + log.info("download", { route: "GET /download", status: 404, file_id: fileId, latency_ms: Date.now() - started }); + return; + } + const bucket = meshBucketName(meshId); + const stream = await minioClient.getObject(bucket, file.minioKey); + res.writeHead(200, { + "Content-Type": file.mimeType ?? "application/octet-stream", + "Content-Disposition": `attachment; filename="${file.name}"`, + "Cache-Control": "private, max-age=60", + }); + stream.pipe(res); + log.info("download", { route: "GET /download", file_id: fileId, name: file.name, latency_ms: Date.now() - started }); + } catch (e) { + writeJson(res, 500, { error: "download failed" }); + log.error("download error", { file_id: fileId, error: e instanceof Error ? e.message : String(e) }); + } + })(); return; } @@ -976,188 +1006,13 @@ function handleJoinPost( // skips signature verification since there is no v2 signature on file. // This lets v2 clients claim legacy invites during the deprecation window. -export type InviteClaimV2Result = - | { - ok: true; - status: 200; - body: { - sealed_root_key: string; - mesh_id: string; - member_id: string; - owner_pubkey: string; - canonical_v2: string; - }; - } - | { ok: false; status: 400 | 404 | 410; body: { error: string } }; - -/** - * Core claim logic, extracted from the HTTP handler so tests can call it - * directly without spinning up the full broker server. - */ -export async function claimInviteV2Core(params: { - code: string; - recipientX25519PubkeyBase64url: string; - displayName?: string; - now?: number; -}): Promise { - const now = params.now ?? Date.now(); - const recipientPk = params.recipientX25519PubkeyBase64url; - - // Cheap shape check on the recipient pubkey — full length check happens - // inside sealRootKeyToRecipient, but reject obvious garbage early so - // we return 400 malformed before touching the DB. - if (!recipientPk || typeof recipientPk !== "string" || recipientPk.length < 32) { - return { ok: false, status: 400, body: { error: "malformed" } }; - } - - // 1. Look up the invite by opaque code. - const [inv] = await db - .select() - .from(inviteTable) - .where(eq(inviteTable.code, params.code)) - .limit(1); - if (!inv) return { ok: false, status: 404, body: { error: "not_found" } }; - - // 2. Lifecycle checks: revoked → expired → exhausted. - if (inv.revokedAt) { - return { ok: false, status: 410, body: { error: "revoked" } }; - } - if (inv.expiresAt.getTime() < now) { - return { ok: false, status: 410, body: { error: "expired" } }; - } - if (inv.usedCount >= inv.maxUses) { - return { ok: false, status: 410, body: { error: "exhausted" } }; - } - - // 3. Load the mesh for owner_pubkey + root_key. - const [m] = await db - .select({ - id: mesh.id, - ownerPubkey: mesh.ownerPubkey, - rootKey: mesh.rootKey, - }) - .from(mesh) - .where(and(eq(mesh.id, inv.meshId), isNull(mesh.archivedAt))) - .limit(1); - if (!m) return { ok: false, status: 404, body: { error: "not_found" } }; - if (!m.ownerPubkey || !m.rootKey) { - return { ok: false, status: 400, body: { error: "malformed" } }; - } - - // 4. v2 signature verification when applicable. - // Always compute the canonical on the fly so the response can echo it. - const expiresAtUnix = Math.floor(inv.expiresAt.getTime() / 1000); - const canonical = canonicalInviteV2({ - mesh_id: inv.meshId, - invite_id: inv.id, - expires_at: expiresAtUnix, - role: inv.role as "admin" | "member", - owner_pubkey: m.ownerPubkey, - }); - - if (inv.version === 2 && inv.capabilityV2) { - // Parse capability + verify. - let storedCanonical: string | undefined; - let signatureHex: string | undefined; - try { - const parsed = JSON.parse(inv.capabilityV2) as { - canonical?: string; - signature?: string; - }; - storedCanonical = parsed.canonical; - signatureHex = parsed.signature; - } catch { - return { ok: false, status: 400, body: { error: "malformed" } }; - } - if (!storedCanonical || !signatureHex) { - return { ok: false, status: 400, body: { error: "malformed" } }; - } - // Broker-recomputed canonical must match the signed bytes exactly. - if (storedCanonical !== canonical) { - return { ok: false, status: 400, body: { error: "bad_signature" } }; - } - const sigOk = await verifyInviteV2({ - canonical: storedCanonical, - signatureHex, - ownerPubkeyHex: m.ownerPubkey, - }); - if (!sigOk) { - return { ok: false, status: 400, body: { error: "bad_signature" } }; - } - } - // v1 rows: skip signature verification (legacy path during migration). - - // 5. Atomic consume: increment used_count iff still under max_uses. - // Mirrors the invariant enforced for v1 joins in broker.joinMesh(). - const [claimed] = await db - .update(inviteTable) - .set({ - usedCount: sql`${inviteTable.usedCount} + 1`, - claimedByPubkey: recipientPk, - }) - .where( - and( - eq(inviteTable.id, inv.id), - lt(inviteTable.usedCount, inv.maxUses), - ), - ) - .returning({ id: inviteTable.id }); - if (!claimed) { - return { ok: false, status: 410, body: { error: "exhausted" } }; - } - - // 6. Create a member row for the claimant. The peerPubkey column holds - // the claimant's signing identity; for v2 the recipient hasn't - // necessarily connected over WS yet, so we use the x25519 pubkey as - // a placeholder for the pre-claim phase. This matches the spec's - // "one recipient = one root-key-delivery capability" invariant. - const preset = (inv.preset as { - displayName?: string; - roleTag?: string; - groups?: Array<{ name: string; role?: string }>; - messageMode?: string; - } | null) ?? {}; - const displayName = - preset.displayName ?? params.displayName ?? `member-${recipientPk.slice(0, 8)}`; - const [row] = await db - .insert(meshMember) - .values({ - meshId: inv.meshId, - peerPubkey: recipientPk, - displayName, - role: inv.role, - roleTag: preset.roleTag ?? null, - defaultGroups: preset.groups ?? [], - messageMode: preset.messageMode ?? "push", - }) - .returning({ id: meshMember.id }); - if (!row) { - return { ok: false, status: 400, body: { error: "malformed" } }; - } - - // 7. Seal the mesh root_key to the recipient's x25519 pubkey. - let sealed: string; - try { - sealed = await sealRootKeyToRecipient({ - rootKeyBase64url: m.rootKey, - recipientX25519PubkeyBase64url: recipientPk, - }); - } catch { - return { ok: false, status: 400, body: { error: "malformed" } }; - } - - return { - ok: true, - status: 200, - body: { - sealed_root_key: sealed, - mesh_id: inv.meshId, - member_id: row.id, - owner_pubkey: m.ownerPubkey, - canonical_v2: canonical, - }, - }; -} +// NOTE: canonical `claimInviteV2Core` + `InviteClaimV2Result` live in +// `./crypto.ts`. Re-exported here for backward-compat imports and +// tests that pulled from index.ts. The previous duplicate in this +// file had diverged from the crypto.ts copy and was deleted on +// 2026-04-15 (Codex review finding). +export { type InviteClaimV2Result } from "./crypto"; +export { claimInviteV2Core }; function handleInviteClaimV2Post( req: IncomingMessage, @@ -1197,6 +1052,19 @@ function handleInviteClaimV2Post( writeJson(res, 400, { error: "malformed" }); return; } + // Feature-flag: the v2 claim flow inserts a member row with + // peerPubkey=, but hello requires ed25519 hex. + // Result: claimed invites can never complete the WS handshake. + // Keep the endpoint behind an env flag until the two-step binding + // (send x25519 for seal, bind ed25519 on first hello) lands. Spec: + // .artifacts/specs/2026-04-15-invite-v2-cli-migration.md. + if (process.env.BROKER_INVITE_V2_ENABLED !== "1") { + writeJson(res, 501, { + error: "invite_v2_disabled", + detail: "v2 claim flow is behind BROKER_INVITE_V2_ENABLED=1 until the ed25519 binding step ships", + }); + return; + } const result = await claimInviteV2Core({ code, recipientX25519PubkeyBase64url: payload.recipient_x25519_pubkey, @@ -1221,11 +1089,14 @@ function handleInviteClaimV2Post( }); } -function handleUploadPost( +async function handleUploadPost( req: IncomingMessage, res: ServerResponse, started: number, -): void { +): Promise { + const auth = await requireCliAuth(req, res); + if (!auth) return; + const meshId = req.headers["x-mesh-id"] as string | undefined; const memberId = req.headers["x-member-id"] as string | undefined; const fileName = req.headers["x-file-name"] as string | undefined; @@ -1244,6 +1115,25 @@ function handleUploadPost( return; } + // Verify the caller is actually a member of the mesh they claim, AND + // that the X-Member-Id they sent belongs to them. Previously we trusted + // both headers blindly — anyone could upload as anyone. + try { + const [m] = await db + .select({ id: meshMember.id, userId: meshMember.userId, revokedAt: meshMember.revokedAt }) + .from(meshMember) + .where(eq(meshMember.id, memberId)) + .limit(1); + if (!m || m.revokedAt || m.userId !== auth.userId) { + writeJson(res, 403, { ok: false, error: "member does not belong to authenticated user" }); + return; + } + } catch (e) { + writeJson(res, 500, { ok: false, error: "auth check failed" }); + log.error("upload-auth", { err: e instanceof Error ? e.message : String(e) }); + return; + } + const persistent = persistentRaw !== "false"; let tags: string[] = []; if (tagsRaw) { @@ -1675,7 +1565,30 @@ async function restorePeerState( async function handleHello( ws: WebSocket, hello: Extract, -): Promise<{ presenceId: string; memberDisplayName: string } | null> { +): Promise<{ + presenceId: string; + memberDisplayName: string; + memberProfile?: unknown; + meshPolicy?: Record; + restored?: boolean; + lastSummary?: string; + lastSeenAt?: string; + restoredGroups?: Array<{ name: string; role?: string }>; + restoredStats?: unknown; +} | null> { + // Validate sessionPubkey shape — it becomes a routable identity in + // listPeers/drainForMember, so arbitrary strings let a client claim + // nonsense pubkeys. Required-if-present: empty is allowed (falls back + // to memberPubkey), but if present must be 64 lower-case hex. + if (hello.sessionPubkey != null && hello.sessionPubkey !== "") { + if (typeof hello.sessionPubkey !== "string" || !/^[0-9a-f]{64}$/.test(hello.sessionPubkey)) { + metrics.connectionsRejected.inc({ reason: "bad_session_pubkey" }); + sendError(ws, "bad_session_pubkey", "sessionPubkey must be 64 lowercase hex chars"); + ws.close(1008, "bad_session_pubkey"); + return null; + } + } + // Capacity check BEFORE touching DB. const existing = connectionsPerMesh.get(hello.meshId) ?? 0; if (existing >= env.MAX_CONNECTIONS_PER_MESH) { @@ -1845,17 +1758,20 @@ async function handleSend( (isGroupTargetEarly && msg.targetSpec === "@all"); const isDirectEarly = !isGroupTargetEarly && !isBroadcastEarly && msg.targetSpec !== "*"; if (isDirectEarly) { - let hasRecipient = false; - for (const [pid, peer] of connections) { + // Identify candidate recipient connections — anyone in the mesh whose + // member or session pubkey matches the target. Then check grants to + // see if at least one of them has granted the sender `dm`. Without + // this check, blocked DMs get queued and sit in the DB forever + // (multicast marks delivered on queue; direct relies on drain-or-push). + const candidateMemberIds: string[] = []; + for (const [, peer] of connections) { if (peer.meshId !== conn.meshId) continue; if (peer.ws === conn.ws) continue; if (peer.memberPubkey === msg.targetSpec || peer.sessionPubkey === msg.targetSpec) { - hasRecipient = true; - break; + candidateMemberIds.push(peer.memberId); } - void pid; } - if (!hasRecipient) { + if (candidateMemberIds.length === 0) { metrics.messagesRejectedTotal.inc({ reason: "no_recipient" }); const errAck: WSServerMessage = { type: "ack", @@ -1867,6 +1783,34 @@ async function handleSend( conn.ws.send(JSON.stringify(errAck)); return; } + + // Load grants for the candidate recipient members and pick the first + // that allows `dm` from the sender's stable memberPubkey. If none + // allow it, reject pre-queue so the DB stays clean. + const DEFAULT_CAPS_DM = ["read", "dm", "broadcast", "state-read"] as const; + const grantRows = await db + .select({ id: meshMember.id, peerGrants: meshMember.peerGrants }) + .from(meshMember) + .where(and(eq(meshMember.meshId, conn.meshId), inArray(meshMember.id, candidateMemberIds))); + const senderKey = conn.memberPubkey; + const anyAllows = grantRows.some((row) => { + const grants = (row.peerGrants as Record) ?? {}; + const entry = grants[senderKey]; + if (entry === undefined) return (DEFAULT_CAPS_DM as readonly string[]).includes("dm"); + return entry.includes("dm"); + }); + if (!anyAllows) { + metrics.messagesDroppedByGrantTotal?.inc?.({ cap: "dm" }); + const errAck: WSServerMessage = { + type: "ack", + id: msg.id ?? "", + messageId: "", + queued: false, + error: "blocked by recipient grants (sender lacks dm capability)", + }; + conn.ws.send(JSON.stringify(errAck)); + return; + } } const messageId = await queueMessage({ @@ -1921,11 +1865,16 @@ async function handleSend( // Per-peer grant enforcement — load recipient grant maps once per send. // See .artifacts/specs/2026-04-15-per-peer-capabilities.md. + // + // We look up grants by BOTH the sender's stable member pubkey AND their + // ephemeral session pubkey, because CLI clients historically wrote grant + // entries keyed on session pubkey (from listPeers which preferred + // session key). Member key is preferred; session is a fall-through for + // compatibility with older clients until they migrate. const DEFAULT_CAPS = ["read", "dm", "broadcast", "state-read"] as const; const capNeeded: "dm" | "broadcast" = isMulticast ? "broadcast" : "dm"; - const senderPubkey = conn.memberPubkey; // stable member key (survives session rotation) - // Fetch grant maps for all connected peers in this mesh in one query. - // Small (bounded by concurrent connections per mesh); acceptable per send. + const senderMemberKey = conn.memberPubkey; + const senderSessionKey = conn.sessionPubkey ?? null; const grantRows = await db .select({ id: meshMember.id, peerGrants: meshMember.peerGrants }) .from(meshMember) @@ -1935,10 +1884,14 @@ async function handleSend( ); function allowed(recipientMemberId: string): boolean { const grants = grantsByMemberId.get(recipientMemberId); - if (!grants) return DEFAULT_CAPS.includes(capNeeded); - const entry = grants[senderPubkey]; - if (entry === undefined) return DEFAULT_CAPS.includes(capNeeded); - return entry.includes(capNeeded); + if (!grants) return (DEFAULT_CAPS as readonly string[]).includes(capNeeded); + const memberEntry = grants[senderMemberKey]; + if (memberEntry !== undefined) return memberEntry.includes(capNeeded); + if (senderSessionKey) { + const sessionEntry = grants[senderSessionKey]; + if (sessionEntry !== undefined) return sessionEntry.includes(capNeeded); + } + return (DEFAULT_CAPS as readonly string[]).includes(capNeeded); } for (const [pid, peer] of connections) { @@ -2154,6 +2107,7 @@ function handleConnection(ws: WebSocket): void { const pc = connByPubkey.get(p.pubkey); return { pubkey: p.pubkey, + memberPubkey: p.memberPubkey, displayName: p.displayName, status: p.status as "idle" | "working" | "dnd", summary: p.summary, @@ -3907,7 +3861,8 @@ function handleConnection(ws: WebSocket): void { } throw dupErr; } - const webhookUrl = `https://ic.claudemesh.com/hook/${conn.meshId}/${webhookSecret}`; + const brokerPublicUrl = process.env.BROKER_PUBLIC_URL ?? "https://ic.claudemesh.com"; + const webhookUrl = `${brokerPublicUrl.replace(/\/$/, "")}/hook/${conn.meshId}/${webhookSecret}`; sendToPeer(presenceId, { type: "webhook_ack", name: cw.name, @@ -3928,11 +3883,12 @@ function handleConnection(ws: WebSocket): void { }) .from(meshWebhook) .where(and(eq(meshWebhook.meshId, conn.meshId), eq(meshWebhook.active, true))); + const brokerPublicUrlList = process.env.BROKER_PUBLIC_URL ?? "https://ic.claudemesh.com"; sendToPeer(presenceId, { type: "webhook_list", webhooks: whRows.map((r) => ({ name: r.name, - url: `https://ic.claudemesh.com/hook/${conn.meshId}/${r.secret}`, + url: `${brokerPublicUrlList.replace(/\/$/, "")}/hook/${conn.meshId}/${r.secret}`, active: r.active, createdAt: r.createdAt.toISOString(), })), @@ -4253,6 +4209,14 @@ function handleConnection(ws: WebSocket): void { if (conn) { void audit(conn.meshId, "peer_left", conn.memberId, conn.displayName, {}); } + // Clean up URL watches owned by this peer — the interval was + // happily fetching forever after the peer disconnected. + for (const [watchId, watch] of urlWatches) { + if (watch.presenceId === presenceId) { + clearInterval(watch.timer); + urlWatches.delete(watchId); + } + } // Clean up stream subscriptions for this peer for (const [key, subs] of streamSubscriptions) { subs.delete(presenceId); @@ -4483,7 +4447,10 @@ async function main(): Promise { pingInterval.unref(); // GC rate-limit buckets periodically. - const rlSweep = setInterval(() => hookRateLimit.sweep(), 5 * 60_000); + const rlSweep = setInterval(() => { + hookRateLimit.sweep(); + sendRateLimit.sweep(); + }, 5 * 60_000); rlSweep.unref(); // Queue depth gauge refresh (fires the metric; cheap COUNT query). @@ -4624,7 +4591,7 @@ async function main(): Promise { .where(and(eq(telegramBridge.chatId, BigInt(chatId) as any), eq(telegramBridge.meshId, meshId))); }, tgBotToken, - "wss://ic.claudemesh.com/ws", + process.env.BROKER_WS_URL ?? "wss://ic.claudemesh.com/ws", // lookupMeshesByEmail: find user's meshes, create a bridge-specific member with fresh keypair async (email) => { const users = await db.select({ id: user.id, name: user.name }).from(user).where(eq(user.email, email)).limit(1); @@ -4786,6 +4753,52 @@ async function hashToken(token: string): Promise { return Array.from(new Uint8Array(hash)).map(b => b.toString(16).padStart(2, "0")).join(""); } +/** + * Verify the caller holds a valid CLI session token and return the + * authenticated user_id. Used by every authenticated /cli/... route + * to replace the former pattern of trusting body.user_id blindly. + * + * Returns null (and writes 401) on missing/invalid/revoked tokens. + * Callers must `return` immediately after a null response. + */ +async function requireCliAuth( + req: IncomingMessage, + res: ServerResponse, +): Promise<{ userId: string; sessionId: string } | null> { + const header = req.headers["authorization"]; + if (!header || typeof header !== "string" || !header.startsWith("Bearer ")) { + writeJson(res, 401, { error: "missing_bearer_token" }); + return null; + } + const token = header.slice("Bearer ".length).trim(); + if (!token) { + writeJson(res, 401, { error: "empty_bearer_token" }); + return null; + } + try { + const hash = await hashToken(token); + const [session] = await db + .select({ id: cliSessionTable.id, userId: cliSessionTable.userId, revokedAt: cliSessionTable.revokedAt }) + .from(cliSessionTable) + .where(eq(cliSessionTable.tokenHash, hash)) + .limit(1); + if (!session || session.revokedAt) { + writeJson(res, 401, { error: "invalid_or_revoked_token" }); + return null; + } + // Touch last-seen so operators can see stale sessions. + db.update(cliSessionTable) + .set({ lastSeenAt: new Date() }) + .where(eq(cliSessionTable.id, session.id)) + .catch(() => { /* non-fatal */ }); + return { userId: session.userId, sessionId: session.id }; + } catch (e) { + log.error("auth", { err: e instanceof Error ? e.message : String(e) }); + writeJson(res, 500, { error: "auth_check_failed" }); + return null; + } +} + /** POST /cli/device-code — create a new device code. */ async function handleDeviceCodeNew(req: IncomingMessage, res: ServerResponse, started: number): Promise { let body: { hostname?: string; platform?: string; arch?: string } = {}; @@ -4959,13 +4972,9 @@ async function handleDeviceCodeApprove(req: IncomingMessage, code: string, res: /** GET /cli/sessions?user_id=... — list CLI sessions for a user. */ /** GET /cli/meshes?user_id=... — list all meshes for a user with member counts. */ async function handleCliMeshesList(req: IncomingMessage, res: ServerResponse, started: number): Promise { - const url = new URL(req.url!, "http://localhost"); - const userId = url.searchParams.get("user_id"); - - if (!userId) { - writeJson(res, 400, { error: "user_id required" }); - return; - } + const auth = await requireCliAuth(req, res); + if (!auth) return; + const userId = auth.userId; try { // Find meshes via two paths: @@ -5168,7 +5177,10 @@ import { meshPermission } from "@turbostarter/db/schema/mesh"; * for a specific peer = blocked. Explicit null = reset to defaults. */ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise { - let body: { user_id: string; grants: Record }; + const auth = await requireCliAuth(req, res); + if (!auth) return; + + let body: { grants: Record }; try { const chunks: Buffer[] = []; for await (const chunk of req) chunks.push(chunk as Buffer); @@ -5177,8 +5189,8 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv writeJson(res, 400, { error: "Invalid body" }); return; } - if (!body.user_id || !body.grants) { - writeJson(res, 400, { error: "user_id and grants required" }); + if (!body.grants) { + writeJson(res, 400, { error: "grants required" }); return; } try { @@ -5186,7 +5198,7 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; } // Find the caller's member row. const [member] = await db.select().from(meshMember) - .where(and(eq(meshMember.meshId, m.id), eq(meshMember.userId, body.user_id), isNull(meshMember.revokedAt))) + .where(and(eq(meshMember.meshId, m.id), eq(meshMember.userId, auth.userId), isNull(meshMember.revokedAt))) .limit(1); if (!member) { writeJson(res, 403, { error: "Not a member of this mesh" }); @@ -5209,7 +5221,10 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv /** POST /cli/mesh/:slug/invite — generate an invite for a mesh. */ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise { - let body: { user_id: string; email?: string; expires_in?: string; role?: string }; + const auth = await requireCliAuth(req, res); + if (!auth) return; + + let body: { email?: string; expires_in?: string; role?: string }; try { const chunks: Buffer[] = []; for await (const chunk of req) chunks.push(chunk as Buffer); @@ -5219,15 +5234,10 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv return; } - if (!body.user_id) { - writeJson(res, 400, { error: "user_id required" }); - return; - } - try { const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1); if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; } - if (m.ownerUserId !== body.user_id) { + if (m.ownerUserId !== auth.userId) { writeJson(res, 403, { error: "Only the owner can invite (for now)" }); return; } @@ -5280,7 +5290,7 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv maxUses: 1, role, expiresAt, - createdBy: body.user_id, + createdBy: auth.userId, version: 2, }).returning({ id: inviteTable.id }); inviteId = rows[0]!.id; @@ -5349,7 +5359,10 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv } async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, started: number): Promise { - let body: { user_id: string; name: string; pubkey?: string; slug?: string; template?: string; description?: string }; + const auth = await requireCliAuth(req, res); + if (!auth) return; + + let body: { name: string; pubkey?: string; slug?: string; template?: string; description?: string }; try { const chunks: Buffer[] = []; for await (const chunk of req) chunks.push(chunk as Buffer); @@ -5359,8 +5372,8 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st return; } - if (!body.user_id || !body.name) { - writeJson(res, 400, { error: "user_id and name required" }); + if (!body.name) { + writeJson(res, 400, { error: "name required" }); return; } @@ -5395,7 +5408,7 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st // Create mesh — use raw SQL to avoid Drizzle default-column issues await db.execute(sql` INSERT INTO mesh.mesh (id, name, slug, owner_user_id, owner_pubkey, owner_secret_key, root_key) - VALUES (${meshId}, ${body.name}, ${slug}, ${body.user_id}, ${ownerPubkey}, ${ownerSecretKey}, ${rootKey}) + VALUES (${meshId}, ${body.name}, ${slug}, ${auth.userId}, ${ownerPubkey}, ${ownerSecretKey}, ${rootKey}) `); // Create owner member. @@ -5410,11 +5423,11 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st const memberId = generateId(); await db.execute(sql` INSERT INTO mesh.member (id, mesh_id, user_id, peer_pubkey, display_name, role) - VALUES (${memberId}, ${meshId}, ${body.user_id}, ${body.pubkey}, ${body.name + "-owner"}, ${"admin"}) + VALUES (${memberId}, ${meshId}, ${auth.userId}, ${body.pubkey}, ${body.name + "-owner"}, ${"admin"}) `); writeJson(res, 200, { id: meshId, slug, name: body.name, member_id: memberId }); - log.info("mesh-create", { route: "POST /cli/mesh/create", slug, user_id: body.user_id, latency_ms: Date.now() - started }); + log.info("mesh-create", { route: "POST /cli/mesh/create", slug, user_id: auth.userId, latency_ms: Date.now() - started }); } catch (e) { log.error("mesh-create", { error: e instanceof Error ? e.message : String(e) }); writeJson(res, 500, { error: "Failed to create mesh" }); @@ -5423,37 +5436,22 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st /** DELETE /cli/mesh/:slug — delete a mesh (owner only). */ async function handleMeshDelete(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise { - let body: { user_id: string }; - try { - const chunks: Buffer[] = []; - for await (const chunk of req) chunks.push(chunk as Buffer); - body = JSON.parse(Buffer.concat(chunks).toString()) as typeof body; - } catch { - writeJson(res, 400, { error: "Invalid body" }); - return; - } - - if (!body.user_id) { - writeJson(res, 400, { error: "user_id required" }); - return; - } + const auth = await requireCliAuth(req, res); + if (!auth) return; try { - // Find mesh by slug const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1); if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; } - // Only owner can delete - if (m.ownerUserId !== body.user_id) { + if (m.ownerUserId !== auth.userId) { writeJson(res, 403, { error: "Only the mesh owner can delete it" }); return; } - // Soft delete (archive) await db.update(mesh).set({ archivedAt: new Date() }).where(eq(mesh.id, m.id)); writeJson(res, 200, { ok: true, deleted: slug }); - log.info("mesh-delete", { route: "DELETE /cli/mesh/:slug", slug, user_id: body.user_id, latency_ms: Date.now() - started }); + log.info("mesh-delete", { route: "DELETE /cli/mesh/:slug", slug, user_id: auth.userId, latency_ms: Date.now() - started }); } catch (e) { log.error("mesh-delete", { error: e instanceof Error ? e.message : String(e) }); writeJson(res, 500, { error: "Failed to delete mesh" }); diff --git a/apps/broker/src/migrate.ts b/apps/broker/src/migrate.ts index 57eded5..28cb4e5 100644 --- a/apps/broker/src/migrate.ts +++ b/apps/broker/src/migrate.ts @@ -2,19 +2,29 @@ * Runtime migrations on broker startup. * * Runs pending drizzle migrations against DATABASE_URL before the broker - * listens. Uses pg_advisory_lock so a multi-instance deploy doesn't race. - * If migrations fail, the process exits non-zero so the orchestrator (Coolify - * healthcheck) sees the container as broken and doesn't route traffic. + * listens. Uses pg_try_advisory_lock with retry+timeout so a stuck old + * instance can't block new deploys indefinitely (the original + * pg_advisory_lock version matched the "stuck 12h" symptom perfectly — + * an old container held the lock and the new deploy waited forever). + * + * If migrations fail OR the lock can't be acquired within the timeout, + * the process exits non-zero so the orchestrator (Coolify healthcheck) + * sees the container as broken and doesn't route traffic to it. */ import { drizzle } from "drizzle-orm/postgres-js"; import { migrate } from "drizzle-orm/postgres-js/migrator"; import postgres from "postgres"; -import { dirname, join } from "node:path"; +import { join } from "node:path"; import { existsSync, readdirSync } from "node:fs"; const LOCK_ID = 74737_73831; // "cmsh" ascii — stable magic constant +/** Max total time to wait for the advisory lock before giving up. */ +const LOCK_ACQUIRE_TIMEOUT_MS = 60_000; +/** Poll interval when lock is held by another instance. */ +const LOCK_RETRY_INTERVAL_MS = 2_000; + export async function runMigrationsOnStartup(): Promise { const url = process.env.DATABASE_URL; if (!url) { @@ -22,8 +32,6 @@ export async function runMigrationsOnStartup(): Promise { return; } - // Resolve the migrations folder — it's shipped inside @turbostarter/db's - // deploy subset in the runtime image. Dev path also works. const candidates = [ "/app/migrations", "/app/node_modules/@turbostarter/db/migrations", @@ -38,10 +46,38 @@ export async function runMigrationsOnStartup(): Promise { const count = readdirSync(migrationsFolder).filter((f) => f.endsWith(".sql")).length; console.log(`[migrate] ${count} migration files at ${migrationsFolder}`); - const sql = postgres(url, { max: 1, onnotice: () => { /* quiet */ } }); + const sql = postgres(url, { + max: 1, + onnotice: () => { /* quiet */ }, + // Statement-level safety net in case a long ALTER holds row locks. + // 5 min per statement is plenty for schema DDL. + statement_timeout: 5 * 60 * 1000, + }); + try { - // Advisory lock so parallel instances serialise. - await sql`SELECT pg_advisory_lock(${LOCK_ID})`; + // Set a lock_timeout for this session — PG will refuse to block more + // than N ms on any lock acquisition (we only hold one at a time). + await sql`SET lock_timeout = ${LOCK_ACQUIRE_TIMEOUT_MS}`; + + // Try to grab the advisory lock; poll if someone else holds it. + const deadline = Date.now() + LOCK_ACQUIRE_TIMEOUT_MS; + let locked = false; + while (Date.now() < deadline) { + const [row] = await sql<{ locked: boolean }[]>` + SELECT pg_try_advisory_lock(${LOCK_ID}) AS locked + `; + if (row?.locked) { + locked = true; + break; + } + console.log("[migrate] advisory lock held — retrying in 2s"); + await new Promise((r) => setTimeout(r, LOCK_RETRY_INTERVAL_MS)); + } + if (!locked) { + console.error(`[migrate] could not acquire advisory lock within ${LOCK_ACQUIRE_TIMEOUT_MS}ms — aborting`); + process.exit(1); + } + try { const db = drizzle(sql); const start = Date.now(); diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 14aa96d..1c44e3d 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -46,9 +46,25 @@ export interface HookSetStatusResponse { // --- WebSocket protocol envelopes --- +/** + * Wire protocol version. Bump ONLY on breaking changes to the hello or + * push envelope shape. Clients send their highest supported version; + * broker picks the minimum of its own and the client's and echoes it + * on hello_ack. Backward-compat fields can be gated on this. + * 1 = initial release + */ +export const WS_PROTOCOL_VERSION = 1 as const; + /** Sent by client on connect to authenticate. */ export interface WSHelloMessage { type: "hello"; + /** Highest WS protocol version the client understands. Optional — + * pre-alpha.36 clients omit it and the broker treats missing as 1. */ + protocolVersion?: number; + /** Optional feature strings the client supports. Broker uses this to + * avoid emitting envelopes the client can't parse. Examples: "grants", + * "channels", "streams". Unknown capabilities ignored. */ + capabilities?: string[]; meshId: string; memberId: string; pubkey: string; // must match mesh.member.peerPubkey diff --git a/apps/cli/src/commands/grants.ts b/apps/cli/src/commands/grants.ts index 044d582..a02c80a 100644 --- a/apps/cli/src/commands/grants.ts +++ b/apps/cli/src/commands/grants.ts @@ -35,18 +35,13 @@ const BROKER_HTTP = URLS.BROKER.replace("wss://", "https://").replace("ws://", " async function syncToBroker(meshSlug: string, grants: Record): Promise { const auth = getStoredToken(); if (!auth) return; - let userId = ""; - try { - const payload = JSON.parse(Buffer.from(auth.session_token.split(".")[1]!, "base64url").toString()) as { sub?: string }; - userId = payload.sub ?? ""; - } catch { return; } - if (!userId) return; try { await request<{ ok: true }>({ path: `/cli/mesh/${meshSlug}/grants`, method: "POST", - body: { user_id: userId, grants }, + body: { grants }, baseUrl: BROKER_HTTP, + token: auth.session_token, }); } catch (e) { render.warn(`broker grant sync failed — client filter still active: ${e instanceof Error ? e.message : e}`); @@ -91,8 +86,20 @@ function resolveCaps(input: string[]): Capability[] { async function resolvePeer(meshSlug: string, name: string): Promise<{ displayName: string; pubkey: string } | null> { return await withMesh({ meshSlug }, async (client) => { const peers = await client.listPeers(); - const match = peers.find((p) => p.displayName === name || p.pubkey === name || p.pubkey.startsWith(name)); - return match ? { displayName: match.displayName, pubkey: match.pubkey } : null; + const match = peers.find( + (p) => + p.displayName === name || + p.pubkey === name || + p.pubkey.startsWith(name) || + p.memberPubkey === name || + (p.memberPubkey && p.memberPubkey.startsWith(name)), + ); + if (!match) return null; + // Prefer the stable member pubkey for grant keys — session pubkey + // rotates on every reconnect and would invalidate the grant entry. + // Broker falls back to session-key lookup for pre-alpha.36 clients. + const key = match.memberPubkey ?? match.pubkey; + return { displayName: match.displayName, pubkey: key }; }); } diff --git a/apps/cli/src/commands/list.ts b/apps/cli/src/commands/list.ts index 14240ba..1936170 100644 --- a/apps/cli/src/commands/list.ts +++ b/apps/cli/src/commands/list.ts @@ -25,23 +25,16 @@ export async function runList(): Promise { const config = readConfig(); const auth = getStoredToken(); - // Try to fetch from server + // Try to fetch from server. Broker authenticates via Bearer token. let serverMeshes: ServerMesh[] = []; if (auth) { try { - let userId = ""; - try { - const payload = JSON.parse(Buffer.from(auth.session_token.split(".")[1]!, "base64url").toString()) as { sub?: string }; - userId = payload.sub ?? ""; - } catch {} - - if (userId) { - const res = await request<{ meshes: ServerMesh[] }>({ - path: `/cli/meshes?user_id=${userId}`, - baseUrl: BROKER_HTTP, - }); - serverMeshes = res.meshes ?? []; - } + const res = await request<{ meshes: ServerMesh[] }>({ + path: `/cli/meshes`, + baseUrl: BROKER_HTTP, + token: auth.session_token, + }); + serverMeshes = res.meshes ?? []; } catch {} } diff --git a/apps/cli/src/services/broker/ws-client.ts b/apps/cli/src/services/broker/ws-client.ts index c8018de..9b6b4c2 100644 --- a/apps/cli/src/services/broker/ws-client.ts +++ b/apps/cli/src/services/broker/ws-client.ts @@ -64,7 +64,12 @@ export type Priority = "now" | "next" | "low"; export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting"; export interface PeerInfo { + /** Routing key — session pubkey if present, otherwise member pubkey. */ pubkey: string; + /** Stable member pubkey (mesh.member.peer_pubkey). Preferred for grants, + * safety numbers, audit, and anything that needs identity stability + * across reconnects. */ + memberPubkey?: string; displayName: string; status: string; summary: string | null; @@ -138,6 +143,13 @@ export class BrokerClient { private outbound: Array<() => void> = []; // closures that send once ws is open private pushHandlers = new Set(); private pushBuffer: InboundPush[] = []; + /** + * Serialization chain for inbound push handling. Each incoming push + * appends to this promise so decrypt+enqueue happens in arrival order. + * Without it, a fast decrypt could land in pushBuffer before a slow + * earlier one — observable as reordered messages to consumers. + */ + private pushChain: Promise = Promise.resolve(); private listPeersResolvers = new Map void; timer: NodeJS.Timeout }>(); private stateResolvers = new Map void; timer: NodeJS.Timeout }>(); private stateListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); @@ -1639,9 +1651,10 @@ export class BrokerClient { const nonce = String(msg.nonce ?? ""); const ciphertext = String(msg.ciphertext ?? ""); const senderPubkey = String(msg.senderPubkey ?? ""); - // Decrypt asynchronously, then enqueue. Ordering within the - // buffer is preserved by awaiting before push. - void (async (): Promise => { + // Serialize through pushChain so decrypt+enqueue preserves arrival + // order. Previously each inbound push ran in an independent async + // task and fast decrypts could overtake slow ones. + this.pushChain = this.pushChain.then(async (): Promise => { // System messages (peer_joined, watch_triggered, mcp_deployed, etc.) // have senderPubkey="system" with empty nonce/ciphertext — skip decryption. const isSystem = msg.subtype === "system" || senderPubkey === "system"; @@ -1739,7 +1752,9 @@ export class BrokerClient { /* handler errors are not the transport's problem */ } } - })(); + }).catch((e) => { + this.debug(`push handler chain error: ${e instanceof Error ? e.message : e}`); + }); return; } if (msg.type === "state_result") { @@ -2194,14 +2209,25 @@ export class BrokerClient { } private scheduleReconnect(): void { + // Guard: if a reconnect is already scheduled don't pile on another + // timer — stacked close events used to schedule multiple concurrent + // reconnects which caused reconnect storms against the broker. + if (this.reconnectTimer) { + this.debug("reconnect already scheduled — skipping"); + return; + } this.setConnStatus("reconnecting"); - const delay = + const base = BACKOFF_CAPS[Math.min(this.reconnectAttempt, BACKOFF_CAPS.length - 1)]!; + // Full jitter: uniform [0, base]. Prevents thundering herd when the + // broker restarts and every client reconnects on the same tick. + const delay = Math.floor(Math.random() * base); this.reconnectAttempt += 1; this.debug( - `reconnect in ${delay}ms (attempt ${this.reconnectAttempt})`, + `reconnect in ${delay}ms (attempt ${this.reconnectAttempt}, base ${base}ms)`, ); this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; if (this.closed) return; this.connect().catch((e) => { this.debug(`reconnect failed: ${e instanceof Error ? e.message : e}`); diff --git a/apps/cli/src/services/invite/generate.ts b/apps/cli/src/services/invite/generate.ts index 0962bb6..44e37ff 100644 --- a/apps/cli/src/services/invite/generate.ts +++ b/apps/cli/src/services/invite/generate.ts @@ -11,17 +11,11 @@ export async function generateInvite( const auth = getStoredToken(); if (!auth) throw new Error("Not signed in"); - let userId = ""; - try { - const payload = JSON.parse(Buffer.from(auth.session_token.split(".")[1]!, "base64url").toString()) as { sub?: string }; - userId = payload.sub ?? ""; - } catch {} - if (!userId) throw new Error("Invalid token"); - return request<{ url: string; code: string; expires_at: string; emailed?: boolean }>({ path: `/cli/mesh/${meshSlug}/invite`, method: "POST", - body: { user_id: userId, email: opts?.email, role: opts?.role }, + body: { email: opts?.email, role: opts?.role }, baseUrl: BROKER_HTTP, + token: auth.session_token, }); } diff --git a/apps/cli/src/services/invite/v2.ts b/apps/cli/src/services/invite/v2.ts index 4094504..b8dd114 100644 --- a/apps/cli/src/services/invite/v2.ts +++ b/apps/cli/src/services/invite/v2.ts @@ -86,9 +86,23 @@ export async function claimInviteV2(opts: { s.base64_variants.URLSAFE_NO_PADDING, ); - const base = opts.appBaseUrl.replace(/\/$/, ""); + // Claim can be routed either through the web app's `/api/public/invites/:code/claim` + // (which proxies to the broker) or directly against the broker's + // `/invites/:code/claim`. Default to the broker direct path because it + // removes one hop and avoids depending on the web app being healthy. + // Override with CLAUDEMESH_CLAIM_URL for self-hosters / tests. const code = encodeURIComponent(opts.code); - const url = `${base}/api/public/invites/${code}/claim`; + const override = process.env.CLAUDEMESH_CLAIM_URL; + let url: string; + if (override) { + url = override.replace(/\{code\}/g, code); + } else { + // Derive broker HTTP base from opts.appBaseUrl or a standard guess. + const brokerBase = + process.env.CLAUDEMESH_BROKER_HTTP ?? + "https://ic.claudemesh.com"; + url = `${brokerBase.replace(/\/$/, "")}/invites/${code}/claim`; + } let res: Response; try { diff --git a/apps/cli/src/services/mesh/create.ts b/apps/cli/src/services/mesh/create.ts index e8553dc..9ec0207 100644 --- a/apps/cli/src/services/mesh/create.ts +++ b/apps/cli/src/services/mesh/create.ts @@ -11,21 +11,18 @@ export async function createMesh(name: string, opts?: { template?: string; descr const auth = getStoredToken(); if (!auth) throw new Error("Not signed in"); - let userId = ""; - try { - const payload = JSON.parse(Buffer.from(auth.session_token.split(".")[1]!, "base64url").toString()) as { sub?: string }; - userId = payload.sub ?? ""; - } catch {} - if (!userId) throw new Error("Invalid token — run `claudemesh login` again"); - // Generate keypair first so we can send the pubkey to the broker const kp = await generateKeypair(); + // Broker authenticates via Authorization: Bearer . + // user_id used to be in the body but is now derived from the verified + // session on the server. Older broker deploys still accept both. const result = await request<{ id: string; slug: string; name: string; member_id: string }>({ path: "/cli/mesh/create", method: "POST", - body: { user_id: userId, name, pubkey: kp.publicKey, ...opts }, + body: { name, pubkey: kp.publicKey, ...opts }, baseUrl: BROKER_HTTP, + token: auth.session_token, }); const mesh: JoinedMesh = {