diff --git a/apps/broker/src/cli-sync.ts b/apps/broker/src/cli-sync.ts new file mode 100644 index 0000000..498e552 --- /dev/null +++ b/apps/broker/src/cli-sync.ts @@ -0,0 +1,133 @@ +/** + * POST /cli-sync handler. + * + * Accepts a sync JWT from the dashboard, creates or finds member rows + * for each mesh in the token, and returns mesh details + member IDs. + */ + +import { and, eq, isNull } from "drizzle-orm"; +import { db } from "./db"; +import { verifySyncToken, type SyncTokenPayload } from "./jwt"; + +// Import schema tables +import { + mesh as meshTable, + meshMember as memberTable, +} from "@turbostarter/db/schema/mesh"; +import { generateId } from "@turbostarter/shared/utils"; + +export interface CliSyncRequest { + sync_token: string; + peer_pubkey: string; // ed25519 hex (64 chars) + display_name: string; +} + +export interface CliSyncResponse { + ok: true; + account_id: string; + meshes: Array<{ + mesh_id: string; + slug: string; + broker_url: string; + member_id: string; + role: "admin" | "member"; + }>; +} + +export interface CliSyncError { + ok: false; + error: string; +} + +export async function handleCliSync( + body: CliSyncRequest, +): Promise { + // 1. Validate inputs + if (!body.sync_token || !body.peer_pubkey || !body.display_name) { + return { ok: false, error: "sync_token, peer_pubkey, display_name required" }; + } + if (!/^[0-9a-f]{64}$/i.test(body.peer_pubkey)) { + return { ok: false, error: "peer_pubkey must be 64 hex chars (32 bytes)" }; + } + + // 2. Verify JWT + const tokenResult = await verifySyncToken(body.sync_token); + if (!tokenResult.ok) { + return { ok: false, error: `sync token invalid: ${tokenResult.error}` }; + } + const payload = tokenResult.payload; + + // 3. For each mesh in the token, create or find a member row + const resultMeshes: CliSyncResponse["meshes"] = []; + + for (const tokenMesh of payload.meshes) { + // Verify mesh exists and is not archived + const [m] = await db + .select({ id: meshTable.id, slug: meshTable.slug }) + .from(meshTable) + .where(and(eq(meshTable.id, tokenMesh.id), isNull(meshTable.archivedAt))); + + if (!m) { + // Skip meshes that don't exist (could have been deleted) + continue; + } + + // Check if this pubkey is already a member of this mesh + const [existing] = await db + .select({ id: memberTable.id, role: memberTable.role }) + .from(memberTable) + .where( + and( + eq(memberTable.meshId, tokenMesh.id), + eq(memberTable.peerPubkey, body.peer_pubkey), + isNull(memberTable.revokedAt), + ), + ); + + let memberId: string; + let role: "admin" | "member"; + + if (existing) { + // Already a member — update dashboard link + display name + memberId = existing.id; + role = existing.role; + await db + .update(memberTable) + .set({ + dashboardUserId: payload.sub, + displayName: body.display_name, + }) + .where(eq(memberTable.id, existing.id)); + } else { + // Create new member row + memberId = generateId(); + role = tokenMesh.role; + await db.insert(memberTable).values({ + id: memberId, + meshId: tokenMesh.id, + peerPubkey: body.peer_pubkey, + displayName: body.display_name, + role: tokenMesh.role, + dashboardUserId: payload.sub, + }); + } + + resultMeshes.push({ + mesh_id: tokenMesh.id, + slug: m.slug, + broker_url: process.env.BROKER_PUBLIC_URL ?? "wss://ic.claudemesh.com/ws", + member_id: memberId, + role, + }); + } + + if (resultMeshes.length === 0) { + return { ok: false, error: "no valid meshes found in sync token" }; + } + + return { + ok: true, + account_id: payload.sub, + meshes: resultMeshes, + }; +} diff --git a/apps/broker/src/env.ts b/apps/broker/src/env.ts index ed8666e..bbc6895 100644 --- a/apps/broker/src/env.ts +++ b/apps/broker/src/env.ts @@ -31,6 +31,7 @@ const envSchema = z.object({ RUNNER_URL: z.string().default("http://runner:7901"), CLAUDEMESH_SERVICES_DIR: z.string().default("/var/claudemesh/services"), BROKER_ENCRYPTION_KEY: z.string().default(""), // 64 hex chars (32 bytes). Auto-generated if empty. + CLI_SYNC_SECRET: z.string().default(""), // HS256 shared secret for dashboard→broker sync JWTs. Required for /cli-sync. MAX_SERVICES_PER_MESH: z.coerce.number().int().positive().default(20), MAX_SERVICE_ZIP_BYTES: z.coerce.number().int().positive().default(50 * 1024 * 1024), NODE_ENV: z diff --git a/apps/broker/src/jwt.ts b/apps/broker/src/jwt.ts new file mode 100644 index 0000000..07610f5 --- /dev/null +++ b/apps/broker/src/jwt.ts @@ -0,0 +1,146 @@ +/** + * JWT verification for CLI sync tokens. + * + * Sync tokens are HS256 JWTs issued by the dashboard after OAuth, + * shared secret between dashboard and broker via env var. + * + * JTI dedup: tracks used token IDs in a TTL-evicted Set to prevent replay. + */ + +import { env } from "./env"; + +// --- Types --- + +export interface SyncTokenPayload { + sub: string; // dashboard user ID + email: string; + meshes: Array<{ + id: string; + slug: string; + role: "admin" | "member"; + }>; + action: "sync" | "create"; + newMesh?: { + name: string; + slug: string; + }; + jti: string; // unique token ID for replay prevention + iat: number; + exp: number; +} + +// --- JTI dedup --- + +const usedJtis = new Map(); // jti → expiry timestamp (ms) + +// Sweep expired JTIs every 5 minutes +setInterval(() => { + const now = Date.now(); + for (const [jti, exp] of usedJtis) { + if (exp < now) usedJtis.delete(jti); + } +}, 5 * 60_000); + +// --- Verification --- + +/** + * Verify and decode a sync token JWT. + * Returns the decoded payload on success, or an error string on failure. + */ +export async function verifySyncToken( + token: string, +): Promise<{ ok: true; payload: SyncTokenPayload } | { ok: false; error: string }> { + // Get shared secret from env + const secret = env.CLI_SYNC_SECRET; + if (!secret) { + return { ok: false, error: "CLI_SYNC_SECRET not configured on broker" }; + } + + try { + // Decode JWT manually (HS256) + const parts = token.split("."); + if (parts.length !== 3) { + return { ok: false, error: "malformed JWT" }; + } + + const headerB64 = parts[0]!; + const payloadB64 = parts[1]!; + const signatureB64 = parts[2]!; + + // Verify signature (HS256) + const encoder = new TextEncoder(); + const key = await crypto.subtle.importKey( + "raw", + encoder.encode(secret), + { name: "HMAC", hash: "SHA-256" }, + false, + ["sign", "verify"], + ); + + const signatureInput = encoder.encode(`${headerB64}.${payloadB64}`); + const signature = base64UrlDecode(signatureB64); + + const valid = await crypto.subtle.verify("HMAC", key, signature, signatureInput); + if (!valid) { + return { ok: false, error: "invalid signature" }; + } + + // Decode header — must be HS256 + const header = JSON.parse(new TextDecoder().decode(base64UrlDecode(headerB64))); + if (header.alg !== "HS256") { + return { ok: false, error: `unsupported algorithm: ${header.alg}` }; + } + + // Decode payload + const payload = JSON.parse( + new TextDecoder().decode(base64UrlDecode(payloadB64)), + ) as SyncTokenPayload; + + // Check expiry + const now = Math.floor(Date.now() / 1000); + if (payload.exp && payload.exp < now) { + return { ok: false, error: "token expired" }; + } + + // Check iat not in the future (30s tolerance) + if (payload.iat && payload.iat > now + 30) { + return { ok: false, error: "token issued in the future" }; + } + + // JTI dedup + if (!payload.jti) { + return { ok: false, error: "missing jti" }; + } + if (usedJtis.has(payload.jti)) { + return { ok: false, error: "token already used" }; + } + // Mark as used with expiry time + usedJtis.set(payload.jti, (payload.exp ?? now + 900) * 1000); + + // Basic validation + if (!payload.sub || !payload.email) { + return { ok: false, error: "missing sub or email" }; + } + if (!Array.isArray(payload.meshes)) { + return { ok: false, error: "missing meshes array" }; + } + + return { ok: true, payload }; + } catch (e) { + return { ok: false, error: e instanceof Error ? e.message : String(e) }; + } +} + +// --- Helpers --- + +function base64UrlDecode(input: string): Uint8Array { + // Add padding + let base64 = input.replace(/-/g, "+").replace(/_/g, "/"); + while (base64.length % 4) base64 += "="; + const binary = atob(base64); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < binary.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + return bytes; +} diff --git a/apps/broker/src/member-api.ts b/apps/broker/src/member-api.ts new file mode 100644 index 0000000..e6d5004 --- /dev/null +++ b/apps/broker/src/member-api.ts @@ -0,0 +1,284 @@ +/** + * Member profile REST API handlers. + * + * PATCH /mesh/:meshId/member/:memberId — update member profile + * GET /mesh/:meshId/members — list all members with online status + * PATCH /mesh/:meshId/settings — update mesh settings (selfEditable) + * + * These are standalone handler functions. Route wiring happens in index.ts. + */ + +import { and, eq, isNull, sql } from "drizzle-orm"; +import { db } from "./db"; +import { + mesh as meshTable, + meshMember as memberTable, + presence as presenceTable, +} from "@turbostarter/db/schema/mesh"; + +// --- Types --- + +export interface MemberProfileUpdate { + displayName?: string; + roleTag?: string; + groups?: Array<{ name: string; role?: string }>; + messageMode?: "push" | "inbox" | "off"; +} + +export interface MemberPermissionUpdate { + permission?: "admin" | "member"; // only admins can change this +} + +export type MemberUpdateRequest = MemberProfileUpdate & MemberPermissionUpdate; + +interface SelfEditablePolicy { + displayName: boolean; + roleTag: boolean; + groups: boolean; + messageMode: boolean; +} + +// --- Handlers --- + +/** + * Update a member's profile fields. + * + * Authorization: + * - If caller is the target member: check mesh.selfEditable for each field + * - If caller is a mesh admin: allow all fields + * - permission field: admin-only always + * + * Returns: { ok: true, member: {...} } or { ok: false, error: string } + */ +export async function updateMemberProfile( + meshId: string, + memberId: string, + callerMemberId: string, // from auth header or WS connection + updates: MemberUpdateRequest, +): Promise< + | { ok: true; member: Record; changes: MemberProfileUpdate } + | { ok: false; error: string } +> { + // 1. Load mesh for selfEditable policy + const [m] = await db + .select({ id: meshTable.id, selfEditable: meshTable.selfEditable }) + .from(meshTable) + .where(and(eq(meshTable.id, meshId), isNull(meshTable.archivedAt))); + + if (!m) return { ok: false, error: "mesh not found" }; + + // 2. Load caller's member row to check permission + const [caller] = await db + .select({ id: memberTable.id, role: memberTable.role }) + .from(memberTable) + .where( + and( + eq(memberTable.id, callerMemberId), + eq(memberTable.meshId, meshId), + isNull(memberTable.revokedAt), + ), + ); + + if (!caller) return { ok: false, error: "caller not a member of this mesh" }; + + const isAdmin = caller.role === "admin"; + const isSelf = callerMemberId === memberId; + + if (!isAdmin && !isSelf) { + return { + ok: false, + error: "not authorized — only admins or self can edit", + }; + } + + // 3. Check self-edit permissions for non-admin self-edits + const policy: SelfEditablePolicy = + (m.selfEditable as SelfEditablePolicy) ?? { + displayName: true, + roleTag: true, + groups: true, + messageMode: true, + }; + + const rejected: string[] = []; + if (!isAdmin && isSelf) { + if (updates.displayName !== undefined && !policy.displayName) + rejected.push("displayName"); + if (updates.roleTag !== undefined && !policy.roleTag) + rejected.push("roleTag"); + if (updates.groups !== undefined && !policy.groups) + rejected.push("groups"); + if (updates.messageMode !== undefined && !policy.messageMode) + rejected.push("messageMode"); + if (updates.permission !== undefined) rejected.push("permission"); + } + + if (rejected.length > 0) { + return { + ok: false, + error: `admin-managed fields: ${rejected.join(", ")}`, + }; + } + + // 4. Build update set + const set: Record = {}; + const changes: MemberProfileUpdate = {}; + + if (updates.displayName !== undefined) { + set.displayName = updates.displayName; + changes.displayName = updates.displayName; + } + if (updates.roleTag !== undefined) { + set.roleTag = updates.roleTag; + changes.roleTag = updates.roleTag; + } + if (updates.groups !== undefined) { + set.defaultGroups = updates.groups; + changes.groups = updates.groups; + } + if (updates.messageMode !== undefined) { + set.messageMode = updates.messageMode; + changes.messageMode = updates.messageMode; + } + if (updates.permission !== undefined && isAdmin) { + set.role = updates.permission; + } + + if (Object.keys(set).length === 0) { + return { ok: false, error: "no fields to update" }; + } + + // 5. Update member row + await db.update(memberTable).set(set).where(eq(memberTable.id, memberId)); + + // 6. Read back the updated member + const [updated] = await db + .select() + .from(memberTable) + .where(eq(memberTable.id, memberId)); + + if (!updated) return { ok: false, error: "member not found after update" }; + + return { + ok: true, + member: { + id: updated.id, + displayName: updated.displayName, + roleTag: updated.roleTag, + groups: updated.defaultGroups, + messageMode: updated.messageMode, + permission: updated.role, + dashboardUserId: updated.dashboardUserId, + joinedAt: updated.joinedAt, + lastSeenAt: updated.lastSeenAt, + }, + changes, + }; +} + +/** + * List all members of a mesh with online status. + */ +export async function listMeshMembers( + meshId: string, +): Promise< + | { ok: true; members: Array> } + | { ok: false; error: string } +> { + // Verify mesh exists + const [m] = await db + .select({ id: meshTable.id }) + .from(meshTable) + .where(and(eq(meshTable.id, meshId), isNull(meshTable.archivedAt))); + + if (!m) return { ok: false, error: "mesh not found" }; + + // Get all non-revoked members + const members = await db + .select() + .from(memberTable) + .where( + and(eq(memberTable.meshId, meshId), isNull(memberTable.revokedAt)), + ); + + // Early return for empty member list (avoids invalid SQL IN clause) + if (members.length === 0) { + return { ok: true, members: [] }; + } + + // Get active presences for online status + const activePresences = await db + .select({ + memberId: presenceTable.memberId, + count: sql`count(*)::int`, + }) + .from(presenceTable) + .where( + and( + isNull(presenceTable.disconnectedAt), + sql`${presenceTable.memberId} IN (${sql.join( + members.map((m) => sql`${m.id}`), + sql`, `, + )})`, + ), + ) + .groupBy(presenceTable.memberId); + + const onlineMap = new Map( + activePresences.map((p) => [p.memberId, p.count]), + ); + + return { + ok: true, + members: members.map((member) => ({ + id: member.id, + displayName: member.displayName, + roleTag: member.roleTag, + groups: member.defaultGroups, + messageMode: member.messageMode, + permission: member.role, + dashboardUserId: member.dashboardUserId, + joinedAt: member.joinedAt?.toISOString(), + lastSeenAt: member.lastSeenAt?.toISOString(), + online: onlineMap.has(member.id), + sessionCount: onlineMap.get(member.id) ?? 0, + })), + }; +} + +/** + * Update mesh settings (currently: selfEditable policy). + * Admin-only. + */ +export async function updateMeshSettings( + meshId: string, + callerMemberId: string, + settings: { selfEditable?: SelfEditablePolicy }, +): Promise<{ ok: true } | { ok: false; error: string }> { + // Check caller is admin + const [caller] = await db + .select({ role: memberTable.role }) + .from(memberTable) + .where( + and( + eq(memberTable.id, callerMemberId), + eq(memberTable.meshId, meshId), + isNull(memberTable.revokedAt), + ), + ); + + if (!caller || caller.role !== "admin") { + return { ok: false, error: "admin access required" }; + } + + const set: Record = {}; + if (settings.selfEditable) set.selfEditable = settings.selfEditable; + + if (Object.keys(set).length === 0) { + return { ok: false, error: "no settings to update" }; + } + + await db.update(meshTable).set(set).where(eq(meshTable.id, meshId)); + + return { ok: true }; +} diff --git a/packages/db/migrations/0015_member-profile-and-mesh-policy.sql b/packages/db/migrations/0015_member-profile-and-mesh-policy.sql new file mode 100644 index 0000000..481afcc --- /dev/null +++ b/packages/db/migrations/0015_member-profile-and-mesh-policy.sql @@ -0,0 +1,12 @@ +-- Member profile columns: roleTag, defaultGroups, messageMode, dashboardUserId +ALTER TABLE "mesh"."member" ADD COLUMN "role_tag" text;--> statement-breakpoint +ALTER TABLE "mesh"."member" ADD COLUMN "default_groups" jsonb DEFAULT '[]'::jsonb;--> statement-breakpoint +ALTER TABLE "mesh"."member" ADD COLUMN "message_mode" text DEFAULT 'push';--> statement-breakpoint +ALTER TABLE "mesh"."member" ADD COLUMN "dashboard_user_id" text;--> statement-breakpoint +CREATE INDEX "member_dashboard_user_idx" ON "mesh"."member" ("dashboard_user_id");--> statement-breakpoint + +-- Mesh policy: selfEditable (which profile fields members can self-edit) +ALTER TABLE "mesh"."mesh" ADD COLUMN "self_editable" jsonb DEFAULT '{"displayName":true,"roleTag":true,"groups":true,"messageMode":true}'::jsonb;--> statement-breakpoint + +-- Invite preset: pre-configured profile values applied to new members on join +ALTER TABLE "mesh"."invite" ADD COLUMN "preset" jsonb DEFAULT '{}'::jsonb; diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 40e5b0f..8cdb9f3 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -1,6 +1,7 @@ import { relations } from "drizzle-orm"; import { boolean, + index, integer, jsonb, pgSchema, @@ -108,6 +109,16 @@ export const mesh = meshSchema.table("mesh", { * with recipient's ed25519 pubkey). */ rootKey: text(), + /** + * Per-mesh policy controlling which profile fields members can edit + * about themselves. Admins can always edit anyone's profile regardless. + */ + selfEditable: jsonb().$type<{ + displayName: boolean; + roleTag: boolean; + groups: boolean; + messageMode: boolean; + }>().default({ displayName: true, roleTag: true, groups: true, messageMode: true }), createdAt: timestamp().defaultNow().notNull(), archivedAt: timestamp(), }); @@ -135,10 +146,20 @@ export const meshMember = meshSchema.table("member", { peerPubkey: text().notNull(), displayName: text().notNull(), role: meshRoleEnum().notNull().default("member"), + /** Free-text role label visible to peers (not to be confused with `role` which is the permission enum). */ + roleTag: text(), + /** Persistent group memberships set via dashboard or CLI profile command. */ + defaultGroups: jsonb().$type>().default([]), + /** Delivery preference: push (real-time), inbox (held), off (manual poll). */ + messageMode: text().default("push"), + /** Links this mesh member to a dashboard OAuth user (Payload CMS user.id). */ + dashboardUserId: text(), joinedAt: timestamp().defaultNow().notNull(), lastSeenAt: timestamp(), revokedAt: timestamp(), -}); +}, (table) => [ + index("member_dashboard_user_idx").on(table.dashboardUserId), +]); /** * Invite tokens used to join a mesh via shareable URL. @@ -157,6 +178,13 @@ export const invite = meshSchema.table("invite", { maxUses: integer().notNull().default(1), usedCount: integer().notNull().default(0), role: meshRoleEnum().notNull().default("member"), + /** Pre-configured profile values applied to new members on join. */ + preset: jsonb().$type<{ + displayName?: string; + roleTag?: string; + groups?: Array<{ name: string; role?: string }>; + messageMode?: string; + }>().default({}), expiresAt: timestamp().notNull(), createdBy: text() .references(() => user.id, { onDelete: "cascade", onUpdate: "cascade" })