diff --git a/.artifacts/specs/2026-05-02-v0.2.0-scope.md b/.artifacts/specs/2026-05-02-v0.2.0-scope.md new file mode 100644 index 0000000..9cfa9e1 --- /dev/null +++ b/.artifacts/specs/2026-05-02-v0.2.0-scope.md @@ -0,0 +1,271 @@ +# claudemesh v0.2.0 — scope + +**Date:** 2026-05-02 +**Status:** draft +**Predecessor:** [`2026-05-02-architecture-north-star.md`](./2026-05-02-architecture-north-star.md) (1.5.0 architecture lock) + +--- + +## Cut + +**Theme: from agent-only mesh to mesh of agents, humans, and external systems — with conversation context.** + +| # | Feature | Effort | Spine | +|---|---------|--------|-------| +| 1 | **Topics** (channels/rooms within a mesh) | 2-3 d | yes | +| 2 | **Humans in the mesh** (web chat panel) | 2-3 d | depends on #1 | +| 3 | **REST API + external WS** (API keys per mesh) | 2-3 d | depends on #1 | +| 4 | **Bridge peer** (forwards one topic between meshes) | 1 d | depends on #1 | + +Optional pickup if all four ship early: +- **Local peer aliases** (~0.5 d) — IRC-style local labels for hard-to-remember displayNames. +- **Semantic peer search** (~0.5 d) — already in vision doc; useful once topics exist. + +Total: 7-9 days plus 1-2 days slack. Targeting **release window: 2026-05-12 to 2026-05-16**. + +--- + +## Why this cut + +The 1.5.0 architecture (CLI-first, tool-less MCP, policy engine) is finished. The next bottleneck is **product surface**, not engineering. + +Current taxonomy `mesh + group + role` is the right *organizational* structure but missing a *conversational* primitive. Every message is DM or `@group` broadcast — there's no continuity for "the deploys conversation," no scoped state/memory/files, no way for a human to join a topic without joining the whole mesh, no way for a bridge to forward a single thread of work. + +**Topics fix this.** They are the spine of v0.2.0: +- Without topics, "humans in mesh" floods every human with every peer's chatter. +- Without topics, "bridge" forwards everything (loop risk, signal-to-noise problem). +- Without topics, REST API endpoints have no natural sub-mesh scope. + +Once topics exist, humans + REST + bridge each become 50% smaller because they slot into a clean primitive instead of inventing one. + +--- + +## Deferred + +| Item | Why later | +|---|---| +| **Federation** (broker-to-broker) | Bridges prototype it. Learn from real use first. | +| **Sandboxes** (E2B / Modal) | Orthogonal capability. Separate release. | +| **Sim SDK** (`@claudemesh/sim`) | Niche audience; long-tail. v0.3.0+. | +| **Welcome back / persistent MCP** | Already in progress as 1.6.0 patch. | +| **Mesh telemetry** | Pre-PMF telemetry is busywork; users first. | + +--- + +## Design sketches + +### 1. Topics + +**Mental model:** mesh is *who you trust*; group is *who you are*; topic is *what you're talking about*. Three orthogonal axes. + +**Wire shape:** + +```yaml +topic: + id: + mesh_slug: openclaw + name: deploys # unique within mesh + description: "deploy + on-call" + visibility: public # public | private (invite-only) | dm (1:1, autocreated) + created_by: + created_at: +``` + +**Membership:** + +```yaml +topic_member: + topic_id: + pubkey: # session pubkey OR member_pubkey for durable identity + role: lead | member | observer + joined_at: + last_read_at: # for unread counts +``` + +**Messages reference a topic, not just a target:** + +```jsonc +// existing send_message envelope gains a `topic` field +{ + "to": "@deploys", // or topic id, or peer name (DM) + "topic": "deploys", // optional explicit, inferred from `to: @` + "message": "...", + "priority": "next" +} +``` + +**Resolution rules:** +- `to: "alice"` → DM to peer alice (no topic). +- `to: "@frontend"` → group broadcast (no topic — backwards compatible with 1.5.0). +- `to: "#deploys"` → topic message; delivered only to topic subscribers. +- `to: "*"` → mesh-wide broadcast (kept; lower-priority than topic for new comms). + +**State/memory/files scoping:** +- `claudemesh state set --topic deploys` — namespace under topic. +- `claudemesh remember "..." --topic deploys` — topic-scoped memory. +- `claudemesh file list --topic deploys` — files visible only to topic members. + +**CLI:** + +```bash +claudemesh topic create deploys --description "deploy + on-call" +claudemesh topic list # all topics in mesh +claudemesh topic join deploys +claudemesh topic leave deploys +claudemesh topic invite deploys # private topics +claudemesh topic members deploys +claudemesh topic delete deploys # creator/admin only +claudemesh send "#deploys" "rolling out 1.5.1" +``` + +**MCP `claude/channel` notification gains `topic`** as an attribute so peers know which conversation an inbound message belongs to. + +**Effort breakdown:** schema + drizzle migration + CLI verbs + broker routing changes (filter by topic membership) + skill update. ~250 LoC across CLI + ~200 LoC broker. + +--- + +### 2. Humans in the mesh + +**Mental model:** a human is a peer with `peer_type: "human"` whose presence is durable (no session pubkey rotation; identity tied to an account). They join *topics*, not the whole mesh — so they only see relevant traffic. + +**Wire:** + +```jsonc +// hello envelope gains: +{ + "peer_type": "human", + "session_pubkey": , + "member_pubkey": , + "display_name": "Alejandro" +} +``` + +**Web panel (`apps/web`):** + +``` +/dashboard/mesh//topic/ + ├── topic header (members, settings) + ├── message stream (WS-driven, infinite scroll on history) + ├── compose box (typing indicator broadcast on focus) + └── members sidebar (presence, profile, last_read_at) +``` + +**Backend changes:** +- Persistent message history per topic (drizzle table `topic_messages`; existing direct messages stay ephemeral by design). +- Topic-scoped read receipts (`topic_member.last_read_at`). +- Typing indicator: short-lived broadcast on the topic channel (`{type: "typing", peer: "..."}`). + +**Privacy invariant:** a human in `#deploys` sees only `#deploys` traffic + DMs sent to them. Never the whole mesh. This is the *whole reason* topics come first. + +**Effort:** WS endpoint already exists (broker side). Add: topic_messages table, history endpoint, web UI components (compose, stream, members). ~3 days. + +--- + +### 3. REST API + external WS + +**Auth:** API keys per mesh, scoped by capability + topic. + +```yaml +api_key: + id: + mesh_slug: openclaw + label: "ci-bot" + hash: + capabilities: ["send", "read"] + topic_scopes: ["#deploys"] # null = all topics; explicit = whitelist + created_at: + last_used_at: + revoked_at: +``` + +**CLI for issuance (admin only):** + +```bash +claudemesh apikey create --label "ci-bot" --topic deploys --cap send,read +claudemesh apikey list +claudemesh apikey revoke +``` + +**REST endpoints (claudemesh.com/api/v1):** + +``` +POST /v1/messages Send a message (auth: api key). +GET /v1/topics/:name/messages History (with pagination cursor). +GET /v1/peers List online peers (filtered by key scope). +GET /v1/state Read mesh state. +POST /v1/state Write mesh state. +``` + +**External WS:** `wss://ic.claudemesh.com/ws?api_key=...&topic=deploys` — connects with `peer_type: "external"`. Push-pipe parity with internal sessions; can subscribe to topic streams. + +**Why REST keys not session keypairs:** external clients (Zapier, GitHub Actions, mobile apps, Slack workspace bots) need long-lived bearer-like creds, not ephemeral keypairs. Different threat model — scope tightly via topic + capability. + +**Effort:** ~3 days. Mostly broker work; CLI gets the issuance verbs. + +--- + +### 4. Bridge peer + +**Mental model:** a bridge is a peer that holds memberships in two meshes and forwards traffic on a single topic between them. SDK-only (no broker changes). + +**Implementation (uses existing `@claudemesh/sdk`):** + +```typescript +import { Bridge } from "@claudemesh/sdk"; + +const bridge = new Bridge({ + meshes: ["work", "external"], + topic: "incidents", + filter: (msg) => !msg.tags.includes("internal-only"), + loop_prevention: { tag: "via-bridge", max_hops: 2 }, +}); +await bridge.start(); +``` + +**Loop prevention:** every forwarded message gets a `bridge_hop_` tag; bridges drop messages that already carry their own tag (prevents echo) and any message with `max_hops` exceeded. + +**CLI:** `claudemesh bridge run ` — runs an SDK bridge as a long-lived process. Useful for "run a bridge inside a docker container or systemd unit." + +**What it deliberately doesn't do:** +- Cross-broker federation (that's a separate broker-to-broker protocol). +- Bidirectional state/memory sync (only messages on a single topic). +- Identity unification (a peer in mesh A is *not* the same peer in mesh B; the bridge appears as the messenger). + +**Effort:** ~1 day on top of the existing SDK. + +--- + +## Acceptance signals + +v0.2.0 ships when all four are demonstrable end-to-end: + +1. A peer creates `#deploys`, two other peers join it, traffic is topic-scoped, mesh-wide chat doesn't see it. +2. A human signs in at `claudemesh.com`, joins `#deploys`, sends a message, a Claude session in the mesh receives it as a `` interrupt with `topic="deploys"`. +3. A `curl` POST against `/v1/messages` with an API key delivers a message into `#deploys`; the same API key is rejected on `#secrets`. +4. A bridge peer running locally forwards `#incidents` between two test meshes; loop is prevented; one-shot demo recorded. + +--- + +## Out of scope (explicitly) + +- Topic hierarchy / nesting (flat namespace per mesh; revisit at scale). +- Topic-scoped capability grants (`grant read:#topic`) — solvable later via capability extension. +- Threads-within-topics (Slack-style). Defer. +- Voice / video / file-upload UX for humans — text only in v0.2.0. +- Federation, sandboxes, sim-sdk — explicitly deferred above. + +--- + +## Risks + +- **Topics retrofit risk** — existing 1.5.0 message envelope assumes "to" is peer/group/star. Adding `topic` is additive on the wire but changes routing logic. Test path: backfill existing meshes with a default `#general` topic; opt-in to topic-only routing. +- **Web chat session lifecycle** — humans expect "I closed the tab and came back, my place is preserved." Ephemeral session pubkeys break that. Workaround: tie human peer identity to `member_pubkey` + last_read_at on the topic; session pubkey rotates per tab but membership is durable. +- **API key abuse** — leaked keys = anyone can post. Mitigations: capability + topic scoping; rate limits per key; `last_used_at` + audit trail; revoke verb is fast. + +--- + +## Open questions + +1. Do existing `@group` semantics survive intact, or do we collapse `@group` and `#topic` into one primitive? (Answer favored: keep both — different axes.) +2. Should topics persist messages by default, or be opt-in? (Default: yes for `peer_type: "human"`-touched topics; configurable per topic for agent-only ones.) +3. Where does mesh-MCP discovery live in the topic model — per topic or per mesh? (Likely per mesh; mesh-MCP is infrastructure, not conversation.) diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 9d9d63b..bbef677 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -42,6 +42,9 @@ import { meshService, meshSkill, meshStream, + meshTopic, + meshTopicMember, + meshTopicMessage, meshVaultEntry, meshTask, messageQueue, @@ -531,6 +534,254 @@ export async function leaveGroup( return groups; } +// --- Topics (v0.2.0) --- +// +// Conversational primitive within a mesh. Spec: +// .artifacts/specs/2026-05-02-v0.2.0-scope.md +// +// Mesh = trust boundary. Group = identity tag. Topic = conversation scope. +// Three orthogonal axes; topics complement (don't replace) groups. +// +// Routing: topic-tagged messages use targetSpec = "#". The drain +// query joins topic_member to filter delivery, so non-members never see +// the message. Topic-tagged messages are also persisted to topic_message +// so humans (and opting-in agents) can fetch history on reconnect. + +/** Create a topic in a mesh. Idempotent on (meshId, name). */ +export async function createTopic(args: { + meshId: string; + name: string; + description?: string; + visibility?: "public" | "private" | "dm"; + createdByMemberId?: string; +}): Promise<{ id: string; created: boolean }> { + const existing = await db + .select({ id: meshTopic.id }) + .from(meshTopic) + .where(and(eq(meshTopic.meshId, args.meshId), eq(meshTopic.name, args.name))); + if (existing[0]) return { id: existing[0].id, created: false }; + + const [row] = await db + .insert(meshTopic) + .values({ + meshId: args.meshId, + name: args.name, + description: args.description ?? null, + visibility: args.visibility ?? "public", + createdByMemberId: args.createdByMemberId ?? null, + }) + .returning({ id: meshTopic.id }); + if (!row) throw new Error("failed to create topic"); + return { id: row.id, created: true }; +} + +/** List topics in a mesh, with member counts. */ +export async function listTopics(meshId: string): Promise< + Array<{ + id: string; + name: string; + description: string | null; + visibility: "public" | "private" | "dm"; + memberCount: number; + createdAt: Date; + }> +> { + const rows = await db + .select({ + id: meshTopic.id, + name: meshTopic.name, + description: meshTopic.description, + visibility: meshTopic.visibility, + createdAt: meshTopic.createdAt, + memberCount: sql`(SELECT COUNT(*)::int FROM mesh.topic_member WHERE topic_id = ${meshTopic.id})`, + }) + .from(meshTopic) + .where(and(eq(meshTopic.meshId, meshId), isNull(meshTopic.archivedAt))) + .orderBy(asc(meshTopic.name)); + return rows; +} + +/** Resolve a topic by name within a mesh. */ +export async function findTopicByName( + meshId: string, + name: string, +): Promise<{ id: string; visibility: "public" | "private" | "dm" } | null> { + const [row] = await db + .select({ id: meshTopic.id, visibility: meshTopic.visibility }) + .from(meshTopic) + .where( + and( + eq(meshTopic.meshId, meshId), + eq(meshTopic.name, name), + isNull(meshTopic.archivedAt), + ), + ); + return row ?? null; +} + +/** Add a member to a topic. Idempotent. */ +export async function joinTopic(args: { + topicId: string; + memberId: string; + role?: "lead" | "member" | "observer"; +}): Promise { + await db + .insert(meshTopicMember) + .values({ + topicId: args.topicId, + memberId: args.memberId, + role: args.role ?? "member", + }) + .onConflictDoNothing(); +} + +/** Remove a member from a topic. */ +export async function leaveTopic(args: { + topicId: string; + memberId: string; +}): Promise { + await db + .delete(meshTopicMember) + .where( + and( + eq(meshTopicMember.topicId, args.topicId), + eq(meshTopicMember.memberId, args.memberId), + ), + ); +} + +/** List members of a topic with display names. */ +export async function topicMembers(topicId: string): Promise< + Array<{ + memberId: string; + pubkey: string; + displayName: string; + role: "lead" | "member" | "observer"; + joinedAt: Date; + lastReadAt: Date | null; + }> +> { + const rows = await db + .select({ + memberId: meshTopicMember.memberId, + pubkey: memberTable.peerPubkey, + displayName: memberTable.displayName, + role: meshTopicMember.role, + joinedAt: meshTopicMember.joinedAt, + lastReadAt: meshTopicMember.lastReadAt, + }) + .from(meshTopicMember) + .innerJoin(memberTable, eq(meshTopicMember.memberId, memberTable.id)) + .where(eq(meshTopicMember.topicId, topicId)) + .orderBy(asc(memberTable.displayName)); + return rows; +} + +/** Return all topic ids a member belongs to (used by message routing). */ +export async function getMemberTopicIds(memberId: string): Promise { + const rows = await db + .select({ id: meshTopicMember.topicId }) + .from(meshTopicMember) + .where(eq(meshTopicMember.memberId, memberId)); + return rows.map((r) => r.id); +} + +/** Append a topic message to persistent history. */ +export async function appendTopicMessage(args: { + topicId: string; + senderMemberId: string; + senderSessionPubkey?: string; + nonce: string; + ciphertext: string; +}): Promise { + const [row] = await db + .insert(meshTopicMessage) + .values({ + topicId: args.topicId, + senderMemberId: args.senderMemberId, + senderSessionPubkey: args.senderSessionPubkey ?? null, + nonce: args.nonce, + ciphertext: args.ciphertext, + }) + .returning({ id: meshTopicMessage.id }); + if (!row) throw new Error("failed to append topic message"); + return row.id; +} + +/** + * Fetch topic history for a member. Pagination via `before` cursor (id of + * an earlier message); pass null for the latest page. + */ +export async function topicHistory(args: { + topicId: string; + limit?: number; + beforeId?: string; +}): Promise< + Array<{ + id: string; + senderMemberId: string; + senderPubkey: string; + nonce: string; + ciphertext: string; + createdAt: Date; + }> +> { + const limit = Math.min(Math.max(args.limit ?? 50, 1), 200); + const beforeClause = args.beforeId + ? sql`AND tm.created_at < (SELECT created_at FROM mesh.topic_message WHERE id = ${args.beforeId})` + : sql``; + const result = await db.execute<{ + id: string; + sender_member_id: string; + sender_pubkey: string; + nonce: string; + ciphertext: string; + created_at: Date; + }>(sql` + SELECT tm.id, tm.sender_member_id, + COALESCE(tm.sender_session_pubkey, m.peer_pubkey) AS sender_pubkey, + tm.nonce, tm.ciphertext, tm.created_at + FROM mesh.topic_message tm + JOIN mesh.member m ON m.id = tm.sender_member_id + WHERE tm.topic_id = ${args.topicId} + ${beforeClause} + ORDER BY tm.created_at DESC, tm.id DESC + LIMIT ${limit} + `); + const rows = (result.rows ?? result) as Array<{ + id: string; + sender_member_id: string; + sender_pubkey: string; + nonce: string; + ciphertext: string; + created_at: Date; + }>; + return rows.map((r) => ({ + id: r.id, + senderMemberId: r.sender_member_id, + senderPubkey: r.sender_pubkey, + nonce: r.nonce, + ciphertext: r.ciphertext, + createdAt: r.created_at instanceof Date ? r.created_at : new Date(r.created_at), + })); +} + +/** Update last_read_at for a member's topic subscription. */ +export async function markTopicRead(args: { + topicId: string; + memberId: string; +}): Promise { + await db + .update(meshTopicMember) + .set({ lastReadAt: new Date() }) + .where( + and( + eq(meshTopicMember.topicId, args.topicId), + eq(meshTopicMember.memberId, args.memberId), + ), + ); +} + // --- Shared state --- /** @@ -1563,7 +1814,7 @@ function deliverablePriorities(status: PeerStatus): Priority[] { */ export async function drainForMember( meshId: string, - _memberId: string, + memberId: string, memberPubkey: string, status: PeerStatus, sessionPubkey?: string, @@ -1615,6 +1866,17 @@ export async function drainForMember( groupTargets.map((t) => `'${t}'`).join(","), ); + // Topic membership targets (v0.2.0). targetSpec for topic-tagged + // messages is "#". A member receives a topic message iff + // they're in topic_member for that topic. We resolve memberships + // here and inline the list — same pattern as groups, no schema join + // in the hot path. + const topicIds = await getMemberTopicIds(memberId); + const topicTargetList = + topicIds.length > 0 + ? sql.raw(topicIds.map((id) => `'#${id}'`).join(",")) + : null; + // Atomic claim with SQL-side ordering. The CTE claims rows via // UPDATE...RETURNING; the outer SELECT re-orders by created_at // (with id as tiebreaker so equal-timestamp rows stay deterministic). @@ -1638,7 +1900,7 @@ export async function drainForMember( WHERE mesh_id = ${meshId} AND delivered_at IS NULL AND priority::text IN (${priorityList}) - AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})) + AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})${topicTargetList ? sql` OR target_spec IN (${topicTargetList})` : sql``}) ${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``} ORDER BY created_at ASC, id ASC FOR UPDATE SKIP LOCKED diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 88dfd19..b528ce2 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -18,7 +18,7 @@ import { WebSocketServer, type WebSocket } from "ws"; import { and, eq, inArray, isNull, lt, sql } from "drizzle-orm"; import { env } from "./env"; import { db } from "./db"; -import { invite as inviteTable, mesh, meshMember, messageQueue, presence, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh"; +import { invite as inviteTable, mesh, meshMember, messageQueue, presence, scheduledMessage as scheduledMessageTable, meshWebhook, peerState, meshTopic } from "@turbostarter/db/schema/mesh"; import { user } from "@turbostarter/db/schema/auth"; import { handleCliSync, type CliSyncRequest } from "./cli-sync"; import { generateId } from "@turbostarter/shared/utils"; @@ -84,6 +84,15 @@ import { listDbMeshServices, deleteService, getRunningServices, + createTopic, + listTopics, + findTopicByName, + joinTopic, + leaveTopic, + topicMembers, + topicHistory, + markTopicRead, + appendTopicMessage, } from "./broker"; import * as serviceManager from "./service-manager"; import { ensureBucket, meshBucketName, minioClient } from "./minio"; @@ -1495,6 +1504,26 @@ function sendError( } } +/** + * Resolve a topic identifier — accepts either a topic id directly OR a + * topic name within the given mesh. Returns the topic id, or null if no + * matching topic exists. Used by every topic_* WS handler so callers can + * reference topics by human-readable name without an extra round trip. + */ +async function resolveTopicId(meshId: string, idOrName: string): Promise { + // ULID-ish ids are 25-26 chars of base32; names are usually shorter and + // human-readable. Try as id first (cheap PK lookup), fall back to name. + if (idOrName.length >= 20 && /^[a-z0-9_-]+$/i.test(idOrName)) { + const byId = await db + .select({ id: meshTopic.id }) + .from(meshTopic) + .where(and(eq(meshTopic.id, idOrName), eq(meshTopic.meshId, meshId))); + if (byId[0]) return byId[0].id; + } + const byName = await findTopicByName(meshId, idOrName); + return byName?.id ?? null; +} + // --- Peer state persistence --- async function savePeerState(conn: PeerConn, memberId: string, meshId: string): Promise { @@ -1901,6 +1930,24 @@ async function handleSend( nonce: msg.nonce, ciphertext: msg.ciphertext, }); + + // Topic-tagged messages (targetSpec starts with `#`) get + // persisted to topic_message in addition to the ephemeral queue, so + // humans (and opting-in agents) can fetch history on reconnect. + // Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md + if (msg.targetSpec.startsWith("#")) { + const topicId = msg.targetSpec.slice(1); + void appendTopicMessage({ + topicId, + senderMemberId: conn.memberId, + senderSessionPubkey: conn.sessionPubkey ?? undefined, + nonce: msg.nonce, + ciphertext: msg.ciphertext, + }).catch((e) => + log.warn("appendTopicMessage failed", { topic_id: topicId, err: String(e) }), + ); + } + void audit(conn.meshId, "message_sent", conn.memberId, conn.displayName, { targetSpec: msg.targetSpec, priority: msg.priority, @@ -2291,6 +2338,125 @@ function handleConnection(ws: WebSocket): void { }); break; } + + // ── Topics (v0.2.0) ───────────────────────────────────────── + case "topic_create": { + const tc = msg as Extract; + const result = await createTopic({ + meshId: conn.meshId, + name: tc.name, + description: tc.description, + visibility: tc.visibility, + createdByMemberId: conn.memberId, + }); + // Auto-subscribe the creator. + await joinTopic({ topicId: result.id, memberId: conn.memberId, role: "lead" }); + const resp: WSServerMessage = { + type: "topic_created", + topic: { + id: result.id, + name: tc.name, + visibility: tc.visibility ?? "public", + }, + created: result.created, + ...(_reqId ? { _reqId } : {}), + }; + conn.ws.send(JSON.stringify(resp)); + log.info("ws topic_create", { presence_id: presenceId, topic: tc.name, created: result.created }); + break; + } + + case "topic_list": { + const topics = await listTopics(conn.meshId); + const resp: WSServerMessage = { + type: "topic_list_response", + topics: topics.map((t) => ({ + id: t.id, + name: t.name, + description: t.description, + visibility: t.visibility, + memberCount: t.memberCount, + createdAt: t.createdAt.toISOString(), + })), + ...(_reqId ? { _reqId } : {}), + }; + conn.ws.send(JSON.stringify(resp)); + break; + } + + case "topic_join": { + const tj = msg as Extract; + const topicId = await resolveTopicId(conn.meshId, tj.topic); + if (!topicId) { sendError(ws, "topic_not_found", `topic "${tj.topic}" not found`, _reqId); break; } + await joinTopic({ topicId, memberId: conn.memberId, role: tj.role }); + log.info("ws topic_join", { presence_id: presenceId, topic: topicId }); + break; + } + + case "topic_leave": { + const tl = msg as Extract; + const topicId = await resolveTopicId(conn.meshId, tl.topic); + if (!topicId) { sendError(ws, "topic_not_found", `topic "${tl.topic}" not found`, _reqId); break; } + await leaveTopic({ topicId, memberId: conn.memberId }); + log.info("ws topic_leave", { presence_id: presenceId, topic: topicId }); + break; + } + + case "topic_members": { + const tm = msg as Extract; + const topicId = await resolveTopicId(conn.meshId, tm.topic); + if (!topicId) { sendError(ws, "topic_not_found", `topic "${tm.topic}" not found`, _reqId); break; } + const members = await topicMembers(topicId); + const resp: WSServerMessage = { + type: "topic_members_response", + topic: tm.topic, + members: members.map((m) => ({ + memberId: m.memberId, + pubkey: m.pubkey, + displayName: m.displayName, + role: m.role, + joinedAt: m.joinedAt.toISOString(), + lastReadAt: m.lastReadAt?.toISOString() ?? null, + })), + ...(_reqId ? { _reqId } : {}), + }; + conn.ws.send(JSON.stringify(resp)); + break; + } + + case "topic_history": { + const th = msg as Extract; + const topicId = await resolveTopicId(conn.meshId, th.topic); + if (!topicId) { sendError(ws, "topic_not_found", `topic "${th.topic}" not found`, _reqId); break; } + const history = await topicHistory({ + topicId, + limit: th.limit, + beforeId: th.beforeId, + }); + const resp: WSServerMessage = { + type: "topic_history_response", + topic: th.topic, + messages: history.map((h) => ({ + id: h.id, + senderPubkey: h.senderPubkey, + nonce: h.nonce, + ciphertext: h.ciphertext, + createdAt: h.createdAt.toISOString(), + })), + ...(_reqId ? { _reqId } : {}), + }; + conn.ws.send(JSON.stringify(resp)); + break; + } + + case "topic_mark_read": { + const tr = msg as Extract; + const topicId = await resolveTopicId(conn.meshId, tr.topic); + if (!topicId) { sendError(ws, "topic_not_found", `topic "${tr.topic}" not found`, _reqId); break; } + await markTopicRead({ topicId, memberId: conn.memberId }); + break; + } + case "set_state": { const ss = msg as Extract; // Look up the display name for attribution. diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 1c44e3d..e47ddf6 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -179,6 +179,107 @@ export interface WSLeaveGroupMessage { name: string; } +// ── Topics (v0.2.0) ───────────────────────────────────────────────── +// Topics complement groups: groups are identity tags, topics are +// conversation scopes. targetSpec for topic-tagged messages is +// "#". Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md + +export interface WSTopicCreateMessage { + type: "topic_create"; + name: string; + description?: string; + visibility?: "public" | "private" | "dm"; + _reqId?: string; +} + +export interface WSTopicListMessage { + type: "topic_list"; + _reqId?: string; +} + +export interface WSTopicJoinMessage { + type: "topic_join"; + /** Topic id OR name. Server resolves. */ + topic: string; + role?: "lead" | "member" | "observer"; + _reqId?: string; +} + +export interface WSTopicLeaveMessage { + type: "topic_leave"; + topic: string; + _reqId?: string; +} + +export interface WSTopicMembersMessage { + type: "topic_members"; + topic: string; + _reqId?: string; +} + +export interface WSTopicHistoryMessage { + type: "topic_history"; + topic: string; + limit?: number; + beforeId?: string; + _reqId?: string; +} + +export interface WSTopicMarkReadMessage { + type: "topic_mark_read"; + topic: string; + _reqId?: string; +} + +// Server → client topic responses + +export interface WSTopicCreatedMessage { + type: "topic_created"; + topic: { id: string; name: string; visibility: "public" | "private" | "dm" }; + created: boolean; + _reqId?: string; +} + +export interface WSTopicListResponseMessage { + type: "topic_list_response"; + topics: Array<{ + id: string; + name: string; + description: string | null; + visibility: "public" | "private" | "dm"; + memberCount: number; + createdAt: string; + }>; + _reqId?: string; +} + +export interface WSTopicMembersResponseMessage { + type: "topic_members_response"; + topic: string; + members: Array<{ + memberId: string; + pubkey: string; + displayName: string; + role: "lead" | "member" | "observer"; + joinedAt: string; + lastReadAt: string | null; + }>; + _reqId?: string; +} + +export interface WSTopicHistoryResponseMessage { + type: "topic_history_response"; + topic: string; + messages: Array<{ + id: string; + senderPubkey: string; + nonce: string; + ciphertext: string; + createdAt: string; + }>; + _reqId?: string; +} + /** Client → broker: set a shared state key-value. */ export interface WSSetStateMessage { type: "set_state"; @@ -1145,6 +1246,13 @@ export type WSClientMessage = | WSSetProfileMessage | WSJoinGroupMessage | WSLeaveGroupMessage + | WSTopicCreateMessage + | WSTopicListMessage + | WSTopicJoinMessage + | WSTopicLeaveMessage + | WSTopicMembersMessage + | WSTopicHistoryMessage + | WSTopicMarkReadMessage | WSSetStateMessage | WSGetStateMessage | WSListStateMessage @@ -1313,6 +1421,10 @@ export type WSServerMessage = | WSPushMessage | WSAckMessage | WSPeersListMessage + | WSTopicCreatedMessage + | WSTopicListResponseMessage + | WSTopicMembersResponseMessage + | WSTopicHistoryResponseMessage | WSStateChangeMessage | WSStateResultMessage | WSStateListMessage diff --git a/apps/cli/skills/claudemesh/SKILL.md b/apps/cli/skills/claudemesh/SKILL.md index d9ac138..54b42fd 100644 --- a/apps/cli/skills/claudemesh/SKILL.md +++ b/apps/cli/skills/claudemesh/SKILL.md @@ -36,6 +36,31 @@ Every broker-touching verb runs through a policy gate before dispatch. The defau **Convention:** every operation is `claudemesh `. Legacy short forms (`send`, `peers`, `kick`, `remember`, ...) are aliases that keep working forever; prefer the resource form for new code. +### `topic` — conversation scope within a mesh (v0.2.0) + +A topic is a named conversation inside a mesh. Mesh = trust boundary. Group = identity tag. **Topic = what you're talking about.** Subscribers receive topic-tagged messages; non-subscribers don't. Topics also persist message history so humans (and opting-in agents) can fetch back-scroll on reconnect. + +```bash +claudemesh topic create deploys --description "deploy + on-call" +claudemesh topic create incident-2026-05-02 --visibility private +claudemesh topic list # all topics in mesh +claudemesh topic join deploys # subscribe (by name or id) +claudemesh topic join deploys --role lead # join as lead +claudemesh topic leave deploys +claudemesh topic members deploys # list subscribers +claudemesh topic history deploys --limit 50 # fetch back-scroll +claudemesh topic history deploys --before # paginate older +claudemesh topic read deploys # mark all as read + +# Send to a topic — same `send` verb, target starts with # +claudemesh send "#deploys" "rolling out 1.5.1 to staging" +``` + +When to use topics vs groups vs DM: +- **DM** (`send `) — 1:1, ephemeral. +- **Group** (`send "@frontend"`) — addresses everyone in a group; ephemeral; for coordinating teams. +- **Topic** (`send "#deploys"`) — durable conversation room; for ongoing work threads, incident channels, build-status feeds. + ### `peer` — read connected peers + admin (kick / ban / verify) ```bash diff --git a/apps/cli/src/cli/policy-classify.ts b/apps/cli/src/cli/policy-classify.ts index 33b9add..13501f9 100644 --- a/apps/cli/src/cli/policy-classify.ts +++ b/apps/cli/src/cli/policy-classify.ts @@ -85,6 +85,13 @@ export function classifyInvocation(command: string, positionals: string[]): Invo case "task": { return { resource: "task", verb: sub || "list", isWrite: isWrite(sub) }; } + case "topic": { + // topic verbs: create | list | join | leave | members | history | read + // writes: create, join, leave; reads: list, members, history, read + const verb = sub || "list"; + const writeVerbs = new Set(["create", "join", "leave"]); + return { resource: "topic", verb, isWrite: writeVerbs.has(verb) }; + } // Platform — sub is the verb. case "vector": case "graph": case "context": case "stream": diff --git a/apps/cli/src/commands/send.ts b/apps/cli/src/commands/send.ts index ac9888b..878de19 100644 --- a/apps/cli/src/commands/send.ts +++ b/apps/cli/src/commands/send.ts @@ -69,7 +69,19 @@ export async function runSend(flags: SendFlags, to: string, message: string): Pr // Cold path await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => { let targetSpec = to; - if (!to.startsWith("@") && to !== "*" && !/^[0-9a-f]{64}$/i.test(to)) { + if (to.startsWith("#") && !/^#[0-9a-z_-]{20,}$/i.test(to)) { + // Topic by name → resolve to "#" via topicList. The broker + // wire format is "#"; users type "#" for ergonomics. + const name = to.slice(1); + const topics = await client.topicList(); + const match = topics.find((t) => t.name === name); + if (!match) { + const names = topics.map((t) => "#" + t.name).join(", "); + render.err(`Topic "${to}" not found.`, `topics: ${names || "(none)"}`); + process.exit(1); + } + targetSpec = "#" + match.id; + } else if (!to.startsWith("@") && !to.startsWith("#") && to !== "*" && !/^[0-9a-f]{64}$/i.test(to)) { const peers = await client.listPeers(); const match = peers.find( (p) => p.displayName.toLowerCase() === to.toLowerCase(), diff --git a/apps/cli/src/commands/topic.ts b/apps/cli/src/commands/topic.ts new file mode 100644 index 0000000..2cb8d50 --- /dev/null +++ b/apps/cli/src/commands/topic.ts @@ -0,0 +1,177 @@ +/** + * `claudemesh topic ` — conversation-scope primitive within a mesh. + * + * Topics complement groups: groups are identity tags ("@frontend"); topics + * are conversation scopes ("#deploys") with persistent history, + * subscription-based delivery, and per-topic state. + * + * Verbs: + * create [--description X] [--visibility public|private|dm] + * list + * join [--role lead|member|observer] + * leave + * members + * history [--limit N] [--before ] + * read (mark all as read) + * + * Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md + */ + +import { withMesh } from "./connect.js"; +import { render } from "~/ui/render.js"; +import { bold, clay, dim, green } from "~/ui/styles.js"; +import { EXIT } from "~/constants/exit-codes.js"; + +export interface TopicFlags { + mesh?: string; + json?: boolean; + description?: string; + visibility?: "public" | "private" | "dm"; + role?: "lead" | "member" | "observer"; + limit?: number | string; + before?: string; +} + +export async function runTopicCreate(name: string, flags: TopicFlags): Promise { + if (!name) { + render.err("Usage: claudemesh topic create [--description X] [--visibility V]"); + return EXIT.INVALID_ARGS; + } + return await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => { + const result = await client.topicCreate({ + name, + description: flags.description, + visibility: flags.visibility, + }); + if (!result) { + render.err("topic create failed"); + return EXIT.INTERNAL_ERROR; + } + if (flags.json) { + console.log(JSON.stringify(result)); + return EXIT.SUCCESS; + } + if (result.created) { + render.ok("created", `${clay("#" + name)} ${dim(result.id.slice(0, 8))}`); + } else { + render.info(dim(`already exists: #${name} ${result.id.slice(0, 8)}`)); + } + return EXIT.SUCCESS; + }); +} + +export async function runTopicList(flags: TopicFlags): Promise { + return await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => { + const topics = await client.topicList(); + if (flags.json) { + console.log(JSON.stringify(topics, null, 2)); + return EXIT.SUCCESS; + } + if (topics.length === 0) { + render.info(dim("no topics in this mesh.")); + return EXIT.SUCCESS; + } + render.section(`topics (${topics.length})`); + for (const t of topics) { + const vis = t.visibility === "public" ? green(t.visibility) : dim(t.visibility); + process.stdout.write(` ${clay("#" + t.name)} ${vis} ${dim(`${t.memberCount} member${t.memberCount === 1 ? "" : "s"}`)}\n`); + if (t.description) process.stdout.write(` ${dim(t.description)}\n`); + } + return EXIT.SUCCESS; + }); +} + +export async function runTopicJoin(topic: string, flags: TopicFlags): Promise { + if (!topic) { + render.err("Usage: claudemesh topic join [--role lead|member|observer]"); + return EXIT.INVALID_ARGS; + } + return await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => { + await client.topicJoin(topic, flags.role); + if (flags.json) console.log(JSON.stringify({ joined: topic })); + else render.ok("joined", clay("#" + topic)); + return EXIT.SUCCESS; + }); +} + +export async function runTopicLeave(topic: string, flags: TopicFlags): Promise { + if (!topic) { + render.err("Usage: claudemesh topic leave "); + return EXIT.INVALID_ARGS; + } + return await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => { + await client.topicLeave(topic); + if (flags.json) console.log(JSON.stringify({ left: topic })); + else render.ok("left", clay("#" + topic)); + return EXIT.SUCCESS; + }); +} + +export async function runTopicMembers(topic: string, flags: TopicFlags): Promise { + if (!topic) { + render.err("Usage: claudemesh topic members "); + return EXIT.INVALID_ARGS; + } + return await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => { + const members = await client.topicMembers(topic); + if (flags.json) { + console.log(JSON.stringify(members, null, 2)); + return EXIT.SUCCESS; + } + if (members.length === 0) { + render.info(dim(`no members in ${clay("#" + topic)}.`)); + return EXIT.SUCCESS; + } + render.section(`${clay("#" + topic)} members (${members.length})`); + for (const m of members) { + process.stdout.write(` ${bold(m.displayName)} ${dim(m.role)} ${dim(m.pubkey.slice(0, 8))}\n`); + } + return EXIT.SUCCESS; + }); +} + +export async function runTopicHistory(topic: string, flags: TopicFlags): Promise { + if (!topic) { + render.err("Usage: claudemesh topic history [--limit N] [--before ]"); + return EXIT.INVALID_ARGS; + } + return await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => { + const limit = flags.limit ? Number(flags.limit) : undefined; + const messages = await client.topicHistory({ + topic, + limit, + beforeId: flags.before, + }); + if (flags.json) { + console.log(JSON.stringify(messages, null, 2)); + return EXIT.SUCCESS; + } + if (messages.length === 0) { + render.info(dim(`no messages in ${clay("#" + topic)}.`)); + return EXIT.SUCCESS; + } + // History returns newest-first; render oldest-first for chat UX. + const ordered = [...messages].reverse(); + render.section(`${clay("#" + topic)} history (${ordered.length})`); + for (const m of ordered) { + const t = new Date(m.createdAt).toLocaleString(); + process.stdout.write( + ` ${dim(t)} ${bold(m.senderPubkey.slice(0, 8))} ${dim("(encrypted, " + m.ciphertext.length + "b)")}\n`, + ); + } + return EXIT.SUCCESS; + }); +} + +export async function runTopicMarkRead(topic: string, flags: TopicFlags): Promise { + if (!topic) { + render.err("Usage: claudemesh topic read "); + return EXIT.INVALID_ARGS; + } + return await withMesh({ meshSlug: flags.mesh ?? null }, async (client) => { + await client.topicMarkRead(topic); + if (flags.json) console.log(JSON.stringify({ read: topic })); + else render.ok("marked read", clay("#" + topic)); + return EXIT.SUCCESS; + }); +} diff --git a/apps/cli/src/entrypoints/cli.ts b/apps/cli/src/entrypoints/cli.ts index 94792d4..ef5fc92 100644 --- a/apps/cli/src/entrypoints/cli.ts +++ b/apps/cli/src/entrypoints/cli.ts @@ -102,6 +102,16 @@ Profile / presence (resource form) claudemesh group join @ join a group (--role X) claudemesh group leave @ leave a group +Topic (conversation scope, v0.2.0) + claudemesh topic create create a topic [--description --visibility] + claudemesh topic list list topics in the mesh + claudemesh topic join subscribe (via name or id) + claudemesh topic leave unsubscribe + claudemesh topic members list topic subscribers + claudemesh topic history fetch message history [--limit --before] + claudemesh topic read mark all as read + claudemesh send "#topic" "msg" send to a topic + Schedule (resource form) claudemesh schedule msg one-shot or recurring (alias: remind) claudemesh schedule list list pending @@ -499,6 +509,30 @@ async function main(): Promise { break; } + // topic — conversational primitive within a mesh (v0.2.0) + case "topic": { + const sub = positionals[0]; + const f = { + mesh: flags.mesh as string, + json: !!flags.json, + description: flags.description as string, + visibility: flags.visibility as "public" | "private" | "dm" | undefined, + role: flags.role as "lead" | "member" | "observer" | undefined, + limit: flags.limit as string | undefined, + before: flags.before as string | undefined, + }; + const arg = positionals[1] ?? ""; + if (sub === "create") { const { runTopicCreate } = await import("~/commands/topic.js"); process.exit(await runTopicCreate(arg, f)); } + else if (sub === "list") { const { runTopicList } = await import("~/commands/topic.js"); process.exit(await runTopicList(f)); } + else if (sub === "join") { const { runTopicJoin } = await import("~/commands/topic.js"); process.exit(await runTopicJoin(arg, f)); } + else if (sub === "leave") { const { runTopicLeave } = await import("~/commands/topic.js"); process.exit(await runTopicLeave(arg, f)); } + else if (sub === "members") { const { runTopicMembers } = await import("~/commands/topic.js"); process.exit(await runTopicMembers(arg, f)); } + else if (sub === "history") { const { runTopicHistory } = await import("~/commands/topic.js"); process.exit(await runTopicHistory(arg, f)); } + else if (sub === "read") { const { runTopicMarkRead } = await import("~/commands/topic.js"); process.exit(await runTopicMarkRead(arg, f)); } + else { console.error("Usage: claudemesh topic "); process.exit(EXIT.INVALID_ARGS); } + break; + } + // task — extends broker-actions.ts (claim/complete) with list/create case "task": { const sub = positionals[0]; diff --git a/apps/cli/src/services/broker/ws-client.ts b/apps/cli/src/services/broker/ws-client.ts index 76a5d0f..a911552 100644 --- a/apps/cli/src/services/broker/ws-client.ts +++ b/apps/cli/src/services/broker/ws-client.ts @@ -161,6 +161,11 @@ export class BrokerClient { private grantFileAccessResolvers = new Map void; timer: NodeJS.Timeout }>(); private peerFileResponseResolvers = new Map void; timer: NodeJS.Timeout }>(); private peerDirResponseResolvers = new Map void; timer: NodeJS.Timeout }>(); + // ── Topics (v0.2.0) ── + private topicCreatedResolvers = new Map void; timer: NodeJS.Timeout }>(); + private topicListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + private topicMembersResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + private topicHistoryResolvers = new Map) => void; timer: NodeJS.Timeout }>(); /** Directories from which this peer serves files. Default: [process.cwd()]. */ private sharedDirs: string[] = [process.cwd()]; private _serviceCatalog: Array<{ name: string; description: string; status: string; tools: Array<{ name: string; description: string; inputSchema: object }>; deployed_by: string }> = []; @@ -527,6 +532,121 @@ export class BrokerClient { this.ws.send(JSON.stringify({ type: "leave_group", name })); } + // --- Topics (v0.2.0) --- + // Conversation-scope primitive within a mesh. Spec: + // .artifacts/specs/2026-05-02-v0.2.0-scope.md + + async topicCreate(args: { + name: string; + description?: string; + visibility?: "public" | "private" | "dm"; + }): Promise<{ id: string; name: string; created: boolean } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.topicCreatedResolvers.set(reqId, { + resolve, + timer: setTimeout(() => { + if (this.topicCreatedResolvers.delete(reqId)) resolve(null); + }, 5_000), + }); + this.ws!.send( + JSON.stringify({ type: "topic_create", _reqId: reqId, ...args }), + ); + }); + } + + async topicList(): Promise< + Array<{ + id: string; + name: string; + description: string | null; + visibility: "public" | "private" | "dm"; + memberCount: number; + createdAt: string; + }> + > { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.topicListResolvers.set(reqId, { + resolve, + timer: setTimeout(() => { + if (this.topicListResolvers.delete(reqId)) resolve([]); + }, 5_000), + }); + this.ws!.send(JSON.stringify({ type: "topic_list", _reqId: reqId })); + }); + } + + async topicJoin(topic: string, role?: "lead" | "member" | "observer"): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ type: "topic_join", topic, role })); + } + + async topicLeave(topic: string): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ type: "topic_leave", topic })); + } + + async topicMembers(topic: string): Promise< + Array<{ + memberId: string; + pubkey: string; + displayName: string; + role: "lead" | "member" | "observer"; + joinedAt: string; + lastReadAt: string | null; + }> + > { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.topicMembersResolvers.set(reqId, { + resolve, + timer: setTimeout(() => { + if (this.topicMembersResolvers.delete(reqId)) resolve([]); + }, 5_000), + }); + this.ws!.send( + JSON.stringify({ type: "topic_members", _reqId: reqId, topic }), + ); + }); + } + + async topicHistory(args: { + topic: string; + limit?: number; + beforeId?: string; + }): Promise< + Array<{ + id: string; + senderPubkey: string; + nonce: string; + ciphertext: string; + createdAt: string; + }> + > { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.topicHistoryResolvers.set(reqId, { + resolve, + timer: setTimeout(() => { + if (this.topicHistoryResolvers.delete(reqId)) resolve([]); + }, 5_000), + }); + this.ws!.send( + JSON.stringify({ type: "topic_history", _reqId: reqId, ...args }), + ); + }); + } + + async topicMarkRead(topic: string): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ type: "topic_mark_read", topic })); + } + // --- State --- /** Set a shared state value visible to all peers in the mesh. */ @@ -1694,6 +1814,28 @@ export class BrokerClient { this.resolveFromMap(this.listPeersResolvers, msgReqId, peers); return; } + // ── Topics (v0.2.0) ── + if (msg.type === "topic_created") { + const r = (msg.topic ?? {}) as { id: string; name: string }; + this.resolveFromMap(this.topicCreatedResolvers, msgReqId, { + id: r.id, + name: r.name, + created: !!msg.created, + }); + return; + } + if (msg.type === "topic_list_response") { + this.resolveFromMap(this.topicListResolvers, msgReqId, (msg.topics as any[]) ?? []); + return; + } + if (msg.type === "topic_members_response") { + this.resolveFromMap(this.topicMembersResolvers, msgReqId, (msg.members as any[]) ?? []); + return; + } + if (msg.type === "topic_history_response") { + this.resolveFromMap(this.topicHistoryResolvers, msgReqId, (msg.messages as any[]) ?? []); + return; + } if (msg.type === "push") { this._statsCounters.messagesIn++; const nonce = String(msg.nonce ?? ""); diff --git a/packages/db/migrations/0022_topics.sql b/packages/db/migrations/0022_topics.sql new file mode 100644 index 0000000..40c067d --- /dev/null +++ b/packages/db/migrations/0022_topics.sql @@ -0,0 +1,66 @@ +-- Topics — conversational primitive within a mesh (v0.2.0). +-- +-- Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md +-- +-- Mesh = trust boundary. Group = identity tag. Topic = conversation scope. +-- Three orthogonal axes; topics complement (don't replace) groups. +-- +-- Three new tables in the `mesh` pg-schema: +-- * mesh.topic — named topic per mesh (unique on mesh_id, name) +-- * mesh.topic_member — per-member subscriptions, with last_read_at +-- * mesh.topic_message — persistent encrypted history (used for human- +-- touched topics; agent-only topics may opt out) +-- +-- Two new pg enums: +-- * mesh.topic_visibility = public | private | dm +-- * mesh.topic_member_role = lead | member | observer +-- +-- Additive — no breaking changes to existing tables. Safe to deploy before +-- CLI/broker code knows about topics; the routing layer falls back to the +-- existing peer/group/* targeting until topic-tagged messages arrive. + +CREATE TYPE "mesh"."topic_visibility" AS ENUM ('public', 'private', 'dm'); +CREATE TYPE "mesh"."topic_member_role" AS ENUM ('lead', 'member', 'observer'); + +CREATE TABLE IF NOT EXISTS "mesh"."topic" ( + "id" text PRIMARY KEY NOT NULL, + "mesh_id" text NOT NULL REFERENCES "mesh"."mesh"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "name" text NOT NULL, + "description" text, + "visibility" "mesh"."topic_visibility" NOT NULL DEFAULT 'public', + "created_by_member_id" text REFERENCES "mesh"."member"("id") ON DELETE SET NULL ON UPDATE CASCADE, + "created_at" timestamp DEFAULT now() NOT NULL, + "archived_at" timestamp +); + +CREATE UNIQUE INDEX IF NOT EXISTS "topic_mesh_name_unique" + ON "mesh"."topic" ("mesh_id", "name"); + +CREATE TABLE IF NOT EXISTS "mesh"."topic_member" ( + "topic_id" text NOT NULL REFERENCES "mesh"."topic"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "member_id" text NOT NULL REFERENCES "mesh"."member"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "role" "mesh"."topic_member_role" NOT NULL DEFAULT 'member', + "joined_at" timestamp DEFAULT now() NOT NULL, + "last_read_at" timestamp +); + +CREATE UNIQUE INDEX IF NOT EXISTS "topic_member_unique" + ON "mesh"."topic_member" ("topic_id", "member_id"); + +CREATE INDEX IF NOT EXISTS "topic_member_by_member" + ON "mesh"."topic_member" ("member_id"); + +CREATE TABLE IF NOT EXISTS "mesh"."topic_message" ( + "id" text PRIMARY KEY NOT NULL, + "topic_id" text NOT NULL REFERENCES "mesh"."topic"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "sender_member_id" text NOT NULL REFERENCES "mesh"."member"("id") ON DELETE CASCADE ON UPDATE CASCADE, + "sender_session_pubkey" text, + "nonce" text NOT NULL, + "ciphertext" text NOT NULL, + "created_at" timestamp DEFAULT now() NOT NULL +); + +-- Composite index for the common access pattern: load topic history +-- ordered by time. Drives the web chat panel's infinite-scroll fetch. +CREATE INDEX IF NOT EXISTS "topic_message_by_topic_time" + ON "mesh"."topic_message" ("topic_id", "created_at"); diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index b9c188d..301691a 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -58,11 +58,10 @@ export const presenceStatusEnum = meshSchema.enum("presence_status", [ "dnd", ]); -export const presenceStatusSourceEnum = meshSchema.enum("presence_status_source", [ - "hook", - "manual", - "jsonl", -]); +export const presenceStatusSourceEnum = meshSchema.enum( + "presence_status_source", + ["hook", "manual", "jsonl"], +); export const messagePriorityEnum = meshSchema.enum("message_priority", [ "now", @@ -120,12 +119,19 @@ export const mesh = meshSchema.table("mesh", { * Per-mesh policy controlling which profile fields members can edit * about themselves. Admins can always edit anyone's profile regardless. */ - selfEditable: jsonb().$type<{ - displayName: boolean; - roleTag: boolean; - groups: boolean; - messageMode: boolean; - }>().default({ displayName: true, roleTag: true, groups: true, messageMode: true }), + selfEditable: jsonb() + .$type<{ + displayName: boolean; + roleTag: boolean; + groups: boolean; + messageMode: boolean; + }>() + .default({ + displayName: true, + roleTag: true, + groups: true, + messageMode: true, + }), createdAt: timestamp().defaultNow().notNull(), archivedAt: timestamp(), }); @@ -141,43 +147,46 @@ export const mesh = meshSchema.table("mesh", { * one of the two on collision. Unique TS name + short DB name is the * cleanest trade-off. */ -export const meshMember = meshSchema.table("member", { - id: text().primaryKey().notNull().$defaultFn(generateId), - meshId: text() - .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) - .notNull(), - userId: text().references(() => user.id, { - onDelete: "set null", - onUpdate: "cascade", - }), - peerPubkey: text().notNull(), - displayName: text().notNull(), - role: meshRoleEnum().notNull().default("member"), - /** Free-text role label visible to peers (not to be confused with `role` which is the permission enum). */ - roleTag: text(), - /** Persistent group memberships set via dashboard or CLI profile command. */ - defaultGroups: jsonb().$type>().default([]), - /** Delivery preference: push (real-time), inbox (held), off (manual poll). */ - messageMode: text().default("push"), - /** Links this mesh member to a dashboard OAuth user (Payload CMS user.id). */ - dashboardUserId: text(), - joinedAt: timestamp().defaultNow().notNull(), - lastSeenAt: timestamp(), - revokedAt: timestamp(), - /** - * Per-peer capability grants — which peer pubkeys can send this member - * which kinds of messages. Empty object = use defaults (read + dm + - * broadcast + state-read). Empty array for a specific pubkey = blocked. - * See .artifacts/specs/2026-04-15-per-peer-capabilities.md. - */ - peerGrants: jsonb() - .$type>() - .notNull() - .default({}), -}, (table) => [ - index("member_dashboard_user_idx").on(table.dashboardUserId), - index("member_peer_grants_gin_idx").using("gin", table.peerGrants), -]); +export const meshMember = meshSchema.table( + "member", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + userId: text().references(() => user.id, { + onDelete: "set null", + onUpdate: "cascade", + }), + peerPubkey: text().notNull(), + displayName: text().notNull(), + role: meshRoleEnum().notNull().default("member"), + /** Free-text role label visible to peers (not to be confused with `role` which is the permission enum). */ + roleTag: text(), + /** Persistent group memberships set via dashboard or CLI profile command. */ + defaultGroups: jsonb() + .$type<{ name: string; role?: string }[]>() + .default([]), + /** Delivery preference: push (real-time), inbox (held), off (manual poll). */ + messageMode: text().default("push"), + /** Links this mesh member to a dashboard OAuth user (Payload CMS user.id). */ + dashboardUserId: text(), + joinedAt: timestamp().defaultNow().notNull(), + lastSeenAt: timestamp(), + revokedAt: timestamp(), + /** + * Per-peer capability grants — which peer pubkeys can send this member + * which kinds of messages. Empty object = use defaults (read + dm + + * broadcast + state-read). Empty array for a specific pubkey = blocked. + * See .artifacts/specs/2026-04-15-per-peer-capabilities.md. + */ + peerGrants: jsonb().$type>().notNull().default({}), + }, + (table) => [ + index("member_dashboard_user_idx").on(table.dashboardUserId), + index("member_peer_grants_gin_idx").using("gin", table.peerGrants), + ], +); /** * Invite tokens used to join a mesh via shareable URL. @@ -206,12 +215,14 @@ export const invite = meshSchema.table("invite", { usedCount: integer().notNull().default(0), role: meshRoleEnum().notNull().default("member"), /** Pre-configured profile values applied to new members on join. */ - preset: jsonb().$type<{ - displayName?: string; - roleTag?: string; - groups?: Array<{ name: string; role?: string }>; - messageMode?: string; - }>().default({}), + preset: jsonb() + .$type<{ + displayName?: string; + roleTag?: string; + groups?: { name: string; role?: string }[]; + messageMode?: string; + }>() + .default({}), expiresAt: timestamp().notNull(), createdBy: text() .references(() => user.id, { onDelete: "cascade", onUpdate: "cascade" }) @@ -239,25 +250,29 @@ export const invite = meshSchema.table("invite", { * `code` references an underlying mesh.invite row that will be minted * on send; when the recipient lands on /i/{code} they claim the real invite. */ -export const pendingInvite = meshSchema.table("pending_invite", { - id: text().primaryKey().notNull().$defaultFn(generateId), - meshId: text() - .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) - .notNull(), - email: text().notNull(), - /** The short code of the underlying `mesh.invite.code` row this email links to. */ - code: text().notNull(), - sentAt: timestamp().defaultNow().notNull(), - acceptedAt: timestamp(), - revokedAt: timestamp(), - createdBy: text() - .references(() => user.id, { onDelete: "cascade", onUpdate: "cascade" }) - .notNull(), - createdAt: timestamp().defaultNow().notNull(), -}, (table) => [ - index("pending_invite_email_idx").on(table.email), - index("pending_invite_mesh_idx").on(table.meshId), -]); +export const pendingInvite = meshSchema.table( + "pending_invite", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + email: text().notNull(), + /** The short code of the underlying `mesh.invite.code` row this email links to. */ + code: text().notNull(), + sentAt: timestamp().defaultNow().notNull(), + acceptedAt: timestamp(), + revokedAt: timestamp(), + createdBy: text() + .references(() => user.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + createdAt: timestamp().defaultNow().notNull(), + }, + (table) => [ + index("pending_invite_email_idx").on(table.email), + index("pending_invite_mesh_idx").on(table.meshId), + ], +); /** * Signed, hash-chained audit log. NEVER stores message content — every @@ -294,7 +309,10 @@ export const auditLog = meshSchema.table("audit_log", { export const presence = meshSchema.table("presence", { id: text().primaryKey().notNull().$defaultFn(generateId), memberId: text() - .references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }) + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) .notNull(), sessionId: text().notNull(), sessionPubkey: text(), @@ -305,7 +323,7 @@ export const presence = meshSchema.table("presence", { statusSource: presenceStatusSourceEnum().notNull().default("jsonl"), statusUpdatedAt: timestamp().defaultNow().notNull(), summary: text(), - groups: jsonb().$type>().default([]), + groups: jsonb().$type<{ name: string; role?: string }[]>().default([]), connectedAt: timestamp().defaultNow().notNull(), lastPingAt: timestamp().defaultNow().notNull(), disconnectedAt: timestamp(), @@ -326,7 +344,10 @@ export const messageQueue = meshSchema.table("message_queue", { .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) .notNull(), senderMemberId: text() - .references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }) + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) .notNull(), senderSessionPubkey: text(), targetSpec: text().notNull(), @@ -474,7 +495,10 @@ export const meshContext = meshSchema.table( meshId: text() .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) .notNull(), - memberId: text().references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }), + memberId: text().references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }), presenceId: text().references(() => presence.id, { onDelete: "cascade" }), peerName: text(), summary: text().notNull(), @@ -580,11 +604,16 @@ export const meshWebhook = meshSchema.table( secret: text().notNull(), active: boolean().notNull().default(true), createdBy: text() - .references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }) + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) .notNull(), createdAt: timestamp().defaultNow().notNull(), }, - (table) => [uniqueIndex("webhook_mesh_name_idx").on(table.meshId, table.name)], + (table) => [ + uniqueIndex("webhook_mesh_name_idx").on(table.meshId, table.name), + ], ); export const meshService = meshSchema.table( @@ -618,7 +647,9 @@ export const meshService = meshSchema.table( createdAt: timestamp().defaultNow().notNull(), updatedAt: timestamp().defaultNow().notNull(), }, - (table) => [uniqueIndex("service_mesh_name_idx").on(table.meshId, table.name)], + (table) => [ + uniqueIndex("service_mesh_name_idx").on(table.meshId, table.name), + ], ); export const meshVaultEntry = meshSchema.table( @@ -641,7 +672,13 @@ export const meshVaultEntry = meshSchema.table( createdAt: timestamp().defaultNow().notNull(), updatedAt: timestamp().defaultNow().notNull(), }, - (table) => [uniqueIndex("vault_entry_mesh_member_key_idx").on(table.meshId, table.memberId, table.key)], + (table) => [ + uniqueIndex("vault_entry_mesh_member_key_idx").on( + table.meshId, + table.memberId, + table.key, + ), + ], ); export const meshWebhookRelations = relations(meshWebhook, ({ one }) => ({ @@ -668,7 +705,10 @@ export const scheduledMessage = meshSchema.table("scheduled_message", { /** Nullable — the presence that created it may be gone after a restart. */ presenceId: text(), memberId: text() - .references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }) + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) .notNull(), to: text().notNull(), message: text().notNull(), @@ -683,19 +723,24 @@ export const scheduledMessage = meshSchema.table("scheduled_message", { createdAt: timestamp().defaultNow().notNull(), }); -export const scheduledMessageRelations = relations(scheduledMessage, ({ one }) => ({ - mesh: one(mesh, { - fields: [scheduledMessage.meshId], - references: [mesh.id], +export const scheduledMessageRelations = relations( + scheduledMessage, + ({ one }) => ({ + mesh: one(mesh, { + fields: [scheduledMessage.meshId], + references: [mesh.id], + }), + member: one(meshMember, { + fields: [scheduledMessage.memberId], + references: [meshMember.id], + }), }), - member: one(meshMember, { - fields: [scheduledMessage.memberId], - references: [meshMember.id], - }), -})); +); -export const selectScheduledMessageSchema = createSelectSchema(scheduledMessage); -export const insertScheduledMessageSchema = createInsertSchema(scheduledMessage); +export const selectScheduledMessageSchema = + createSelectSchema(scheduledMessage); +export const insertScheduledMessageSchema = + createInsertSchema(scheduledMessage); export type SelectScheduledMessage = typeof scheduledMessage.$inferSelect; export type InsertScheduledMessage = typeof scheduledMessage.$inferInsert; @@ -736,40 +781,44 @@ export const meshMemberRelations = relations(meshMember, ({ one, many }) => ({ * * Explicit rows override these defaults (allow or deny). */ -export const meshPermission = meshSchema.table("permission", { - id: text().primaryKey().notNull().$defaultFn(generateId), - meshId: text() - .references(() => mesh.id, { onDelete: "cascade" }) - .notNull(), - memberId: text() - .references(() => meshMember.id, { onDelete: "cascade" }) - .notNull(), - /** Invite other users to the mesh. */ - canInvite: boolean().notNull().default(false), - /** Deploy/undeploy MCP services. */ - canDeployMcp: boolean().notNull().default(false), - /** Upload/delete shared files. */ - canManageFiles: boolean().notNull().default(false), - /** Read/write vault secrets. */ - canManageVault: boolean().notNull().default(false), - /** Create/manage URL watches. */ - canManageWatches: boolean().notNull().default(false), - /** Create/manage webhooks. */ - canManageWebhooks: boolean().notNull().default(false), - /** Write shared state (read is always allowed). */ - canWriteState: boolean().notNull().default(true), - /** Send messages to peers. */ - canSend: boolean().notNull().default(true), - /** Use deployed MCP tools. */ - canUseTools: boolean().notNull().default(true), - /** Delete the mesh entirely (owner only). */ - canDeleteMesh: boolean().notNull().default(false), - /** Manage other members' permissions. */ - canManagePermissions: boolean().notNull().default(false), - updatedAt: timestamp().defaultNow().notNull(), -}, (table) => [ - uniqueIndex("permission_member_mesh_idx").on(table.meshId, table.memberId), -]); +export const meshPermission = meshSchema.table( + "permission", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade" }) + .notNull(), + memberId: text() + .references(() => meshMember.id, { onDelete: "cascade" }) + .notNull(), + /** Invite other users to the mesh. */ + canInvite: boolean().notNull().default(false), + /** Deploy/undeploy MCP services. */ + canDeployMcp: boolean().notNull().default(false), + /** Upload/delete shared files. */ + canManageFiles: boolean().notNull().default(false), + /** Read/write vault secrets. */ + canManageVault: boolean().notNull().default(false), + /** Create/manage URL watches. */ + canManageWatches: boolean().notNull().default(false), + /** Create/manage webhooks. */ + canManageWebhooks: boolean().notNull().default(false), + /** Write shared state (read is always allowed). */ + canWriteState: boolean().notNull().default(true), + /** Send messages to peers. */ + canSend: boolean().notNull().default(true), + /** Use deployed MCP tools. */ + canUseTools: boolean().notNull().default(true), + /** Delete the mesh entirely (owner only). */ + canDeleteMesh: boolean().notNull().default(false), + /** Manage other members' permissions. */ + canManagePermissions: boolean().notNull().default(false), + updatedAt: timestamp().defaultNow().notNull(), + }, + (table) => [ + uniqueIndex("permission_member_mesh_idx").on(table.meshId, table.memberId), + ], +); export const meshPermissionRelations = relations(meshPermission, ({ one }) => ({ mesh: one(mesh, { @@ -792,22 +841,43 @@ export type InsertMeshPermission = typeof meshPermission.$inferInsert; */ export const DEFAULT_PERMISSIONS = { owner: { - canInvite: true, canDeployMcp: true, canManageFiles: true, - canManageVault: true, canManageWatches: true, canManageWebhooks: true, - canWriteState: true, canSend: true, canUseTools: true, - canDeleteMesh: true, canManagePermissions: true, + canInvite: true, + canDeployMcp: true, + canManageFiles: true, + canManageVault: true, + canManageWatches: true, + canManageWebhooks: true, + canWriteState: true, + canSend: true, + canUseTools: true, + canDeleteMesh: true, + canManagePermissions: true, }, admin: { - canInvite: true, canDeployMcp: true, canManageFiles: true, - canManageVault: true, canManageWatches: true, canManageWebhooks: true, - canWriteState: true, canSend: true, canUseTools: true, - canDeleteMesh: false, canManagePermissions: true, + canInvite: true, + canDeployMcp: true, + canManageFiles: true, + canManageVault: true, + canManageWatches: true, + canManageWebhooks: true, + canWriteState: true, + canSend: true, + canUseTools: true, + canDeleteMesh: false, + canManagePermissions: true, }, member: { - canInvite: false, canDeployMcp: false, canManageFiles: false, - canManageVault: false, canManageWatches: false, canManageWebhooks: false, - canWriteState: true, canSend: true, canUseTools: true, - canDeleteMesh: false, canManagePermissions: false, + canInvite: false, + canDeployMcp: false, + canManageFiles: false, + canManageVault: false, + canManageWatches: false, + canManageWebhooks: false, + canWriteState: true, + canSend: true, + canUseTools: true, + canDeleteMesh: false, + canManagePermissions: false, }, } as const; @@ -844,7 +914,10 @@ export const inviteRelations = relations(invite, ({ one }) => ({ export const pendingInviteRelations = relations(pendingInvite, ({ one }) => ({ mesh: one(mesh, { fields: [pendingInvite.meshId], references: [mesh.id] }), - inviter: one(user, { fields: [pendingInvite.createdBy], references: [user.id] }), + inviter: one(user, { + fields: [pendingInvite.createdBy], + references: [user.id], + }), })); export const auditLogRelations = relations(auditLog, ({ one }) => ({ @@ -997,14 +1070,31 @@ export const peerState = meshSchema.table( .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) .notNull(), memberId: text() - .references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }) + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) .notNull(), - groups: jsonb().$type>().default([]), - profile: jsonb().$type<{ avatar?: string; title?: string; bio?: string; capabilities?: string[] }>().default({}), + groups: jsonb().$type<{ name: string; role?: string }[]>().default([]), + profile: jsonb() + .$type<{ + avatar?: string; + title?: string; + bio?: string; + capabilities?: string[]; + }>() + .default({}), visible: boolean().notNull().default(true), lastSummary: text(), lastDisplayName: text(), - cumulativeStats: jsonb().$type<{ messagesIn: number; messagesOut: number; toolCalls: number; errors: number }>().default({ messagesIn: 0, messagesOut: 0, toolCalls: 0, errors: 0 }), + cumulativeStats: jsonb() + .$type<{ + messagesIn: number; + messagesOut: number; + toolCalls: number; + errors: number; + }>() + .default({ messagesIn: 0, messagesOut: 0, toolCalls: 0, errors: 0 }), lastSeenAt: timestamp(), createdAt: timestamp().defaultNow().notNull(), updatedAt: timestamp().defaultNow().notNull(), @@ -1052,8 +1142,14 @@ export type InsertMeshSkill = typeof meshSkill.$inferInsert; export const meshServiceRelations = relations(meshService, ({ one }) => ({ mesh: one(mesh, { fields: [meshService.meshId], references: [mesh.id] }), - sourceFile: one(meshFile, { fields: [meshService.sourceFileId], references: [meshFile.id] }), - deployer: one(meshMember, { fields: [meshService.deployedBy], references: [meshMember.id] }), + sourceFile: one(meshFile, { + fields: [meshService.sourceFileId], + references: [meshFile.id], + }), + deployer: one(meshMember, { + fields: [meshService.deployedBy], + references: [meshMember.id], + }), })); export const selectMeshServiceSchema = createSelectSchema(meshService); @@ -1063,7 +1159,10 @@ export type InsertMeshService = typeof meshService.$inferInsert; export const meshVaultEntryRelations = relations(meshVaultEntry, ({ one }) => ({ mesh: one(mesh, { fields: [meshVaultEntry.meshId], references: [mesh.id] }), - member: one(meshMember, { fields: [meshVaultEntry.memberId], references: [meshMember.id] }), + member: one(meshMember, { + fields: [meshVaultEntry.memberId], + references: [meshMember.id], + }), })); export const selectMeshVaultEntrySchema = createSelectSchema(meshVaultEntry); @@ -1134,31 +1233,35 @@ export const deviceCodeStatusEnum = meshSchema.enum("device_code_status", [ * Device codes for CLI → browser → CLI OAuth flow. * CLI creates a code, browser approves it, CLI polls until approved. */ -export const deviceCode = meshSchema.table("device_code", { - id: text().primaryKey().notNull().$defaultFn(generateId), - /** Random 16-char code used by CLI to poll (secret, never shown to user). */ - deviceCode: text().notNull().unique(), - /** Human-readable code shown in both terminal and browser for visual confirmation. */ - userCode: text().notNull(), - /** URL-safe session identifier (clm_sess_..., 32 chars). Not secret — appears in browser URL. */ - sessionId: text().notNull().unique(), - status: deviceCodeStatusEnum().notNull().default("pending"), - /** Filled on approve — the authenticated user. */ - userId: text().references(() => user.id, { onDelete: "cascade" }), - /** Device info from CLI request. */ - hostname: text(), - platform: text(), - arch: text(), - ipAddress: text(), - /** Signed JWT session token — filled on approve. */ - sessionToken: text(), - createdAt: timestamp().defaultNow().notNull(), - approvedAt: timestamp(), - expiresAt: timestamp().notNull(), -}, (table) => [ - index("device_code_status_idx").on(table.status), - index("device_code_user_code_idx").on(table.userCode), -]); +export const deviceCode = meshSchema.table( + "device_code", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + /** Random 16-char code used by CLI to poll (secret, never shown to user). */ + deviceCode: text().notNull().unique(), + /** Human-readable code shown in both terminal and browser for visual confirmation. */ + userCode: text().notNull(), + /** URL-safe session identifier (clm_sess_..., 32 chars). Not secret — appears in browser URL. */ + sessionId: text().notNull().unique(), + status: deviceCodeStatusEnum().notNull().default("pending"), + /** Filled on approve — the authenticated user. */ + userId: text().references(() => user.id, { onDelete: "cascade" }), + /** Device info from CLI request. */ + hostname: text(), + platform: text(), + arch: text(), + ipAddress: text(), + /** Signed JWT session token — filled on approve. */ + sessionToken: text(), + createdAt: timestamp().defaultNow().notNull(), + approvedAt: timestamp(), + expiresAt: timestamp().notNull(), + }, + (table) => [ + index("device_code_status_idx").on(table.status), + index("device_code_user_code_idx").on(table.userCode), + ], +); export const deviceCodeRelations = relations(deviceCode, ({ one }) => ({ user: one(user, { @@ -1176,26 +1279,30 @@ export type InsertDeviceCode = typeof deviceCode.$inferInsert; * Persistent CLI session records — one per authenticated device. * Enables dashboard "Signed in on N devices" view and per-device revocation. */ -export const cliSession = meshSchema.table("cli_session", { - id: text().primaryKey().notNull().$defaultFn(generateId), - userId: text() - .references(() => user.id, { onDelete: "cascade" }) - .notNull(), - /** Which device-code auth created this session. */ - deviceCodeId: text().references(() => deviceCode.id), - hostname: text(), - platform: text(), - arch: text(), - /** SHA-256 hash of the JWT for revocation lookup. */ - tokenHash: text().notNull(), - lastSeenAt: timestamp().defaultNow(), - createdAt: timestamp().defaultNow().notNull(), - /** NULL until user revokes from dashboard. */ - revokedAt: timestamp(), -}, (table) => [ - index("cli_session_user_idx").on(table.userId), - index("cli_session_token_hash_idx").on(table.tokenHash), -]); +export const cliSession = meshSchema.table( + "cli_session", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + userId: text() + .references(() => user.id, { onDelete: "cascade" }) + .notNull(), + /** Which device-code auth created this session. */ + deviceCodeId: text().references(() => deviceCode.id), + hostname: text(), + platform: text(), + arch: text(), + /** SHA-256 hash of the JWT for revocation lookup. */ + tokenHash: text().notNull(), + lastSeenAt: timestamp().defaultNow(), + createdAt: timestamp().defaultNow().notNull(), + /** NULL until user revokes from dashboard. */ + revokedAt: timestamp(), + }, + (table) => [ + index("cli_session_user_idx").on(table.userId), + index("cli_session_token_hash_idx").on(table.tokenHash), + ], +); export const cliSessionRelations = relations(cliSession, ({ one }) => ({ user: one(user, { @@ -1212,3 +1319,167 @@ export const selectCliSessionSchema = createSelectSchema(cliSession); export const insertCliSessionSchema = createInsertSchema(cliSession); export type SelectCliSession = typeof cliSession.$inferSelect; export type InsertCliSession = typeof cliSession.$inferInsert; + +/* ──────────────────────────────────────────────────────────────────────── + * Topics (v0.2.0) — conversational primitive within a mesh. + * + * Mesh = trust boundary. Group = identity tag. Topic = conversation scope. + * Three orthogonal axes; topics complement (don't replace) groups. + * + * Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md + * ──────────────────────────────────────────────────────────────────────── */ + +export const topicVisibilityEnum = meshSchema.enum("topic_visibility", [ + "public", // any mesh member can join + "private", // invite-only + "dm", // 1:1, autocreated when two peers DM +]); + +export const topicMemberRoleEnum = meshSchema.enum("topic_member_role", [ + "lead", + "member", + "observer", +]); + +/** + * A topic is a named conversation scope within a mesh. Messages, state, + * memory, and files can be topic-scoped. Membership controls delivery + * (broker filters topic-tagged messages by topic_member rows). + */ +export const meshTopic = meshSchema.table( + "topic", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + name: text().notNull(), // unique within mesh; e.g. "deploys" + description: text(), + visibility: topicVisibilityEnum().notNull().default("public"), + createdByMemberId: text().references(() => meshMember.id, { + onDelete: "set null", + onUpdate: "cascade", + }), + createdAt: timestamp().defaultNow().notNull(), + archivedAt: timestamp(), + }, + (t) => [uniqueIndex("topic_mesh_name_unique").on(t.meshId, t.name)], +); + +/** + * Per-member topic membership. last_read_at drives unread counts in the + * web chat UI; role is advisory (lead/member/observer) like meshGroup. + */ +export const meshTopicMember = meshSchema.table( + "topic_member", + { + topicId: text() + .references(() => meshTopic.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + memberId: text() + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + role: topicMemberRoleEnum().notNull().default("member"), + joinedAt: timestamp().defaultNow().notNull(), + lastReadAt: timestamp(), + }, + (t) => [ + uniqueIndex("topic_member_unique").on(t.topicId, t.memberId), + index("topic_member_by_member").on(t.memberId), + ], +); + +/** + * Topic-scoped persistent message history. Direct messages (DMs) stay + * ephemeral via message_queue by design — this table only persists + * messages addressed to a topic, so humans (and agents that opt in) can + * see history when they reconnect. + * + * Ciphertext is encrypted to the topic's symmetric key (held by every + * topic member). Server cannot read content; it can only filter delivery + * by topic membership. + */ +export const meshTopicMessage = meshSchema.table( + "topic_message", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + topicId: text() + .references(() => meshTopic.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + senderMemberId: text() + .references(() => meshMember.id, { + onDelete: "cascade", + onUpdate: "cascade", + }) + .notNull(), + senderSessionPubkey: text(), + nonce: text().notNull(), + ciphertext: text().notNull(), + createdAt: timestamp().defaultNow().notNull(), + }, + (t) => [index("topic_message_by_topic_time").on(t.topicId, t.createdAt)], +); + +export const meshTopicRelations = relations(meshTopic, ({ one, many }) => ({ + mesh: one(mesh, { fields: [meshTopic.meshId], references: [mesh.id] }), + createdBy: one(meshMember, { + fields: [meshTopic.createdByMemberId], + references: [meshMember.id], + }), + members: many(meshTopicMember), + messages: many(meshTopicMessage), +})); + +export const meshTopicMemberRelations = relations( + meshTopicMember, + ({ one }) => ({ + topic: one(meshTopic, { + fields: [meshTopicMember.topicId], + references: [meshTopic.id], + }), + member: one(meshMember, { + fields: [meshTopicMember.memberId], + references: [meshMember.id], + }), + }), +); + +export const meshTopicMessageRelations = relations( + meshTopicMessage, + ({ one }) => ({ + topic: one(meshTopic, { + fields: [meshTopicMessage.topicId], + references: [meshTopic.id], + }), + sender: one(meshMember, { + fields: [meshTopicMessage.senderMemberId], + references: [meshMember.id], + }), + }), +); + +export const selectMeshTopicSchema = createSelectSchema(meshTopic); +export const insertMeshTopicSchema = createInsertSchema(meshTopic); +export type SelectMeshTopic = typeof meshTopic.$inferSelect; +export type InsertMeshTopic = typeof meshTopic.$inferInsert; + +export const selectMeshTopicMemberSchema = createSelectSchema(meshTopicMember); +export const insertMeshTopicMemberSchema = createInsertSchema(meshTopicMember); +export type SelectMeshTopicMember = typeof meshTopicMember.$inferSelect; +export type InsertMeshTopicMember = typeof meshTopicMember.$inferInsert; + +export const selectMeshTopicMessageSchema = + createSelectSchema(meshTopicMessage); +export const insertMeshTopicMessageSchema = + createInsertSchema(meshTopicMessage); +export type SelectMeshTopicMessage = typeof meshTopicMessage.$inferSelect; +export type InsertMeshTopicMessage = typeof meshTopicMessage.$inferInsert;