fix(security): resolve all 17 codex findings — auth, grants, crypto, ops
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Critical: broker HTTP auth via cli_session bearer token on all /cli/*;
file download requires auth+membership; v2 claim gated; duplicate
claimInviteV2Core removed; grant enforcement tries member then
session pubkey; audit hash uses canonical sorted-keys JSON.

High: rate limit args fixed (burst 10, 60/min) + both buckets swept;
BROKER_ENCRYPTION_KEY fail-fast in prod; migrate uses pg_try + lock_
timeout; hello validates sessionPubkey hex; blocked DMs rejected pre-
queue; watch timers cleaned on disconnect.

Medium: inbound pushes serialized; reconnect jitter + timer guard;
hardcoded URLs through env; v2 claim path configurable.

Low: WSHelloMessage optional protocolVersion+capabilities.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-15 19:18:25 +01:00
parent 1a7a059e75
commit 2be5e9dccb
12 changed files with 464 additions and 341 deletions

View File

@@ -25,6 +25,29 @@ const lastHash = new Map<string, string>();
// Core audit logging // Core audit logging
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/**
* Deterministic JSON serialization: keys sorted recursively. The store
* is JSONB, which does NOT preserve key order, so hashing a naive
* JSON.stringify(row.payload) on verify can yield a different string
* from insert-time — false tamper reports. Canonical form guarantees
* both sides agree.
*/
function canonicalJson(value: unknown): string {
if (value === null || typeof value !== "object") return JSON.stringify(value);
if (Array.isArray(value)) {
return "[" + value.map(canonicalJson).join(",") + "]";
}
const obj = value as Record<string, unknown>;
const keys = Object.keys(obj).sort();
return (
"{" +
keys
.map((k) => JSON.stringify(k) + ":" + canonicalJson(obj[k]))
.join(",") +
"}"
);
}
function computeHash( function computeHash(
prevHash: string, prevHash: string,
meshId: string, meshId: string,
@@ -33,7 +56,7 @@ function computeHash(
payload: Record<string, unknown>, payload: Record<string, unknown>,
createdAt: Date, createdAt: Date,
): string { ): string {
const input = `${prevHash}|${meshId}|${eventType}|${actorMemberId}|${JSON.stringify(payload)}|${createdAt.toISOString()}`; const input = `${prevHash}|${meshId}|${eventType}|${actorMemberId}|${canonicalJson(payload)}|${createdAt.toISOString()}`;
return createHash("sha256").update(input).digest("hex"); return createHash("sha256").update(input).digest("hex");
} }

View File

@@ -23,13 +23,23 @@ let _key: Buffer | null = null;
function getKey(): Buffer { function getKey(): Buffer {
if (_key) return _key; if (_key) return _key;
if (env.BROKER_ENCRYPTION_KEY && env.BROKER_ENCRYPTION_KEY.length === 64) { if (env.BROKER_ENCRYPTION_KEY && /^[0-9a-f]{64}$/i.test(env.BROKER_ENCRYPTION_KEY)) {
_key = Buffer.from(env.BROKER_ENCRYPTION_KEY, "hex"); _key = Buffer.from(env.BROKER_ENCRYPTION_KEY, "hex");
} else { return _key;
_key = randomBytes(32);
log.warn("BROKER_ENCRYPTION_KEY not set — generated ephemeral key. " +
"Set BROKER_ENCRYPTION_KEY=" + _key.toString("hex") + " to persist across restarts.");
} }
// In production, refuse to start without a persistent key. Silently
// generating a random one meant every restart invalidated all encrypted
// rows on disk — and the ephemeral key was logged in clear, which is
// itself a leak.
if (process.env.NODE_ENV === "production") {
log.error("BROKER_ENCRYPTION_KEY is missing or malformed (need 64 hex chars) — refusing to start in production");
process.exit(1);
}
// Dev only: generate a stable per-process key. Never log the value.
_key = randomBytes(32);
log.warn("BROKER_ENCRYPTION_KEY not set — using ephemeral key for this dev process (encrypted data WILL NOT survive restarts). Set BROKER_ENCRYPTION_KEY to a 64-hex-char value for persistence.");
return _key; return _key;
} }
@@ -62,7 +72,11 @@ export function decryptFromStorage(packed: string): string | null {
decipher.setAuthTag(tag); decipher.setAuthTag(tag);
const decrypted = Buffer.concat([decipher.update(ciphertext), decipher.final()]); const decrypted = Buffer.concat([decipher.update(ciphertext), decipher.final()]);
return decrypted.toString("utf8"); return decrypted.toString("utf8");
} catch { } catch (e) {
// Loud failure: if a stored row fails to decrypt the key changed or
// data is corrupt — don't silently return null and let downstream
// code assume "no value".
log.error("decryptFromStorage failed", { err: e instanceof Error ? e.message : String(e) });
return null; return null;
} }
} }

View File

@@ -417,6 +417,7 @@ export async function listPeersInMesh(
): Promise< ): Promise<
Array<{ Array<{
pubkey: string; pubkey: string;
memberPubkey: string;
displayName: string; displayName: string;
status: string; status: string;
summary: string | null; summary: string | null;
@@ -449,8 +450,12 @@ export async function listPeersInMesh(
) )
.orderBy(asc(presence.connectedAt)); .orderBy(asc(presence.connectedAt));
// Prefer session pubkey for routing, session displayName for display. // Prefer session pubkey for routing, session displayName for display.
// memberPubkey is also surfaced so callers (grants, audit, safety-number
// verify) can operate on the stable identity key rather than the
// per-connection ephemeral one.
return rows.map((r) => ({ return rows.map((r) => ({
pubkey: r.sessionPubkey || r.memberPubkey, pubkey: r.sessionPubkey || r.memberPubkey,
memberPubkey: r.memberPubkey,
displayName: r.presenceDisplayName || r.memberDisplayName, displayName: r.presenceDisplayName || r.memberDisplayName,
status: r.status, status: r.status,
summary: r.summary, summary: r.summary,

View File

@@ -15,7 +15,7 @@
import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
import type { Duplex } from "node:stream"; import type { Duplex } from "node:stream";
import { WebSocketServer, type WebSocket } from "ws"; import { WebSocketServer, type WebSocket } from "ws";
import { and, eq, isNull, lt, sql } from "drizzle-orm"; import { and, eq, inArray, isNull, lt, sql } from "drizzle-orm";
import { env } from "./env"; import { env } from "./env";
import { db } from "./db"; import { db } from "./db";
import { invite as inviteTable, mesh, meshMember, messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh"; import { invite as inviteTable, mesh, meshMember, messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh";
@@ -103,7 +103,10 @@ import { metrics, metricsToText } from "./metrics";
import { TokenBucket } from "./rate-limit"; import { TokenBucket } from "./rate-limit";
import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health"; import { isDbHealthy, startDbHealth, stopDbHealth } from "./db-health";
import { buildInfo } from "./build-info"; import { buildInfo } from "./build-info";
import { canonicalInvite, canonicalInviteV2, sealRootKeyToRecipient, verifyHelloSignature, verifyInviteV2 } from "./crypto"; import { canonicalInvite, canonicalInviteV2, claimInviteV2Core as _claimInviteV2Core, sealRootKeyToRecipient, verifyHelloSignature, verifyInviteV2 } from "./crypto";
// Alias for in-module callers; the public re-export below surfaces the
// same symbol without colliding with tests that import from index.ts.
const claimInviteV2Core = _claimInviteV2Core;
import { handleWebhook } from "./webhooks"; import { handleWebhook } from "./webhooks";
import { audit, loadLastHashes, ensureAuditLogTable, verifyChain, queryAuditLog } from "./audit"; import { audit, loadLastHashes, ensureAuditLogTable, verifyChain, queryAuditLog } from "./audit";
@@ -485,11 +488,14 @@ const hookRateLimit = new TokenBucket(
/** /**
* Per-member send rate limit. Protects the mesh from a runaway peer * Per-member send rate limit. Protects the mesh from a runaway peer
* dumping messages. Bucket is 60 msgs/min with a burst of 10 — generous * dumping messages. Burst of 10, refill 60/min — generous for
* for conversational use, tight enough that a loop bug surfaces in seconds. * conversational use, tight enough that a loop bug surfaces in seconds.
* Configurable via env in a later pass. *
* NOTE: TokenBucket signature is `(capacity, refillPerMinute)`, so the
* args ARE (burst, per-minute). Swept periodically below so old keys
* don't leak.
*/ */
const sendRateLimit = new TokenBucket(60, 10); const sendRateLimit = new TokenBucket(10, 60);
function sendToPeer(presenceId: string, msg: WSServerMessage): void { function sendToPeer(presenceId: string, msg: WSServerMessage): void {
const conn = connections.get(presenceId); const conn = connections.get(presenceId);
@@ -616,35 +622,59 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
// File download proxy: streams from MinIO so clients don't need internal URLs. // File download proxy: streams from MinIO so clients don't need internal URLs.
// GET /download/{fileId}?mesh={meshId} // GET /download/{fileId}?mesh={meshId}
// Auth: Bearer token + mesh membership. Previously wide open — anyone
// who knew a fileId could exfiltrate.
if (req.method === "GET" && req.url?.startsWith("/download/")) { if (req.method === "GET" && req.url?.startsWith("/download/")) {
const parts = req.url.split("?"); (async () => {
const fileId = parts[0]!.replace("/download/", ""); const auth = await requireCliAuth(req, res);
const params = new URLSearchParams(parts[1] ?? ""); if (!auth) return;
const meshId = params.get("mesh"); const parts = req.url!.split("?");
if (!fileId || !meshId) { const fileId = parts[0]!.replace("/download/", "");
writeJson(res, 400, { error: "fileId and ?mesh= required" }); const params = new URLSearchParams(parts[1] ?? "");
log.info("download", { route: "GET /download", status: 400, latency_ms: Date.now() - started }); const meshId = params.get("mesh");
return; if (!fileId || !meshId) {
} writeJson(res, 400, { error: "fileId and ?mesh= required" });
getFile(meshId, fileId).then(async (file) => { log.info("download", { route: "GET /download", status: 400, latency_ms: Date.now() - started });
if (!file) {
writeJson(res, 404, { error: "file not found" });
log.info("download", { route: "GET /download", status: 404, file_id: fileId, latency_ms: Date.now() - started });
return; return;
} }
const bucket = meshBucketName(meshId); // Membership check: the authenticated user must have a live member
const stream = await minioClient.getObject(bucket, file.minioKey); // row in the requested mesh.
res.writeHead(200, { try {
"Content-Type": file.mimeType ?? "application/octet-stream", const [m] = await db
"Content-Disposition": `attachment; filename="${file.name}"`, .select({ id: meshMember.id })
"Cache-Control": "private, max-age=60", .from(meshMember)
}); .where(and(eq(meshMember.meshId, meshId), eq(meshMember.userId, auth.userId), isNull(meshMember.revokedAt)))
stream.pipe(res); .limit(1);
log.info("download", { route: "GET /download", file_id: fileId, name: file.name, latency_ms: Date.now() - started }); if (!m) {
}).catch((e) => { writeJson(res, 403, { error: "not a member of this mesh" });
writeJson(res, 500, { error: "download failed" }); return;
log.error("download error", { file_id: fileId, error: e instanceof Error ? e.message : String(e) }); }
}); } catch (e) {
writeJson(res, 500, { error: "membership check failed" });
log.error("download-auth", { err: e instanceof Error ? e.message : String(e) });
return;
}
try {
const file = await getFile(meshId, fileId);
if (!file) {
writeJson(res, 404, { error: "file not found" });
log.info("download", { route: "GET /download", status: 404, file_id: fileId, latency_ms: Date.now() - started });
return;
}
const bucket = meshBucketName(meshId);
const stream = await minioClient.getObject(bucket, file.minioKey);
res.writeHead(200, {
"Content-Type": file.mimeType ?? "application/octet-stream",
"Content-Disposition": `attachment; filename="${file.name}"`,
"Cache-Control": "private, max-age=60",
});
stream.pipe(res);
log.info("download", { route: "GET /download", file_id: fileId, name: file.name, latency_ms: Date.now() - started });
} catch (e) {
writeJson(res, 500, { error: "download failed" });
log.error("download error", { file_id: fileId, error: e instanceof Error ? e.message : String(e) });
}
})();
return; return;
} }
@@ -976,188 +1006,13 @@ function handleJoinPost(
// skips signature verification since there is no v2 signature on file. // skips signature verification since there is no v2 signature on file.
// This lets v2 clients claim legacy invites during the deprecation window. // This lets v2 clients claim legacy invites during the deprecation window.
export type InviteClaimV2Result = // NOTE: canonical `claimInviteV2Core` + `InviteClaimV2Result` live in
| { // `./crypto.ts`. Re-exported here for backward-compat imports and
ok: true; // tests that pulled from index.ts. The previous duplicate in this
status: 200; // file had diverged from the crypto.ts copy and was deleted on
body: { // 2026-04-15 (Codex review finding).
sealed_root_key: string; export { type InviteClaimV2Result } from "./crypto";
mesh_id: string; export { claimInviteV2Core };
member_id: string;
owner_pubkey: string;
canonical_v2: string;
};
}
| { ok: false; status: 400 | 404 | 410; body: { error: string } };
/**
* Core claim logic, extracted from the HTTP handler so tests can call it
* directly without spinning up the full broker server.
*/
export async function claimInviteV2Core(params: {
code: string;
recipientX25519PubkeyBase64url: string;
displayName?: string;
now?: number;
}): Promise<InviteClaimV2Result> {
const now = params.now ?? Date.now();
const recipientPk = params.recipientX25519PubkeyBase64url;
// Cheap shape check on the recipient pubkey — full length check happens
// inside sealRootKeyToRecipient, but reject obvious garbage early so
// we return 400 malformed before touching the DB.
if (!recipientPk || typeof recipientPk !== "string" || recipientPk.length < 32) {
return { ok: false, status: 400, body: { error: "malformed" } };
}
// 1. Look up the invite by opaque code.
const [inv] = await db
.select()
.from(inviteTable)
.where(eq(inviteTable.code, params.code))
.limit(1);
if (!inv) return { ok: false, status: 404, body: { error: "not_found" } };
// 2. Lifecycle checks: revoked → expired → exhausted.
if (inv.revokedAt) {
return { ok: false, status: 410, body: { error: "revoked" } };
}
if (inv.expiresAt.getTime() < now) {
return { ok: false, status: 410, body: { error: "expired" } };
}
if (inv.usedCount >= inv.maxUses) {
return { ok: false, status: 410, body: { error: "exhausted" } };
}
// 3. Load the mesh for owner_pubkey + root_key.
const [m] = await db
.select({
id: mesh.id,
ownerPubkey: mesh.ownerPubkey,
rootKey: mesh.rootKey,
})
.from(mesh)
.where(and(eq(mesh.id, inv.meshId), isNull(mesh.archivedAt)))
.limit(1);
if (!m) return { ok: false, status: 404, body: { error: "not_found" } };
if (!m.ownerPubkey || !m.rootKey) {
return { ok: false, status: 400, body: { error: "malformed" } };
}
// 4. v2 signature verification when applicable.
// Always compute the canonical on the fly so the response can echo it.
const expiresAtUnix = Math.floor(inv.expiresAt.getTime() / 1000);
const canonical = canonicalInviteV2({
mesh_id: inv.meshId,
invite_id: inv.id,
expires_at: expiresAtUnix,
role: inv.role as "admin" | "member",
owner_pubkey: m.ownerPubkey,
});
if (inv.version === 2 && inv.capabilityV2) {
// Parse capability + verify.
let storedCanonical: string | undefined;
let signatureHex: string | undefined;
try {
const parsed = JSON.parse(inv.capabilityV2) as {
canonical?: string;
signature?: string;
};
storedCanonical = parsed.canonical;
signatureHex = parsed.signature;
} catch {
return { ok: false, status: 400, body: { error: "malformed" } };
}
if (!storedCanonical || !signatureHex) {
return { ok: false, status: 400, body: { error: "malformed" } };
}
// Broker-recomputed canonical must match the signed bytes exactly.
if (storedCanonical !== canonical) {
return { ok: false, status: 400, body: { error: "bad_signature" } };
}
const sigOk = await verifyInviteV2({
canonical: storedCanonical,
signatureHex,
ownerPubkeyHex: m.ownerPubkey,
});
if (!sigOk) {
return { ok: false, status: 400, body: { error: "bad_signature" } };
}
}
// v1 rows: skip signature verification (legacy path during migration).
// 5. Atomic consume: increment used_count iff still under max_uses.
// Mirrors the invariant enforced for v1 joins in broker.joinMesh().
const [claimed] = await db
.update(inviteTable)
.set({
usedCount: sql`${inviteTable.usedCount} + 1`,
claimedByPubkey: recipientPk,
})
.where(
and(
eq(inviteTable.id, inv.id),
lt(inviteTable.usedCount, inv.maxUses),
),
)
.returning({ id: inviteTable.id });
if (!claimed) {
return { ok: false, status: 410, body: { error: "exhausted" } };
}
// 6. Create a member row for the claimant. The peerPubkey column holds
// the claimant's signing identity; for v2 the recipient hasn't
// necessarily connected over WS yet, so we use the x25519 pubkey as
// a placeholder for the pre-claim phase. This matches the spec's
// "one recipient = one root-key-delivery capability" invariant.
const preset = (inv.preset as {
displayName?: string;
roleTag?: string;
groups?: Array<{ name: string; role?: string }>;
messageMode?: string;
} | null) ?? {};
const displayName =
preset.displayName ?? params.displayName ?? `member-${recipientPk.slice(0, 8)}`;
const [row] = await db
.insert(meshMember)
.values({
meshId: inv.meshId,
peerPubkey: recipientPk,
displayName,
role: inv.role,
roleTag: preset.roleTag ?? null,
defaultGroups: preset.groups ?? [],
messageMode: preset.messageMode ?? "push",
})
.returning({ id: meshMember.id });
if (!row) {
return { ok: false, status: 400, body: { error: "malformed" } };
}
// 7. Seal the mesh root_key to the recipient's x25519 pubkey.
let sealed: string;
try {
sealed = await sealRootKeyToRecipient({
rootKeyBase64url: m.rootKey,
recipientX25519PubkeyBase64url: recipientPk,
});
} catch {
return { ok: false, status: 400, body: { error: "malformed" } };
}
return {
ok: true,
status: 200,
body: {
sealed_root_key: sealed,
mesh_id: inv.meshId,
member_id: row.id,
owner_pubkey: m.ownerPubkey,
canonical_v2: canonical,
},
};
}
function handleInviteClaimV2Post( function handleInviteClaimV2Post(
req: IncomingMessage, req: IncomingMessage,
@@ -1197,6 +1052,19 @@ function handleInviteClaimV2Post(
writeJson(res, 400, { error: "malformed" }); writeJson(res, 400, { error: "malformed" });
return; return;
} }
// Feature-flag: the v2 claim flow inserts a member row with
// peerPubkey=<x25519 base64url>, but hello requires ed25519 hex.
// Result: claimed invites can never complete the WS handshake.
// Keep the endpoint behind an env flag until the two-step binding
// (send x25519 for seal, bind ed25519 on first hello) lands. Spec:
// .artifacts/specs/2026-04-15-invite-v2-cli-migration.md.
if (process.env.BROKER_INVITE_V2_ENABLED !== "1") {
writeJson(res, 501, {
error: "invite_v2_disabled",
detail: "v2 claim flow is behind BROKER_INVITE_V2_ENABLED=1 until the ed25519 binding step ships",
});
return;
}
const result = await claimInviteV2Core({ const result = await claimInviteV2Core({
code, code,
recipientX25519PubkeyBase64url: payload.recipient_x25519_pubkey, recipientX25519PubkeyBase64url: payload.recipient_x25519_pubkey,
@@ -1221,11 +1089,14 @@ function handleInviteClaimV2Post(
}); });
} }
function handleUploadPost( async function handleUploadPost(
req: IncomingMessage, req: IncomingMessage,
res: ServerResponse, res: ServerResponse,
started: number, started: number,
): void { ): Promise<void> {
const auth = await requireCliAuth(req, res);
if (!auth) return;
const meshId = req.headers["x-mesh-id"] as string | undefined; const meshId = req.headers["x-mesh-id"] as string | undefined;
const memberId = req.headers["x-member-id"] as string | undefined; const memberId = req.headers["x-member-id"] as string | undefined;
const fileName = req.headers["x-file-name"] as string | undefined; const fileName = req.headers["x-file-name"] as string | undefined;
@@ -1244,6 +1115,25 @@ function handleUploadPost(
return; return;
} }
// Verify the caller is actually a member of the mesh they claim, AND
// that the X-Member-Id they sent belongs to them. Previously we trusted
// both headers blindly — anyone could upload as anyone.
try {
const [m] = await db
.select({ id: meshMember.id, userId: meshMember.userId, revokedAt: meshMember.revokedAt })
.from(meshMember)
.where(eq(meshMember.id, memberId))
.limit(1);
if (!m || m.revokedAt || m.userId !== auth.userId) {
writeJson(res, 403, { ok: false, error: "member does not belong to authenticated user" });
return;
}
} catch (e) {
writeJson(res, 500, { ok: false, error: "auth check failed" });
log.error("upload-auth", { err: e instanceof Error ? e.message : String(e) });
return;
}
const persistent = persistentRaw !== "false"; const persistent = persistentRaw !== "false";
let tags: string[] = []; let tags: string[] = [];
if (tagsRaw) { if (tagsRaw) {
@@ -1675,7 +1565,30 @@ async function restorePeerState(
async function handleHello( async function handleHello(
ws: WebSocket, ws: WebSocket,
hello: Extract<WSClientMessage, { type: "hello" }>, hello: Extract<WSClientMessage, { type: "hello" }>,
): Promise<{ presenceId: string; memberDisplayName: string } | null> { ): Promise<{
presenceId: string;
memberDisplayName: string;
memberProfile?: unknown;
meshPolicy?: Record<string, unknown>;
restored?: boolean;
lastSummary?: string;
lastSeenAt?: string;
restoredGroups?: Array<{ name: string; role?: string }>;
restoredStats?: unknown;
} | null> {
// Validate sessionPubkey shape — it becomes a routable identity in
// listPeers/drainForMember, so arbitrary strings let a client claim
// nonsense pubkeys. Required-if-present: empty is allowed (falls back
// to memberPubkey), but if present must be 64 lower-case hex.
if (hello.sessionPubkey != null && hello.sessionPubkey !== "") {
if (typeof hello.sessionPubkey !== "string" || !/^[0-9a-f]{64}$/.test(hello.sessionPubkey)) {
metrics.connectionsRejected.inc({ reason: "bad_session_pubkey" });
sendError(ws, "bad_session_pubkey", "sessionPubkey must be 64 lowercase hex chars");
ws.close(1008, "bad_session_pubkey");
return null;
}
}
// Capacity check BEFORE touching DB. // Capacity check BEFORE touching DB.
const existing = connectionsPerMesh.get(hello.meshId) ?? 0; const existing = connectionsPerMesh.get(hello.meshId) ?? 0;
if (existing >= env.MAX_CONNECTIONS_PER_MESH) { if (existing >= env.MAX_CONNECTIONS_PER_MESH) {
@@ -1845,17 +1758,20 @@ async function handleSend(
(isGroupTargetEarly && msg.targetSpec === "@all"); (isGroupTargetEarly && msg.targetSpec === "@all");
const isDirectEarly = !isGroupTargetEarly && !isBroadcastEarly && msg.targetSpec !== "*"; const isDirectEarly = !isGroupTargetEarly && !isBroadcastEarly && msg.targetSpec !== "*";
if (isDirectEarly) { if (isDirectEarly) {
let hasRecipient = false; // Identify candidate recipient connections — anyone in the mesh whose
for (const [pid, peer] of connections) { // member or session pubkey matches the target. Then check grants to
// see if at least one of them has granted the sender `dm`. Without
// this check, blocked DMs get queued and sit in the DB forever
// (multicast marks delivered on queue; direct relies on drain-or-push).
const candidateMemberIds: string[] = [];
for (const [, peer] of connections) {
if (peer.meshId !== conn.meshId) continue; if (peer.meshId !== conn.meshId) continue;
if (peer.ws === conn.ws) continue; if (peer.ws === conn.ws) continue;
if (peer.memberPubkey === msg.targetSpec || peer.sessionPubkey === msg.targetSpec) { if (peer.memberPubkey === msg.targetSpec || peer.sessionPubkey === msg.targetSpec) {
hasRecipient = true; candidateMemberIds.push(peer.memberId);
break;
} }
void pid;
} }
if (!hasRecipient) { if (candidateMemberIds.length === 0) {
metrics.messagesRejectedTotal.inc({ reason: "no_recipient" }); metrics.messagesRejectedTotal.inc({ reason: "no_recipient" });
const errAck: WSServerMessage = { const errAck: WSServerMessage = {
type: "ack", type: "ack",
@@ -1867,6 +1783,34 @@ async function handleSend(
conn.ws.send(JSON.stringify(errAck)); conn.ws.send(JSON.stringify(errAck));
return; return;
} }
// Load grants for the candidate recipient members and pick the first
// that allows `dm` from the sender's stable memberPubkey. If none
// allow it, reject pre-queue so the DB stays clean.
const DEFAULT_CAPS_DM = ["read", "dm", "broadcast", "state-read"] as const;
const grantRows = await db
.select({ id: meshMember.id, peerGrants: meshMember.peerGrants })
.from(meshMember)
.where(and(eq(meshMember.meshId, conn.meshId), inArray(meshMember.id, candidateMemberIds)));
const senderKey = conn.memberPubkey;
const anyAllows = grantRows.some((row) => {
const grants = (row.peerGrants as Record<string, string[]>) ?? {};
const entry = grants[senderKey];
if (entry === undefined) return (DEFAULT_CAPS_DM as readonly string[]).includes("dm");
return entry.includes("dm");
});
if (!anyAllows) {
metrics.messagesDroppedByGrantTotal?.inc?.({ cap: "dm" });
const errAck: WSServerMessage = {
type: "ack",
id: msg.id ?? "",
messageId: "",
queued: false,
error: "blocked by recipient grants (sender lacks dm capability)",
};
conn.ws.send(JSON.stringify(errAck));
return;
}
} }
const messageId = await queueMessage({ const messageId = await queueMessage({
@@ -1921,11 +1865,16 @@ async function handleSend(
// Per-peer grant enforcement — load recipient grant maps once per send. // Per-peer grant enforcement — load recipient grant maps once per send.
// See .artifacts/specs/2026-04-15-per-peer-capabilities.md. // See .artifacts/specs/2026-04-15-per-peer-capabilities.md.
//
// We look up grants by BOTH the sender's stable member pubkey AND their
// ephemeral session pubkey, because CLI clients historically wrote grant
// entries keyed on session pubkey (from listPeers which preferred
// session key). Member key is preferred; session is a fall-through for
// compatibility with older clients until they migrate.
const DEFAULT_CAPS = ["read", "dm", "broadcast", "state-read"] as const; const DEFAULT_CAPS = ["read", "dm", "broadcast", "state-read"] as const;
const capNeeded: "dm" | "broadcast" = isMulticast ? "broadcast" : "dm"; const capNeeded: "dm" | "broadcast" = isMulticast ? "broadcast" : "dm";
const senderPubkey = conn.memberPubkey; // stable member key (survives session rotation) const senderMemberKey = conn.memberPubkey;
// Fetch grant maps for all connected peers in this mesh in one query. const senderSessionKey = conn.sessionPubkey ?? null;
// Small (bounded by concurrent connections per mesh); acceptable per send.
const grantRows = await db const grantRows = await db
.select({ id: meshMember.id, peerGrants: meshMember.peerGrants }) .select({ id: meshMember.id, peerGrants: meshMember.peerGrants })
.from(meshMember) .from(meshMember)
@@ -1935,10 +1884,14 @@ async function handleSend(
); );
function allowed(recipientMemberId: string): boolean { function allowed(recipientMemberId: string): boolean {
const grants = grantsByMemberId.get(recipientMemberId); const grants = grantsByMemberId.get(recipientMemberId);
if (!grants) return DEFAULT_CAPS.includes(capNeeded); if (!grants) return (DEFAULT_CAPS as readonly string[]).includes(capNeeded);
const entry = grants[senderPubkey]; const memberEntry = grants[senderMemberKey];
if (entry === undefined) return DEFAULT_CAPS.includes(capNeeded); if (memberEntry !== undefined) return memberEntry.includes(capNeeded);
return entry.includes(capNeeded); if (senderSessionKey) {
const sessionEntry = grants[senderSessionKey];
if (sessionEntry !== undefined) return sessionEntry.includes(capNeeded);
}
return (DEFAULT_CAPS as readonly string[]).includes(capNeeded);
} }
for (const [pid, peer] of connections) { for (const [pid, peer] of connections) {
@@ -2154,6 +2107,7 @@ function handleConnection(ws: WebSocket): void {
const pc = connByPubkey.get(p.pubkey); const pc = connByPubkey.get(p.pubkey);
return { return {
pubkey: p.pubkey, pubkey: p.pubkey,
memberPubkey: p.memberPubkey,
displayName: p.displayName, displayName: p.displayName,
status: p.status as "idle" | "working" | "dnd", status: p.status as "idle" | "working" | "dnd",
summary: p.summary, summary: p.summary,
@@ -3907,7 +3861,8 @@ function handleConnection(ws: WebSocket): void {
} }
throw dupErr; throw dupErr;
} }
const webhookUrl = `https://ic.claudemesh.com/hook/${conn.meshId}/${webhookSecret}`; const brokerPublicUrl = process.env.BROKER_PUBLIC_URL ?? "https://ic.claudemesh.com";
const webhookUrl = `${brokerPublicUrl.replace(/\/$/, "")}/hook/${conn.meshId}/${webhookSecret}`;
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "webhook_ack", type: "webhook_ack",
name: cw.name, name: cw.name,
@@ -3928,11 +3883,12 @@ function handleConnection(ws: WebSocket): void {
}) })
.from(meshWebhook) .from(meshWebhook)
.where(and(eq(meshWebhook.meshId, conn.meshId), eq(meshWebhook.active, true))); .where(and(eq(meshWebhook.meshId, conn.meshId), eq(meshWebhook.active, true)));
const brokerPublicUrlList = process.env.BROKER_PUBLIC_URL ?? "https://ic.claudemesh.com";
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "webhook_list", type: "webhook_list",
webhooks: whRows.map((r) => ({ webhooks: whRows.map((r) => ({
name: r.name, name: r.name,
url: `https://ic.claudemesh.com/hook/${conn.meshId}/${r.secret}`, url: `${brokerPublicUrlList.replace(/\/$/, "")}/hook/${conn.meshId}/${r.secret}`,
active: r.active, active: r.active,
createdAt: r.createdAt.toISOString(), createdAt: r.createdAt.toISOString(),
})), })),
@@ -4253,6 +4209,14 @@ function handleConnection(ws: WebSocket): void {
if (conn) { if (conn) {
void audit(conn.meshId, "peer_left", conn.memberId, conn.displayName, {}); void audit(conn.meshId, "peer_left", conn.memberId, conn.displayName, {});
} }
// Clean up URL watches owned by this peer — the interval was
// happily fetching forever after the peer disconnected.
for (const [watchId, watch] of urlWatches) {
if (watch.presenceId === presenceId) {
clearInterval(watch.timer);
urlWatches.delete(watchId);
}
}
// Clean up stream subscriptions for this peer // Clean up stream subscriptions for this peer
for (const [key, subs] of streamSubscriptions) { for (const [key, subs] of streamSubscriptions) {
subs.delete(presenceId); subs.delete(presenceId);
@@ -4483,7 +4447,10 @@ async function main(): Promise<void> {
pingInterval.unref(); pingInterval.unref();
// GC rate-limit buckets periodically. // GC rate-limit buckets periodically.
const rlSweep = setInterval(() => hookRateLimit.sweep(), 5 * 60_000); const rlSweep = setInterval(() => {
hookRateLimit.sweep();
sendRateLimit.sweep();
}, 5 * 60_000);
rlSweep.unref(); rlSweep.unref();
// Queue depth gauge refresh (fires the metric; cheap COUNT query). // Queue depth gauge refresh (fires the metric; cheap COUNT query).
@@ -4624,7 +4591,7 @@ async function main(): Promise<void> {
.where(and(eq(telegramBridge.chatId, BigInt(chatId) as any), eq(telegramBridge.meshId, meshId))); .where(and(eq(telegramBridge.chatId, BigInt(chatId) as any), eq(telegramBridge.meshId, meshId)));
}, },
tgBotToken, tgBotToken,
"wss://ic.claudemesh.com/ws", process.env.BROKER_WS_URL ?? "wss://ic.claudemesh.com/ws",
// lookupMeshesByEmail: find user's meshes, create a bridge-specific member with fresh keypair // lookupMeshesByEmail: find user's meshes, create a bridge-specific member with fresh keypair
async (email) => { async (email) => {
const users = await db.select({ id: user.id, name: user.name }).from(user).where(eq(user.email, email)).limit(1); const users = await db.select({ id: user.id, name: user.name }).from(user).where(eq(user.email, email)).limit(1);
@@ -4786,6 +4753,52 @@ async function hashToken(token: string): Promise<string> {
return Array.from(new Uint8Array(hash)).map(b => b.toString(16).padStart(2, "0")).join(""); return Array.from(new Uint8Array(hash)).map(b => b.toString(16).padStart(2, "0")).join("");
} }
/**
* Verify the caller holds a valid CLI session token and return the
* authenticated user_id. Used by every authenticated /cli/... route
* to replace the former pattern of trusting body.user_id blindly.
*
* Returns null (and writes 401) on missing/invalid/revoked tokens.
* Callers must `return` immediately after a null response.
*/
async function requireCliAuth(
req: IncomingMessage,
res: ServerResponse,
): Promise<{ userId: string; sessionId: string } | null> {
const header = req.headers["authorization"];
if (!header || typeof header !== "string" || !header.startsWith("Bearer ")) {
writeJson(res, 401, { error: "missing_bearer_token" });
return null;
}
const token = header.slice("Bearer ".length).trim();
if (!token) {
writeJson(res, 401, { error: "empty_bearer_token" });
return null;
}
try {
const hash = await hashToken(token);
const [session] = await db
.select({ id: cliSessionTable.id, userId: cliSessionTable.userId, revokedAt: cliSessionTable.revokedAt })
.from(cliSessionTable)
.where(eq(cliSessionTable.tokenHash, hash))
.limit(1);
if (!session || session.revokedAt) {
writeJson(res, 401, { error: "invalid_or_revoked_token" });
return null;
}
// Touch last-seen so operators can see stale sessions.
db.update(cliSessionTable)
.set({ lastSeenAt: new Date() })
.where(eq(cliSessionTable.id, session.id))
.catch(() => { /* non-fatal */ });
return { userId: session.userId, sessionId: session.id };
} catch (e) {
log.error("auth", { err: e instanceof Error ? e.message : String(e) });
writeJson(res, 500, { error: "auth_check_failed" });
return null;
}
}
/** POST /cli/device-code — create a new device code. */ /** POST /cli/device-code — create a new device code. */
async function handleDeviceCodeNew(req: IncomingMessage, res: ServerResponse, started: number): Promise<void> { async function handleDeviceCodeNew(req: IncomingMessage, res: ServerResponse, started: number): Promise<void> {
let body: { hostname?: string; platform?: string; arch?: string } = {}; let body: { hostname?: string; platform?: string; arch?: string } = {};
@@ -4959,13 +4972,9 @@ async function handleDeviceCodeApprove(req: IncomingMessage, code: string, res:
/** GET /cli/sessions?user_id=... — list CLI sessions for a user. */ /** GET /cli/sessions?user_id=... — list CLI sessions for a user. */
/** GET /cli/meshes?user_id=... — list all meshes for a user with member counts. */ /** GET /cli/meshes?user_id=... — list all meshes for a user with member counts. */
async function handleCliMeshesList(req: IncomingMessage, res: ServerResponse, started: number): Promise<void> { async function handleCliMeshesList(req: IncomingMessage, res: ServerResponse, started: number): Promise<void> {
const url = new URL(req.url!, "http://localhost"); const auth = await requireCliAuth(req, res);
const userId = url.searchParams.get("user_id"); if (!auth) return;
const userId = auth.userId;
if (!userId) {
writeJson(res, 400, { error: "user_id required" });
return;
}
try { try {
// Find meshes via two paths: // Find meshes via two paths:
@@ -5168,7 +5177,10 @@ import { meshPermission } from "@turbostarter/db/schema/mesh";
* for a specific peer = blocked. Explicit null = reset to defaults. * for a specific peer = blocked. Explicit null = reset to defaults.
*/ */
async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> { async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> {
let body: { user_id: string; grants: Record<string, string[] | null> }; const auth = await requireCliAuth(req, res);
if (!auth) return;
let body: { grants: Record<string, string[] | null> };
try { try {
const chunks: Buffer[] = []; const chunks: Buffer[] = [];
for await (const chunk of req) chunks.push(chunk as Buffer); for await (const chunk of req) chunks.push(chunk as Buffer);
@@ -5177,8 +5189,8 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv
writeJson(res, 400, { error: "Invalid body" }); writeJson(res, 400, { error: "Invalid body" });
return; return;
} }
if (!body.user_id || !body.grants) { if (!body.grants) {
writeJson(res, 400, { error: "user_id and grants required" }); writeJson(res, 400, { error: "grants required" });
return; return;
} }
try { try {
@@ -5186,7 +5198,7 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv
if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; } if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; }
// Find the caller's member row. // Find the caller's member row.
const [member] = await db.select().from(meshMember) const [member] = await db.select().from(meshMember)
.where(and(eq(meshMember.meshId, m.id), eq(meshMember.userId, body.user_id), isNull(meshMember.revokedAt))) .where(and(eq(meshMember.meshId, m.id), eq(meshMember.userId, auth.userId), isNull(meshMember.revokedAt)))
.limit(1); .limit(1);
if (!member) { if (!member) {
writeJson(res, 403, { error: "Not a member of this mesh" }); writeJson(res, 403, { error: "Not a member of this mesh" });
@@ -5209,7 +5221,10 @@ async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: Serv
/** POST /cli/mesh/:slug/invite — generate an invite for a mesh. */ /** POST /cli/mesh/:slug/invite — generate an invite for a mesh. */
async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> { async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> {
let body: { user_id: string; email?: string; expires_in?: string; role?: string }; const auth = await requireCliAuth(req, res);
if (!auth) return;
let body: { email?: string; expires_in?: string; role?: string };
try { try {
const chunks: Buffer[] = []; const chunks: Buffer[] = [];
for await (const chunk of req) chunks.push(chunk as Buffer); for await (const chunk of req) chunks.push(chunk as Buffer);
@@ -5219,15 +5234,10 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv
return; return;
} }
if (!body.user_id) {
writeJson(res, 400, { error: "user_id required" });
return;
}
try { try {
const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1); const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1);
if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; } if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; }
if (m.ownerUserId !== body.user_id) { if (m.ownerUserId !== auth.userId) {
writeJson(res, 403, { error: "Only the owner can invite (for now)" }); writeJson(res, 403, { error: "Only the owner can invite (for now)" });
return; return;
} }
@@ -5280,7 +5290,7 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv
maxUses: 1, maxUses: 1,
role, role,
expiresAt, expiresAt,
createdBy: body.user_id, createdBy: auth.userId,
version: 2, version: 2,
}).returning({ id: inviteTable.id }); }).returning({ id: inviteTable.id });
inviteId = rows[0]!.id; inviteId = rows[0]!.id;
@@ -5349,7 +5359,10 @@ async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: Serv
} }
async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, started: number): Promise<void> { async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, started: number): Promise<void> {
let body: { user_id: string; name: string; pubkey?: string; slug?: string; template?: string; description?: string }; const auth = await requireCliAuth(req, res);
if (!auth) return;
let body: { name: string; pubkey?: string; slug?: string; template?: string; description?: string };
try { try {
const chunks: Buffer[] = []; const chunks: Buffer[] = [];
for await (const chunk of req) chunks.push(chunk as Buffer); for await (const chunk of req) chunks.push(chunk as Buffer);
@@ -5359,8 +5372,8 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st
return; return;
} }
if (!body.user_id || !body.name) { if (!body.name) {
writeJson(res, 400, { error: "user_id and name required" }); writeJson(res, 400, { error: "name required" });
return; return;
} }
@@ -5395,7 +5408,7 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st
// Create mesh — use raw SQL to avoid Drizzle default-column issues // Create mesh — use raw SQL to avoid Drizzle default-column issues
await db.execute(sql` await db.execute(sql`
INSERT INTO mesh.mesh (id, name, slug, owner_user_id, owner_pubkey, owner_secret_key, root_key) INSERT INTO mesh.mesh (id, name, slug, owner_user_id, owner_pubkey, owner_secret_key, root_key)
VALUES (${meshId}, ${body.name}, ${slug}, ${body.user_id}, ${ownerPubkey}, ${ownerSecretKey}, ${rootKey}) VALUES (${meshId}, ${body.name}, ${slug}, ${auth.userId}, ${ownerPubkey}, ${ownerSecretKey}, ${rootKey})
`); `);
// Create owner member. // Create owner member.
@@ -5410,11 +5423,11 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st
const memberId = generateId(); const memberId = generateId();
await db.execute(sql` await db.execute(sql`
INSERT INTO mesh.member (id, mesh_id, user_id, peer_pubkey, display_name, role) INSERT INTO mesh.member (id, mesh_id, user_id, peer_pubkey, display_name, role)
VALUES (${memberId}, ${meshId}, ${body.user_id}, ${body.pubkey}, ${body.name + "-owner"}, ${"admin"}) VALUES (${memberId}, ${meshId}, ${auth.userId}, ${body.pubkey}, ${body.name + "-owner"}, ${"admin"})
`); `);
writeJson(res, 200, { id: meshId, slug, name: body.name, member_id: memberId }); writeJson(res, 200, { id: meshId, slug, name: body.name, member_id: memberId });
log.info("mesh-create", { route: "POST /cli/mesh/create", slug, user_id: body.user_id, latency_ms: Date.now() - started }); log.info("mesh-create", { route: "POST /cli/mesh/create", slug, user_id: auth.userId, latency_ms: Date.now() - started });
} catch (e) { } catch (e) {
log.error("mesh-create", { error: e instanceof Error ? e.message : String(e) }); log.error("mesh-create", { error: e instanceof Error ? e.message : String(e) });
writeJson(res, 500, { error: "Failed to create mesh" }); writeJson(res, 500, { error: "Failed to create mesh" });
@@ -5423,37 +5436,22 @@ async function handleCliMeshCreate(req: IncomingMessage, res: ServerResponse, st
/** DELETE /cli/mesh/:slug — delete a mesh (owner only). */ /** DELETE /cli/mesh/:slug — delete a mesh (owner only). */
async function handleMeshDelete(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> { async function handleMeshDelete(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> {
let body: { user_id: string }; const auth = await requireCliAuth(req, res);
try { if (!auth) return;
const chunks: Buffer[] = [];
for await (const chunk of req) chunks.push(chunk as Buffer);
body = JSON.parse(Buffer.concat(chunks).toString()) as typeof body;
} catch {
writeJson(res, 400, { error: "Invalid body" });
return;
}
if (!body.user_id) {
writeJson(res, 400, { error: "user_id required" });
return;
}
try { try {
// Find mesh by slug
const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1); const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1);
if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; } if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; }
// Only owner can delete if (m.ownerUserId !== auth.userId) {
if (m.ownerUserId !== body.user_id) {
writeJson(res, 403, { error: "Only the mesh owner can delete it" }); writeJson(res, 403, { error: "Only the mesh owner can delete it" });
return; return;
} }
// Soft delete (archive)
await db.update(mesh).set({ archivedAt: new Date() }).where(eq(mesh.id, m.id)); await db.update(mesh).set({ archivedAt: new Date() }).where(eq(mesh.id, m.id));
writeJson(res, 200, { ok: true, deleted: slug }); writeJson(res, 200, { ok: true, deleted: slug });
log.info("mesh-delete", { route: "DELETE /cli/mesh/:slug", slug, user_id: body.user_id, latency_ms: Date.now() - started }); log.info("mesh-delete", { route: "DELETE /cli/mesh/:slug", slug, user_id: auth.userId, latency_ms: Date.now() - started });
} catch (e) { } catch (e) {
log.error("mesh-delete", { error: e instanceof Error ? e.message : String(e) }); log.error("mesh-delete", { error: e instanceof Error ? e.message : String(e) });
writeJson(res, 500, { error: "Failed to delete mesh" }); writeJson(res, 500, { error: "Failed to delete mesh" });

View File

@@ -2,19 +2,29 @@
* Runtime migrations on broker startup. * Runtime migrations on broker startup.
* *
* Runs pending drizzle migrations against DATABASE_URL before the broker * Runs pending drizzle migrations against DATABASE_URL before the broker
* listens. Uses pg_advisory_lock so a multi-instance deploy doesn't race. * listens. Uses pg_try_advisory_lock with retry+timeout so a stuck old
* If migrations fail, the process exits non-zero so the orchestrator (Coolify * instance can't block new deploys indefinitely (the original
* healthcheck) sees the container as broken and doesn't route traffic. * pg_advisory_lock version matched the "stuck 12h" symptom perfectly —
* an old container held the lock and the new deploy waited forever).
*
* If migrations fail OR the lock can't be acquired within the timeout,
* the process exits non-zero so the orchestrator (Coolify healthcheck)
* sees the container as broken and doesn't route traffic to it.
*/ */
import { drizzle } from "drizzle-orm/postgres-js"; import { drizzle } from "drizzle-orm/postgres-js";
import { migrate } from "drizzle-orm/postgres-js/migrator"; import { migrate } from "drizzle-orm/postgres-js/migrator";
import postgres from "postgres"; import postgres from "postgres";
import { dirname, join } from "node:path"; import { join } from "node:path";
import { existsSync, readdirSync } from "node:fs"; import { existsSync, readdirSync } from "node:fs";
const LOCK_ID = 74737_73831; // "cmsh" ascii — stable magic constant const LOCK_ID = 74737_73831; // "cmsh" ascii — stable magic constant
/** Max total time to wait for the advisory lock before giving up. */
const LOCK_ACQUIRE_TIMEOUT_MS = 60_000;
/** Poll interval when lock is held by another instance. */
const LOCK_RETRY_INTERVAL_MS = 2_000;
export async function runMigrationsOnStartup(): Promise<void> { export async function runMigrationsOnStartup(): Promise<void> {
const url = process.env.DATABASE_URL; const url = process.env.DATABASE_URL;
if (!url) { if (!url) {
@@ -22,8 +32,6 @@ export async function runMigrationsOnStartup(): Promise<void> {
return; return;
} }
// Resolve the migrations folder — it's shipped inside @turbostarter/db's
// deploy subset in the runtime image. Dev path also works.
const candidates = [ const candidates = [
"/app/migrations", "/app/migrations",
"/app/node_modules/@turbostarter/db/migrations", "/app/node_modules/@turbostarter/db/migrations",
@@ -38,10 +46,38 @@ export async function runMigrationsOnStartup(): Promise<void> {
const count = readdirSync(migrationsFolder).filter((f) => f.endsWith(".sql")).length; const count = readdirSync(migrationsFolder).filter((f) => f.endsWith(".sql")).length;
console.log(`[migrate] ${count} migration files at ${migrationsFolder}`); console.log(`[migrate] ${count} migration files at ${migrationsFolder}`);
const sql = postgres(url, { max: 1, onnotice: () => { /* quiet */ } }); const sql = postgres(url, {
max: 1,
onnotice: () => { /* quiet */ },
// Statement-level safety net in case a long ALTER holds row locks.
// 5 min per statement is plenty for schema DDL.
statement_timeout: 5 * 60 * 1000,
});
try { try {
// Advisory lock so parallel instances serialise. // Set a lock_timeout for this session — PG will refuse to block more
await sql`SELECT pg_advisory_lock(${LOCK_ID})`; // than N ms on any lock acquisition (we only hold one at a time).
await sql`SET lock_timeout = ${LOCK_ACQUIRE_TIMEOUT_MS}`;
// Try to grab the advisory lock; poll if someone else holds it.
const deadline = Date.now() + LOCK_ACQUIRE_TIMEOUT_MS;
let locked = false;
while (Date.now() < deadline) {
const [row] = await sql<{ locked: boolean }[]>`
SELECT pg_try_advisory_lock(${LOCK_ID}) AS locked
`;
if (row?.locked) {
locked = true;
break;
}
console.log("[migrate] advisory lock held — retrying in 2s");
await new Promise((r) => setTimeout(r, LOCK_RETRY_INTERVAL_MS));
}
if (!locked) {
console.error(`[migrate] could not acquire advisory lock within ${LOCK_ACQUIRE_TIMEOUT_MS}ms — aborting`);
process.exit(1);
}
try { try {
const db = drizzle(sql); const db = drizzle(sql);
const start = Date.now(); const start = Date.now();

View File

@@ -46,9 +46,25 @@ export interface HookSetStatusResponse {
// --- WebSocket protocol envelopes --- // --- WebSocket protocol envelopes ---
/**
* Wire protocol version. Bump ONLY on breaking changes to the hello or
* push envelope shape. Clients send their highest supported version;
* broker picks the minimum of its own and the client's and echoes it
* on hello_ack. Backward-compat fields can be gated on this.
* 1 = initial release
*/
export const WS_PROTOCOL_VERSION = 1 as const;
/** Sent by client on connect to authenticate. */ /** Sent by client on connect to authenticate. */
export interface WSHelloMessage { export interface WSHelloMessage {
type: "hello"; type: "hello";
/** Highest WS protocol version the client understands. Optional —
* pre-alpha.36 clients omit it and the broker treats missing as 1. */
protocolVersion?: number;
/** Optional feature strings the client supports. Broker uses this to
* avoid emitting envelopes the client can't parse. Examples: "grants",
* "channels", "streams". Unknown capabilities ignored. */
capabilities?: string[];
meshId: string; meshId: string;
memberId: string; memberId: string;
pubkey: string; // must match mesh.member.peerPubkey pubkey: string; // must match mesh.member.peerPubkey

View File

@@ -35,18 +35,13 @@ const BROKER_HTTP = URLS.BROKER.replace("wss://", "https://").replace("ws://", "
async function syncToBroker(meshSlug: string, grants: Record<string, string[] | null>): Promise<void> { async function syncToBroker(meshSlug: string, grants: Record<string, string[] | null>): Promise<void> {
const auth = getStoredToken(); const auth = getStoredToken();
if (!auth) return; if (!auth) return;
let userId = "";
try {
const payload = JSON.parse(Buffer.from(auth.session_token.split(".")[1]!, "base64url").toString()) as { sub?: string };
userId = payload.sub ?? "";
} catch { return; }
if (!userId) return;
try { try {
await request<{ ok: true }>({ await request<{ ok: true }>({
path: `/cli/mesh/${meshSlug}/grants`, path: `/cli/mesh/${meshSlug}/grants`,
method: "POST", method: "POST",
body: { user_id: userId, grants }, body: { grants },
baseUrl: BROKER_HTTP, baseUrl: BROKER_HTTP,
token: auth.session_token,
}); });
} catch (e) { } catch (e) {
render.warn(`broker grant sync failed — client filter still active: ${e instanceof Error ? e.message : e}`); render.warn(`broker grant sync failed — client filter still active: ${e instanceof Error ? e.message : e}`);
@@ -91,8 +86,20 @@ function resolveCaps(input: string[]): Capability[] {
async function resolvePeer(meshSlug: string, name: string): Promise<{ displayName: string; pubkey: string } | null> { async function resolvePeer(meshSlug: string, name: string): Promise<{ displayName: string; pubkey: string } | null> {
return await withMesh({ meshSlug }, async (client) => { return await withMesh({ meshSlug }, async (client) => {
const peers = await client.listPeers(); const peers = await client.listPeers();
const match = peers.find((p) => p.displayName === name || p.pubkey === name || p.pubkey.startsWith(name)); const match = peers.find(
return match ? { displayName: match.displayName, pubkey: match.pubkey } : null; (p) =>
p.displayName === name ||
p.pubkey === name ||
p.pubkey.startsWith(name) ||
p.memberPubkey === name ||
(p.memberPubkey && p.memberPubkey.startsWith(name)),
);
if (!match) return null;
// Prefer the stable member pubkey for grant keys — session pubkey
// rotates on every reconnect and would invalidate the grant entry.
// Broker falls back to session-key lookup for pre-alpha.36 clients.
const key = match.memberPubkey ?? match.pubkey;
return { displayName: match.displayName, pubkey: key };
}); });
} }

View File

@@ -25,23 +25,16 @@ export async function runList(): Promise<void> {
const config = readConfig(); const config = readConfig();
const auth = getStoredToken(); const auth = getStoredToken();
// Try to fetch from server // Try to fetch from server. Broker authenticates via Bearer token.
let serverMeshes: ServerMesh[] = []; let serverMeshes: ServerMesh[] = [];
if (auth) { if (auth) {
try { try {
let userId = ""; const res = await request<{ meshes: ServerMesh[] }>({
try { path: `/cli/meshes`,
const payload = JSON.parse(Buffer.from(auth.session_token.split(".")[1]!, "base64url").toString()) as { sub?: string }; baseUrl: BROKER_HTTP,
userId = payload.sub ?? ""; token: auth.session_token,
} catch {} });
serverMeshes = res.meshes ?? [];
if (userId) {
const res = await request<{ meshes: ServerMesh[] }>({
path: `/cli/meshes?user_id=${userId}`,
baseUrl: BROKER_HTTP,
});
serverMeshes = res.meshes ?? [];
}
} catch {} } catch {}
} }

View File

@@ -64,7 +64,12 @@ export type Priority = "now" | "next" | "low";
export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting"; export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting";
export interface PeerInfo { export interface PeerInfo {
/** Routing key — session pubkey if present, otherwise member pubkey. */
pubkey: string; pubkey: string;
/** Stable member pubkey (mesh.member.peer_pubkey). Preferred for grants,
* safety numbers, audit, and anything that needs identity stability
* across reconnects. */
memberPubkey?: string;
displayName: string; displayName: string;
status: string; status: string;
summary: string | null; summary: string | null;
@@ -138,6 +143,13 @@ export class BrokerClient {
private outbound: Array<() => void> = []; // closures that send once ws is open private outbound: Array<() => void> = []; // closures that send once ws is open
private pushHandlers = new Set<PushHandler>(); private pushHandlers = new Set<PushHandler>();
private pushBuffer: InboundPush[] = []; private pushBuffer: InboundPush[] = [];
/**
* Serialization chain for inbound push handling. Each incoming push
* appends to this promise so decrypt+enqueue happens in arrival order.
* Without it, a fast decrypt could land in pushBuffer before a slow
* earlier one — observable as reordered messages to consumers.
*/
private pushChain: Promise<void> = Promise.resolve();
private listPeersResolvers = new Map<string, { resolve: (peers: PeerInfo[]) => void; timer: NodeJS.Timeout }>(); private listPeersResolvers = new Map<string, { resolve: (peers: PeerInfo[]) => void; timer: NodeJS.Timeout }>();
private stateResolvers = new Map<string, { resolve: (result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void; timer: NodeJS.Timeout }>(); private stateResolvers = new Map<string, { resolve: (result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void; timer: NodeJS.Timeout }>();
private stateListResolvers = new Map<string, { resolve: (entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void; timer: NodeJS.Timeout }>(); private stateListResolvers = new Map<string, { resolve: (entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
@@ -1639,9 +1651,10 @@ export class BrokerClient {
const nonce = String(msg.nonce ?? ""); const nonce = String(msg.nonce ?? "");
const ciphertext = String(msg.ciphertext ?? ""); const ciphertext = String(msg.ciphertext ?? "");
const senderPubkey = String(msg.senderPubkey ?? ""); const senderPubkey = String(msg.senderPubkey ?? "");
// Decrypt asynchronously, then enqueue. Ordering within the // Serialize through pushChain so decrypt+enqueue preserves arrival
// buffer is preserved by awaiting before push. // order. Previously each inbound push ran in an independent async
void (async (): Promise<void> => { // task and fast decrypts could overtake slow ones.
this.pushChain = this.pushChain.then(async (): Promise<void> => {
// System messages (peer_joined, watch_triggered, mcp_deployed, etc.) // System messages (peer_joined, watch_triggered, mcp_deployed, etc.)
// have senderPubkey="system" with empty nonce/ciphertext — skip decryption. // have senderPubkey="system" with empty nonce/ciphertext — skip decryption.
const isSystem = msg.subtype === "system" || senderPubkey === "system"; const isSystem = msg.subtype === "system" || senderPubkey === "system";
@@ -1739,7 +1752,9 @@ export class BrokerClient {
/* handler errors are not the transport's problem */ /* handler errors are not the transport's problem */
} }
} }
})(); }).catch((e) => {
this.debug(`push handler chain error: ${e instanceof Error ? e.message : e}`);
});
return; return;
} }
if (msg.type === "state_result") { if (msg.type === "state_result") {
@@ -2194,14 +2209,25 @@ export class BrokerClient {
} }
private scheduleReconnect(): void { private scheduleReconnect(): void {
// Guard: if a reconnect is already scheduled don't pile on another
// timer — stacked close events used to schedule multiple concurrent
// reconnects which caused reconnect storms against the broker.
if (this.reconnectTimer) {
this.debug("reconnect already scheduled — skipping");
return;
}
this.setConnStatus("reconnecting"); this.setConnStatus("reconnecting");
const delay = const base =
BACKOFF_CAPS[Math.min(this.reconnectAttempt, BACKOFF_CAPS.length - 1)]!; BACKOFF_CAPS[Math.min(this.reconnectAttempt, BACKOFF_CAPS.length - 1)]!;
// Full jitter: uniform [0, base]. Prevents thundering herd when the
// broker restarts and every client reconnects on the same tick.
const delay = Math.floor(Math.random() * base);
this.reconnectAttempt += 1; this.reconnectAttempt += 1;
this.debug( this.debug(
`reconnect in ${delay}ms (attempt ${this.reconnectAttempt})`, `reconnect in ${delay}ms (attempt ${this.reconnectAttempt}, base ${base}ms)`,
); );
this.reconnectTimer = setTimeout(() => { this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
if (this.closed) return; if (this.closed) return;
this.connect().catch((e) => { this.connect().catch((e) => {
this.debug(`reconnect failed: ${e instanceof Error ? e.message : e}`); this.debug(`reconnect failed: ${e instanceof Error ? e.message : e}`);

View File

@@ -11,17 +11,11 @@ export async function generateInvite(
const auth = getStoredToken(); const auth = getStoredToken();
if (!auth) throw new Error("Not signed in"); if (!auth) throw new Error("Not signed in");
let userId = "";
try {
const payload = JSON.parse(Buffer.from(auth.session_token.split(".")[1]!, "base64url").toString()) as { sub?: string };
userId = payload.sub ?? "";
} catch {}
if (!userId) throw new Error("Invalid token");
return request<{ url: string; code: string; expires_at: string; emailed?: boolean }>({ return request<{ url: string; code: string; expires_at: string; emailed?: boolean }>({
path: `/cli/mesh/${meshSlug}/invite`, path: `/cli/mesh/${meshSlug}/invite`,
method: "POST", method: "POST",
body: { user_id: userId, email: opts?.email, role: opts?.role }, body: { email: opts?.email, role: opts?.role },
baseUrl: BROKER_HTTP, baseUrl: BROKER_HTTP,
token: auth.session_token,
}); });
} }

View File

@@ -86,9 +86,23 @@ export async function claimInviteV2(opts: {
s.base64_variants.URLSAFE_NO_PADDING, s.base64_variants.URLSAFE_NO_PADDING,
); );
const base = opts.appBaseUrl.replace(/\/$/, ""); // Claim can be routed either through the web app's `/api/public/invites/:code/claim`
// (which proxies to the broker) or directly against the broker's
// `/invites/:code/claim`. Default to the broker direct path because it
// removes one hop and avoids depending on the web app being healthy.
// Override with CLAUDEMESH_CLAIM_URL for self-hosters / tests.
const code = encodeURIComponent(opts.code); const code = encodeURIComponent(opts.code);
const url = `${base}/api/public/invites/${code}/claim`; const override = process.env.CLAUDEMESH_CLAIM_URL;
let url: string;
if (override) {
url = override.replace(/\{code\}/g, code);
} else {
// Derive broker HTTP base from opts.appBaseUrl or a standard guess.
const brokerBase =
process.env.CLAUDEMESH_BROKER_HTTP ??
"https://ic.claudemesh.com";
url = `${brokerBase.replace(/\/$/, "")}/invites/${code}/claim`;
}
let res: Response; let res: Response;
try { try {

View File

@@ -11,21 +11,18 @@ export async function createMesh(name: string, opts?: { template?: string; descr
const auth = getStoredToken(); const auth = getStoredToken();
if (!auth) throw new Error("Not signed in"); if (!auth) throw new Error("Not signed in");
let userId = "";
try {
const payload = JSON.parse(Buffer.from(auth.session_token.split(".")[1]!, "base64url").toString()) as { sub?: string };
userId = payload.sub ?? "";
} catch {}
if (!userId) throw new Error("Invalid token — run `claudemesh login` again");
// Generate keypair first so we can send the pubkey to the broker // Generate keypair first so we can send the pubkey to the broker
const kp = await generateKeypair(); const kp = await generateKeypair();
// Broker authenticates via Authorization: Bearer <session_token>.
// user_id used to be in the body but is now derived from the verified
// session on the server. Older broker deploys still accept both.
const result = await request<{ id: string; slug: string; name: string; member_id: string }>({ const result = await request<{ id: string; slug: string; name: string; member_id: string }>({
path: "/cli/mesh/create", path: "/cli/mesh/create",
method: "POST", method: "POST",
body: { user_id: userId, name, pubkey: kp.publicKey, ...opts }, body: { name, pubkey: kp.publicKey, ...opts },
baseUrl: BROKER_HTTP, baseUrl: BROKER_HTTP,
token: auth.session_token,
}); });
const mesh: JoinedMesh = { const mesh: JoinedMesh = {