feat(broker): add cli-sync, member-api, jwt modules + DB schema updates
Some checks failed
CI / Typecheck (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

New broker endpoints for CLI auth sync flow (POST /cli-sync),
member profile management, and mesh settings. Includes JWT
verification for dashboard-issued sync tokens. DB schema adds
member profile fields and mesh policy columns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-09 01:54:50 +01:00
parent d263fe0f26
commit a7d9ecab15
6 changed files with 605 additions and 1 deletions

133
apps/broker/src/cli-sync.ts Normal file
View File

@@ -0,0 +1,133 @@
/**
* POST /cli-sync handler.
*
* Accepts a sync JWT from the dashboard, creates or finds member rows
* for each mesh in the token, and returns mesh details + member IDs.
*/
import { and, eq, isNull } from "drizzle-orm";
import { db } from "./db";
import { verifySyncToken, type SyncTokenPayload } from "./jwt";
// Import schema tables
import {
mesh as meshTable,
meshMember as memberTable,
} from "@turbostarter/db/schema/mesh";
import { generateId } from "@turbostarter/shared/utils";
export interface CliSyncRequest {
sync_token: string;
peer_pubkey: string; // ed25519 hex (64 chars)
display_name: string;
}
export interface CliSyncResponse {
ok: true;
account_id: string;
meshes: Array<{
mesh_id: string;
slug: string;
broker_url: string;
member_id: string;
role: "admin" | "member";
}>;
}
export interface CliSyncError {
ok: false;
error: string;
}
export async function handleCliSync(
body: CliSyncRequest,
): Promise<CliSyncResponse | CliSyncError> {
// 1. Validate inputs
if (!body.sync_token || !body.peer_pubkey || !body.display_name) {
return { ok: false, error: "sync_token, peer_pubkey, display_name required" };
}
if (!/^[0-9a-f]{64}$/i.test(body.peer_pubkey)) {
return { ok: false, error: "peer_pubkey must be 64 hex chars (32 bytes)" };
}
// 2. Verify JWT
const tokenResult = await verifySyncToken(body.sync_token);
if (!tokenResult.ok) {
return { ok: false, error: `sync token invalid: ${tokenResult.error}` };
}
const payload = tokenResult.payload;
// 3. For each mesh in the token, create or find a member row
const resultMeshes: CliSyncResponse["meshes"] = [];
for (const tokenMesh of payload.meshes) {
// Verify mesh exists and is not archived
const [m] = await db
.select({ id: meshTable.id, slug: meshTable.slug })
.from(meshTable)
.where(and(eq(meshTable.id, tokenMesh.id), isNull(meshTable.archivedAt)));
if (!m) {
// Skip meshes that don't exist (could have been deleted)
continue;
}
// Check if this pubkey is already a member of this mesh
const [existing] = await db
.select({ id: memberTable.id, role: memberTable.role })
.from(memberTable)
.where(
and(
eq(memberTable.meshId, tokenMesh.id),
eq(memberTable.peerPubkey, body.peer_pubkey),
isNull(memberTable.revokedAt),
),
);
let memberId: string;
let role: "admin" | "member";
if (existing) {
// Already a member — update dashboard link + display name
memberId = existing.id;
role = existing.role;
await db
.update(memberTable)
.set({
dashboardUserId: payload.sub,
displayName: body.display_name,
})
.where(eq(memberTable.id, existing.id));
} else {
// Create new member row
memberId = generateId();
role = tokenMesh.role;
await db.insert(memberTable).values({
id: memberId,
meshId: tokenMesh.id,
peerPubkey: body.peer_pubkey,
displayName: body.display_name,
role: tokenMesh.role,
dashboardUserId: payload.sub,
});
}
resultMeshes.push({
mesh_id: tokenMesh.id,
slug: m.slug,
broker_url: process.env.BROKER_PUBLIC_URL ?? "wss://ic.claudemesh.com/ws",
member_id: memberId,
role,
});
}
if (resultMeshes.length === 0) {
return { ok: false, error: "no valid meshes found in sync token" };
}
return {
ok: true,
account_id: payload.sub,
meshes: resultMeshes,
};
}

View File

@@ -31,6 +31,7 @@ const envSchema = z.object({
RUNNER_URL: z.string().default("http://runner:7901"),
CLAUDEMESH_SERVICES_DIR: z.string().default("/var/claudemesh/services"),
BROKER_ENCRYPTION_KEY: z.string().default(""), // 64 hex chars (32 bytes). Auto-generated if empty.
CLI_SYNC_SECRET: z.string().default(""), // HS256 shared secret for dashboard→broker sync JWTs. Required for /cli-sync.
MAX_SERVICES_PER_MESH: z.coerce.number().int().positive().default(20),
MAX_SERVICE_ZIP_BYTES: z.coerce.number().int().positive().default(50 * 1024 * 1024),
NODE_ENV: z

146
apps/broker/src/jwt.ts Normal file
View File

@@ -0,0 +1,146 @@
/**
* JWT verification for CLI sync tokens.
*
* Sync tokens are HS256 JWTs issued by the dashboard after OAuth,
* shared secret between dashboard and broker via env var.
*
* JTI dedup: tracks used token IDs in a TTL-evicted Set to prevent replay.
*/
import { env } from "./env";
// --- Types ---
export interface SyncTokenPayload {
sub: string; // dashboard user ID
email: string;
meshes: Array<{
id: string;
slug: string;
role: "admin" | "member";
}>;
action: "sync" | "create";
newMesh?: {
name: string;
slug: string;
};
jti: string; // unique token ID for replay prevention
iat: number;
exp: number;
}
// --- JTI dedup ---
const usedJtis = new Map<string, number>(); // jti → expiry timestamp (ms)
// Sweep expired JTIs every 5 minutes
setInterval(() => {
const now = Date.now();
for (const [jti, exp] of usedJtis) {
if (exp < now) usedJtis.delete(jti);
}
}, 5 * 60_000);
// --- Verification ---
/**
* Verify and decode a sync token JWT.
* Returns the decoded payload on success, or an error string on failure.
*/
export async function verifySyncToken(
token: string,
): Promise<{ ok: true; payload: SyncTokenPayload } | { ok: false; error: string }> {
// Get shared secret from env
const secret = env.CLI_SYNC_SECRET;
if (!secret) {
return { ok: false, error: "CLI_SYNC_SECRET not configured on broker" };
}
try {
// Decode JWT manually (HS256)
const parts = token.split(".");
if (parts.length !== 3) {
return { ok: false, error: "malformed JWT" };
}
const headerB64 = parts[0]!;
const payloadB64 = parts[1]!;
const signatureB64 = parts[2]!;
// Verify signature (HS256)
const encoder = new TextEncoder();
const key = await crypto.subtle.importKey(
"raw",
encoder.encode(secret),
{ name: "HMAC", hash: "SHA-256" },
false,
["sign", "verify"],
);
const signatureInput = encoder.encode(`${headerB64}.${payloadB64}`);
const signature = base64UrlDecode(signatureB64);
const valid = await crypto.subtle.verify("HMAC", key, signature, signatureInput);
if (!valid) {
return { ok: false, error: "invalid signature" };
}
// Decode header — must be HS256
const header = JSON.parse(new TextDecoder().decode(base64UrlDecode(headerB64)));
if (header.alg !== "HS256") {
return { ok: false, error: `unsupported algorithm: ${header.alg}` };
}
// Decode payload
const payload = JSON.parse(
new TextDecoder().decode(base64UrlDecode(payloadB64)),
) as SyncTokenPayload;
// Check expiry
const now = Math.floor(Date.now() / 1000);
if (payload.exp && payload.exp < now) {
return { ok: false, error: "token expired" };
}
// Check iat not in the future (30s tolerance)
if (payload.iat && payload.iat > now + 30) {
return { ok: false, error: "token issued in the future" };
}
// JTI dedup
if (!payload.jti) {
return { ok: false, error: "missing jti" };
}
if (usedJtis.has(payload.jti)) {
return { ok: false, error: "token already used" };
}
// Mark as used with expiry time
usedJtis.set(payload.jti, (payload.exp ?? now + 900) * 1000);
// Basic validation
if (!payload.sub || !payload.email) {
return { ok: false, error: "missing sub or email" };
}
if (!Array.isArray(payload.meshes)) {
return { ok: false, error: "missing meshes array" };
}
return { ok: true, payload };
} catch (e) {
return { ok: false, error: e instanceof Error ? e.message : String(e) };
}
}
// --- Helpers ---
function base64UrlDecode(input: string): Uint8Array {
// Add padding
let base64 = input.replace(/-/g, "+").replace(/_/g, "/");
while (base64.length % 4) base64 += "=";
const binary = atob(base64);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < binary.length; i++) {
bytes[i] = binary.charCodeAt(i);
}
return bytes;
}

View File

@@ -0,0 +1,284 @@
/**
* Member profile REST API handlers.
*
* PATCH /mesh/:meshId/member/:memberId — update member profile
* GET /mesh/:meshId/members — list all members with online status
* PATCH /mesh/:meshId/settings — update mesh settings (selfEditable)
*
* These are standalone handler functions. Route wiring happens in index.ts.
*/
import { and, eq, isNull, sql } from "drizzle-orm";
import { db } from "./db";
import {
mesh as meshTable,
meshMember as memberTable,
presence as presenceTable,
} from "@turbostarter/db/schema/mesh";
// --- Types ---
export interface MemberProfileUpdate {
displayName?: string;
roleTag?: string;
groups?: Array<{ name: string; role?: string }>;
messageMode?: "push" | "inbox" | "off";
}
export interface MemberPermissionUpdate {
permission?: "admin" | "member"; // only admins can change this
}
export type MemberUpdateRequest = MemberProfileUpdate & MemberPermissionUpdate;
interface SelfEditablePolicy {
displayName: boolean;
roleTag: boolean;
groups: boolean;
messageMode: boolean;
}
// --- Handlers ---
/**
* Update a member's profile fields.
*
* Authorization:
* - If caller is the target member: check mesh.selfEditable for each field
* - If caller is a mesh admin: allow all fields
* - permission field: admin-only always
*
* Returns: { ok: true, member: {...} } or { ok: false, error: string }
*/
export async function updateMemberProfile(
meshId: string,
memberId: string,
callerMemberId: string, // from auth header or WS connection
updates: MemberUpdateRequest,
): Promise<
| { ok: true; member: Record<string, unknown>; changes: MemberProfileUpdate }
| { ok: false; error: string }
> {
// 1. Load mesh for selfEditable policy
const [m] = await db
.select({ id: meshTable.id, selfEditable: meshTable.selfEditable })
.from(meshTable)
.where(and(eq(meshTable.id, meshId), isNull(meshTable.archivedAt)));
if (!m) return { ok: false, error: "mesh not found" };
// 2. Load caller's member row to check permission
const [caller] = await db
.select({ id: memberTable.id, role: memberTable.role })
.from(memberTable)
.where(
and(
eq(memberTable.id, callerMemberId),
eq(memberTable.meshId, meshId),
isNull(memberTable.revokedAt),
),
);
if (!caller) return { ok: false, error: "caller not a member of this mesh" };
const isAdmin = caller.role === "admin";
const isSelf = callerMemberId === memberId;
if (!isAdmin && !isSelf) {
return {
ok: false,
error: "not authorized — only admins or self can edit",
};
}
// 3. Check self-edit permissions for non-admin self-edits
const policy: SelfEditablePolicy =
(m.selfEditable as SelfEditablePolicy) ?? {
displayName: true,
roleTag: true,
groups: true,
messageMode: true,
};
const rejected: string[] = [];
if (!isAdmin && isSelf) {
if (updates.displayName !== undefined && !policy.displayName)
rejected.push("displayName");
if (updates.roleTag !== undefined && !policy.roleTag)
rejected.push("roleTag");
if (updates.groups !== undefined && !policy.groups)
rejected.push("groups");
if (updates.messageMode !== undefined && !policy.messageMode)
rejected.push("messageMode");
if (updates.permission !== undefined) rejected.push("permission");
}
if (rejected.length > 0) {
return {
ok: false,
error: `admin-managed fields: ${rejected.join(", ")}`,
};
}
// 4. Build update set
const set: Record<string, unknown> = {};
const changes: MemberProfileUpdate = {};
if (updates.displayName !== undefined) {
set.displayName = updates.displayName;
changes.displayName = updates.displayName;
}
if (updates.roleTag !== undefined) {
set.roleTag = updates.roleTag;
changes.roleTag = updates.roleTag;
}
if (updates.groups !== undefined) {
set.defaultGroups = updates.groups;
changes.groups = updates.groups;
}
if (updates.messageMode !== undefined) {
set.messageMode = updates.messageMode;
changes.messageMode = updates.messageMode;
}
if (updates.permission !== undefined && isAdmin) {
set.role = updates.permission;
}
if (Object.keys(set).length === 0) {
return { ok: false, error: "no fields to update" };
}
// 5. Update member row
await db.update(memberTable).set(set).where(eq(memberTable.id, memberId));
// 6. Read back the updated member
const [updated] = await db
.select()
.from(memberTable)
.where(eq(memberTable.id, memberId));
if (!updated) return { ok: false, error: "member not found after update" };
return {
ok: true,
member: {
id: updated.id,
displayName: updated.displayName,
roleTag: updated.roleTag,
groups: updated.defaultGroups,
messageMode: updated.messageMode,
permission: updated.role,
dashboardUserId: updated.dashboardUserId,
joinedAt: updated.joinedAt,
lastSeenAt: updated.lastSeenAt,
},
changes,
};
}
/**
* List all members of a mesh with online status.
*/
export async function listMeshMembers(
meshId: string,
): Promise<
| { ok: true; members: Array<Record<string, unknown>> }
| { ok: false; error: string }
> {
// Verify mesh exists
const [m] = await db
.select({ id: meshTable.id })
.from(meshTable)
.where(and(eq(meshTable.id, meshId), isNull(meshTable.archivedAt)));
if (!m) return { ok: false, error: "mesh not found" };
// Get all non-revoked members
const members = await db
.select()
.from(memberTable)
.where(
and(eq(memberTable.meshId, meshId), isNull(memberTable.revokedAt)),
);
// Early return for empty member list (avoids invalid SQL IN clause)
if (members.length === 0) {
return { ok: true, members: [] };
}
// Get active presences for online status
const activePresences = await db
.select({
memberId: presenceTable.memberId,
count: sql<number>`count(*)::int`,
})
.from(presenceTable)
.where(
and(
isNull(presenceTable.disconnectedAt),
sql`${presenceTable.memberId} IN (${sql.join(
members.map((m) => sql`${m.id}`),
sql`, `,
)})`,
),
)
.groupBy(presenceTable.memberId);
const onlineMap = new Map(
activePresences.map((p) => [p.memberId, p.count]),
);
return {
ok: true,
members: members.map((member) => ({
id: member.id,
displayName: member.displayName,
roleTag: member.roleTag,
groups: member.defaultGroups,
messageMode: member.messageMode,
permission: member.role,
dashboardUserId: member.dashboardUserId,
joinedAt: member.joinedAt?.toISOString(),
lastSeenAt: member.lastSeenAt?.toISOString(),
online: onlineMap.has(member.id),
sessionCount: onlineMap.get(member.id) ?? 0,
})),
};
}
/**
* Update mesh settings (currently: selfEditable policy).
* Admin-only.
*/
export async function updateMeshSettings(
meshId: string,
callerMemberId: string,
settings: { selfEditable?: SelfEditablePolicy },
): Promise<{ ok: true } | { ok: false; error: string }> {
// Check caller is admin
const [caller] = await db
.select({ role: memberTable.role })
.from(memberTable)
.where(
and(
eq(memberTable.id, callerMemberId),
eq(memberTable.meshId, meshId),
isNull(memberTable.revokedAt),
),
);
if (!caller || caller.role !== "admin") {
return { ok: false, error: "admin access required" };
}
const set: Record<string, unknown> = {};
if (settings.selfEditable) set.selfEditable = settings.selfEditable;
if (Object.keys(set).length === 0) {
return { ok: false, error: "no settings to update" };
}
await db.update(meshTable).set(set).where(eq(meshTable.id, meshId));
return { ok: true };
}

View File

@@ -0,0 +1,12 @@
-- Member profile columns: roleTag, defaultGroups, messageMode, dashboardUserId
ALTER TABLE "mesh"."member" ADD COLUMN "role_tag" text;--> statement-breakpoint
ALTER TABLE "mesh"."member" ADD COLUMN "default_groups" jsonb DEFAULT '[]'::jsonb;--> statement-breakpoint
ALTER TABLE "mesh"."member" ADD COLUMN "message_mode" text DEFAULT 'push';--> statement-breakpoint
ALTER TABLE "mesh"."member" ADD COLUMN "dashboard_user_id" text;--> statement-breakpoint
CREATE INDEX "member_dashboard_user_idx" ON "mesh"."member" ("dashboard_user_id");--> statement-breakpoint
-- Mesh policy: selfEditable (which profile fields members can self-edit)
ALTER TABLE "mesh"."mesh" ADD COLUMN "self_editable" jsonb DEFAULT '{"displayName":true,"roleTag":true,"groups":true,"messageMode":true}'::jsonb;--> statement-breakpoint
-- Invite preset: pre-configured profile values applied to new members on join
ALTER TABLE "mesh"."invite" ADD COLUMN "preset" jsonb DEFAULT '{}'::jsonb;

View File

@@ -1,6 +1,7 @@
import { relations } from "drizzle-orm";
import {
boolean,
index,
integer,
jsonb,
pgSchema,
@@ -108,6 +109,16 @@ export const mesh = meshSchema.table("mesh", {
* with recipient's ed25519 pubkey).
*/
rootKey: text(),
/**
* Per-mesh policy controlling which profile fields members can edit
* about themselves. Admins can always edit anyone's profile regardless.
*/
selfEditable: jsonb().$type<{
displayName: boolean;
roleTag: boolean;
groups: boolean;
messageMode: boolean;
}>().default({ displayName: true, roleTag: true, groups: true, messageMode: true }),
createdAt: timestamp().defaultNow().notNull(),
archivedAt: timestamp(),
});
@@ -135,10 +146,20 @@ export const meshMember = meshSchema.table("member", {
peerPubkey: text().notNull(),
displayName: text().notNull(),
role: meshRoleEnum().notNull().default("member"),
/** Free-text role label visible to peers (not to be confused with `role` which is the permission enum). */
roleTag: text(),
/** Persistent group memberships set via dashboard or CLI profile command. */
defaultGroups: jsonb().$type<Array<{ name: string; role?: string }>>().default([]),
/** Delivery preference: push (real-time), inbox (held), off (manual poll). */
messageMode: text().default("push"),
/** Links this mesh member to a dashboard OAuth user (Payload CMS user.id). */
dashboardUserId: text(),
joinedAt: timestamp().defaultNow().notNull(),
lastSeenAt: timestamp(),
revokedAt: timestamp(),
});
}, (table) => [
index("member_dashboard_user_idx").on(table.dashboardUserId),
]);
/**
* Invite tokens used to join a mesh via shareable URL.
@@ -157,6 +178,13 @@ export const invite = meshSchema.table("invite", {
maxUses: integer().notNull().default(1),
usedCount: integer().notNull().default(0),
role: meshRoleEnum().notNull().default("member"),
/** Pre-configured profile values applied to new members on join. */
preset: jsonb().$type<{
displayName?: string;
roleTag?: string;
groups?: Array<{ name: string; role?: string }>;
messageMode?: string;
}>().default({}),
expiresAt: timestamp().notNull(),
createdBy: text()
.references(() => user.id, { onDelete: "cascade", onUpdate: "cascade" })