refactor: rename cli-v2 → cli, archive legacy cli, plus broker-side grants + auto-migrate
- apps/cli/ is now the canonical CLI (was apps/cli-v2/). - apps/cli/ legacy v0 archived as branch 'legacy-cli-archive' and tag 'cli-v0-legacy-final' before deletion; git history preserves it too. - .github/workflows/release-cli.yml paths updated. - pnpm-lock.yaml regenerated. Broker-side peer-grant enforcement (spec: 2026-04-15-per-peer-capabilities): - 0020_peer-grants.sql adds peer_grants jsonb + GIN index on mesh.member. - handleSend in broker fetches recipient grant maps once per send, drops messages silently when sender lacks the required capability. - POST /cli/mesh/:slug/grants to update from CLI; broker_messages_dropped_by_grant_total metric. - CLI grant/revoke/block now mirror to broker via syncToBroker. Auto-migrate on broker startup: - apps/broker/src/migrate.ts runs drizzle migrate with pg_advisory_lock before the HTTP server binds. Exits non-zero on failure so Coolify healthcheck fails closed. - Dockerfile copies packages/db/migrations into /app/migrations. - postgres 3.4.5 added as direct broker dep. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -696,6 +696,12 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "POST" && req.url?.startsWith("/cli/mesh/") && req.url?.endsWith("/grants")) {
|
||||
const slug = req.url.slice("/cli/mesh/".length).replace("/grants", "");
|
||||
handleCliMeshGrants(req, slug, res, started);
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.method === "DELETE" && req.url?.startsWith("/cli/mesh/")) {
|
||||
const slug = req.url.slice("/cli/mesh/".length);
|
||||
handleMeshDelete(req, slug, res, started);
|
||||
@@ -1836,6 +1842,28 @@ async function handleSend(
|
||||
...(subtype ? { subtype } : {}),
|
||||
};
|
||||
|
||||
// Per-peer grant enforcement — load recipient grant maps once per send.
|
||||
// See .artifacts/specs/2026-04-15-per-peer-capabilities.md.
|
||||
const DEFAULT_CAPS = ["read", "dm", "broadcast", "state-read"] as const;
|
||||
const capNeeded: "dm" | "broadcast" = isMulticast ? "broadcast" : "dm";
|
||||
const senderPubkey = conn.memberPubkey; // stable member key (survives session rotation)
|
||||
// Fetch grant maps for all connected peers in this mesh in one query.
|
||||
// Small (bounded by concurrent connections per mesh); acceptable per send.
|
||||
const grantRows = await db
|
||||
.select({ id: meshMember.id, peerGrants: meshMember.peerGrants })
|
||||
.from(meshMember)
|
||||
.where(eq(meshMember.meshId, conn.meshId));
|
||||
const grantsByMemberId = new Map<string, Record<string, string[]>>(
|
||||
grantRows.map((r) => [r.id, (r.peerGrants as Record<string, string[]>) ?? {}]),
|
||||
);
|
||||
function allowed(recipientMemberId: string): boolean {
|
||||
const grants = grantsByMemberId.get(recipientMemberId);
|
||||
if (!grants) return DEFAULT_CAPS.includes(capNeeded);
|
||||
const entry = grants[senderPubkey];
|
||||
if (entry === undefined) return DEFAULT_CAPS.includes(capNeeded);
|
||||
return entry.includes(capNeeded);
|
||||
}
|
||||
|
||||
for (const [pid, peer] of connections) {
|
||||
if (pid === senderPresenceId) continue;
|
||||
if (peer.meshId !== conn.meshId) continue;
|
||||
@@ -1854,6 +1882,14 @@ async function handleSend(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Per-peer capability check — silent drop if recipient hasn't granted
|
||||
// `capNeeded` to this sender (Signal block semantics: sender sees
|
||||
// delivered, recipient sees nothing).
|
||||
if (!allowed(peer.memberId)) {
|
||||
metrics.messagesDroppedByGrantTotal?.inc?.({ cap: capNeeded });
|
||||
continue;
|
||||
}
|
||||
|
||||
if (isMulticast) {
|
||||
// Multicast: push directly to each connected peer. The queue
|
||||
// row has one delivered_at — can only be claimed once. Direct
|
||||
@@ -4319,7 +4355,12 @@ async function recoverScheduledMessages(): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
function main(): void {
|
||||
async function main(): Promise<void> {
|
||||
// Run pending migrations before the first connection is accepted.
|
||||
// Exits non-zero on failure so Coolify sees a broken container.
|
||||
const { runMigrationsOnStartup } = await import("./migrate");
|
||||
await runMigrationsOnStartup();
|
||||
|
||||
const wss = new WebSocketServer({
|
||||
noServer: true,
|
||||
maxPayload: env.MAX_MESSAGE_BYTES,
|
||||
@@ -5036,6 +5077,52 @@ import { checkPermission, getPermissions, setPermissions } from "./permissions";
|
||||
import { meshPermission } from "@turbostarter/db/schema/mesh";
|
||||
|
||||
/** POST /cli/mesh/create — create a new mesh via CLI. */
|
||||
/** POST /cli/mesh/:slug/grants — set per-peer grants for the caller's membership.
|
||||
*
|
||||
* Body: { user_id: string, grants: Record<peer_pubkey_hex, string[]> }
|
||||
* Merges the map into the caller's mesh_member.peer_grants. Empty array
|
||||
* for a specific peer = blocked. Explicit null = reset to defaults.
|
||||
*/
|
||||
async function handleCliMeshGrants(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> {
|
||||
let body: { user_id: string; grants: Record<string, string[] | null> };
|
||||
try {
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of req) chunks.push(chunk as Buffer);
|
||||
body = JSON.parse(Buffer.concat(chunks).toString()) as typeof body;
|
||||
} catch {
|
||||
writeJson(res, 400, { error: "Invalid body" });
|
||||
return;
|
||||
}
|
||||
if (!body.user_id || !body.grants) {
|
||||
writeJson(res, 400, { error: "user_id and grants required" });
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const [m] = await db.select().from(mesh).where(eq(mesh.slug, slug)).limit(1);
|
||||
if (!m) { writeJson(res, 404, { error: "Mesh not found" }); return; }
|
||||
// Find the caller's member row.
|
||||
const [member] = await db.select().from(meshMember)
|
||||
.where(and(eq(meshMember.meshId, m.id), eq(meshMember.userId, body.user_id), isNull(meshMember.revokedAt)))
|
||||
.limit(1);
|
||||
if (!member) {
|
||||
writeJson(res, 403, { error: "Not a member of this mesh" });
|
||||
return;
|
||||
}
|
||||
const current = (member.peerGrants as Record<string, string[]>) ?? {};
|
||||
const merged = { ...current };
|
||||
for (const [pk, caps] of Object.entries(body.grants)) {
|
||||
if (caps === null) delete merged[pk];
|
||||
else merged[pk] = caps;
|
||||
}
|
||||
await db.update(meshMember).set({ peerGrants: merged }).where(eq(meshMember.id, member.id));
|
||||
writeJson(res, 200, { ok: true, grants: merged });
|
||||
log.info("mesh-grants", { route: "POST /cli/mesh/:slug/grants", slug, member_id: member.id, latency_ms: Date.now() - started });
|
||||
} catch (e) {
|
||||
log.error("mesh-grants", { error: e instanceof Error ? e.message : String(e) });
|
||||
writeJson(res, 500, { error: "Failed to update grants" });
|
||||
}
|
||||
}
|
||||
|
||||
/** POST /cli/mesh/:slug/invite — generate an invite for a mesh. */
|
||||
async function handleCliMeshInvite(req: IncomingMessage, slug: string, res: ServerResponse, started: number): Promise<void> {
|
||||
let body: { user_id: string; email?: string; expires_in?: string; role?: string };
|
||||
@@ -5363,5 +5450,8 @@ async function handlePermissionsSet(req: IncomingMessage, slug: string, res: Ser
|
||||
// Skip starting the HTTP/WS server when running under vitest — tests import
|
||||
// claimInviteV2Core() directly and must not bind ports on module load.
|
||||
if (!process.env.VITEST) {
|
||||
main();
|
||||
main().catch((e) => {
|
||||
console.error("fatal:", e instanceof Error ? e.stack : e);
|
||||
process.exit(1);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -90,6 +90,10 @@ export const metrics = {
|
||||
"broker_messages_rejected_total",
|
||||
"Messages rejected (size, auth, malformed)",
|
||||
),
|
||||
messagesDroppedByGrantTotal: new Counter(
|
||||
"broker_messages_dropped_by_grant_total",
|
||||
"Messages silently dropped because recipient didn't grant sender the required capability",
|
||||
),
|
||||
queueDepth: new Gauge(
|
||||
"broker_queue_depth",
|
||||
"Undelivered messages currently in the queue",
|
||||
|
||||
59
apps/broker/src/migrate.ts
Normal file
59
apps/broker/src/migrate.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
/**
|
||||
* Runtime migrations on broker startup.
|
||||
*
|
||||
* Runs pending drizzle migrations against DATABASE_URL before the broker
|
||||
* listens. Uses pg_advisory_lock so a multi-instance deploy doesn't race.
|
||||
* If migrations fail, the process exits non-zero so the orchestrator (Coolify
|
||||
* healthcheck) sees the container as broken and doesn't route traffic.
|
||||
*/
|
||||
|
||||
import { drizzle } from "drizzle-orm/postgres-js";
|
||||
import { migrate } from "drizzle-orm/postgres-js/migrator";
|
||||
import postgres from "postgres";
|
||||
import { dirname, join } from "node:path";
|
||||
import { existsSync, readdirSync } from "node:fs";
|
||||
|
||||
const LOCK_ID = 74737_73831; // "cmsh" ascii — stable magic constant
|
||||
|
||||
export async function runMigrationsOnStartup(): Promise<void> {
|
||||
const url = process.env.DATABASE_URL;
|
||||
if (!url) {
|
||||
console.error("[migrate] DATABASE_URL not set — skipping auto-migrate");
|
||||
return;
|
||||
}
|
||||
|
||||
// Resolve the migrations folder — it's shipped inside @turbostarter/db's
|
||||
// deploy subset in the runtime image. Dev path also works.
|
||||
const candidates = [
|
||||
"/app/migrations",
|
||||
"/app/node_modules/@turbostarter/db/migrations",
|
||||
join(process.cwd(), "..", "..", "packages", "db", "migrations"),
|
||||
join(process.cwd(), "packages", "db", "migrations"),
|
||||
];
|
||||
const migrationsFolder = candidates.find((p) => existsSync(p));
|
||||
if (!migrationsFolder) {
|
||||
console.error("[migrate] migrations folder not found — skipping. Searched:", candidates);
|
||||
return;
|
||||
}
|
||||
const count = readdirSync(migrationsFolder).filter((f) => f.endsWith(".sql")).length;
|
||||
console.log(`[migrate] ${count} migration files at ${migrationsFolder}`);
|
||||
|
||||
const sql = postgres(url, { max: 1, onnotice: () => { /* quiet */ } });
|
||||
try {
|
||||
// Advisory lock so parallel instances serialise.
|
||||
await sql`SELECT pg_advisory_lock(${LOCK_ID})`;
|
||||
try {
|
||||
const db = drizzle(sql);
|
||||
const start = Date.now();
|
||||
await migrate(db, { migrationsFolder });
|
||||
console.log(`[migrate] ok (${Date.now() - start}ms)`);
|
||||
} finally {
|
||||
await sql`SELECT pg_advisory_unlock(${LOCK_ID})`;
|
||||
}
|
||||
} catch (e) {
|
||||
console.error("[migrate] FAILED:", e instanceof Error ? e.message : e);
|
||||
process.exit(1);
|
||||
} finally {
|
||||
await sql.end({ timeout: 5 });
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user