feat(broker+cli): topics — conversation scope within a mesh (v0.2.0)
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Adds the third axis of mesh organization: mesh = trust boundary,
group = identity tag, topic = conversation scope. Topic-tagged
messages filter delivery by topic_member rows and persist to a
topic_message history table for back-scroll on reconnect.

Schema (additive):
- mesh.topic, mesh.topic_member, mesh.topic_message tables
- topic_visibility (public|private|dm) and topic_member_role
  (lead|member|observer) enums
- migration 0022_topics.sql, hand-written following project convention
  (drizzle journal has been drifting since 0011)

Broker:
- 10 helpers (createTopic, listTopics, findTopicByName, joinTopic,
  leaveTopic, topicMembers, getMemberTopicIds, appendTopicMessage,
  topicHistory, markTopicRead)
- drainForMember matches "#<topicId>" target_specs via member's
  topic memberships
- 7 WS handlers (topic_create/list/join/leave/members/history/mark_read)
  + resolveTopicId helper accepting id-or-name
- handleSend auto-persists topic-tagged messages to history

CLI:
- claudemesh topic create/list/join/leave/members/history/read
- claudemesh send "#deploys" "..." resolves topic name to id
- bundled skill teaches Claude the DM/group/topic decision matrix
- policy-classify recognizes topic create/join/leave as writes

Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-02 01:53:42 +01:00
parent b4f457fceb
commit 1afae7a507
12 changed files with 1741 additions and 196 deletions

View File

@@ -18,7 +18,7 @@ import { WebSocketServer, type WebSocket } from "ws";
import { and, eq, inArray, isNull, lt, sql } from "drizzle-orm";
import { env } from "./env";
import { db } from "./db";
import { invite as inviteTable, mesh, meshMember, messageQueue, presence, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh";
import { invite as inviteTable, mesh, meshMember, messageQueue, presence, scheduledMessage as scheduledMessageTable, meshWebhook, peerState, meshTopic } from "@turbostarter/db/schema/mesh";
import { user } from "@turbostarter/db/schema/auth";
import { handleCliSync, type CliSyncRequest } from "./cli-sync";
import { generateId } from "@turbostarter/shared/utils";
@@ -84,6 +84,15 @@ import {
listDbMeshServices,
deleteService,
getRunningServices,
createTopic,
listTopics,
findTopicByName,
joinTopic,
leaveTopic,
topicMembers,
topicHistory,
markTopicRead,
appendTopicMessage,
} from "./broker";
import * as serviceManager from "./service-manager";
import { ensureBucket, meshBucketName, minioClient } from "./minio";
@@ -1495,6 +1504,26 @@ function sendError(
}
}
/**
* Resolve a topic identifier — accepts either a topic id directly OR a
* topic name within the given mesh. Returns the topic id, or null if no
* matching topic exists. Used by every topic_* WS handler so callers can
* reference topics by human-readable name without an extra round trip.
*/
async function resolveTopicId(meshId: string, idOrName: string): Promise<string | null> {
// ULID-ish ids are 25-26 chars of base32; names are usually shorter and
// human-readable. Try as id first (cheap PK lookup), fall back to name.
if (idOrName.length >= 20 && /^[a-z0-9_-]+$/i.test(idOrName)) {
const byId = await db
.select({ id: meshTopic.id })
.from(meshTopic)
.where(and(eq(meshTopic.id, idOrName), eq(meshTopic.meshId, meshId)));
if (byId[0]) return byId[0].id;
}
const byName = await findTopicByName(meshId, idOrName);
return byName?.id ?? null;
}
// --- Peer state persistence ---
async function savePeerState(conn: PeerConn, memberId: string, meshId: string): Promise<void> {
@@ -1901,6 +1930,24 @@ async function handleSend(
nonce: msg.nonce,
ciphertext: msg.ciphertext,
});
// Topic-tagged messages (targetSpec starts with `#<topicId>`) get
// persisted to topic_message in addition to the ephemeral queue, so
// humans (and opting-in agents) can fetch history on reconnect.
// Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md
if (msg.targetSpec.startsWith("#")) {
const topicId = msg.targetSpec.slice(1);
void appendTopicMessage({
topicId,
senderMemberId: conn.memberId,
senderSessionPubkey: conn.sessionPubkey ?? undefined,
nonce: msg.nonce,
ciphertext: msg.ciphertext,
}).catch((e) =>
log.warn("appendTopicMessage failed", { topic_id: topicId, err: String(e) }),
);
}
void audit(conn.meshId, "message_sent", conn.memberId, conn.displayName, {
targetSpec: msg.targetSpec,
priority: msg.priority,
@@ -2291,6 +2338,125 @@ function handleConnection(ws: WebSocket): void {
});
break;
}
// ── Topics (v0.2.0) ─────────────────────────────────────────
case "topic_create": {
const tc = msg as Extract<WSClientMessage, { type: "topic_create" }>;
const result = await createTopic({
meshId: conn.meshId,
name: tc.name,
description: tc.description,
visibility: tc.visibility,
createdByMemberId: conn.memberId,
});
// Auto-subscribe the creator.
await joinTopic({ topicId: result.id, memberId: conn.memberId, role: "lead" });
const resp: WSServerMessage = {
type: "topic_created",
topic: {
id: result.id,
name: tc.name,
visibility: tc.visibility ?? "public",
},
created: result.created,
...(_reqId ? { _reqId } : {}),
};
conn.ws.send(JSON.stringify(resp));
log.info("ws topic_create", { presence_id: presenceId, topic: tc.name, created: result.created });
break;
}
case "topic_list": {
const topics = await listTopics(conn.meshId);
const resp: WSServerMessage = {
type: "topic_list_response",
topics: topics.map((t) => ({
id: t.id,
name: t.name,
description: t.description,
visibility: t.visibility,
memberCount: t.memberCount,
createdAt: t.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
};
conn.ws.send(JSON.stringify(resp));
break;
}
case "topic_join": {
const tj = msg as Extract<WSClientMessage, { type: "topic_join" }>;
const topicId = await resolveTopicId(conn.meshId, tj.topic);
if (!topicId) { sendError(ws, "topic_not_found", `topic "${tj.topic}" not found`, _reqId); break; }
await joinTopic({ topicId, memberId: conn.memberId, role: tj.role });
log.info("ws topic_join", { presence_id: presenceId, topic: topicId });
break;
}
case "topic_leave": {
const tl = msg as Extract<WSClientMessage, { type: "topic_leave" }>;
const topicId = await resolveTopicId(conn.meshId, tl.topic);
if (!topicId) { sendError(ws, "topic_not_found", `topic "${tl.topic}" not found`, _reqId); break; }
await leaveTopic({ topicId, memberId: conn.memberId });
log.info("ws topic_leave", { presence_id: presenceId, topic: topicId });
break;
}
case "topic_members": {
const tm = msg as Extract<WSClientMessage, { type: "topic_members" }>;
const topicId = await resolveTopicId(conn.meshId, tm.topic);
if (!topicId) { sendError(ws, "topic_not_found", `topic "${tm.topic}" not found`, _reqId); break; }
const members = await topicMembers(topicId);
const resp: WSServerMessage = {
type: "topic_members_response",
topic: tm.topic,
members: members.map((m) => ({
memberId: m.memberId,
pubkey: m.pubkey,
displayName: m.displayName,
role: m.role,
joinedAt: m.joinedAt.toISOString(),
lastReadAt: m.lastReadAt?.toISOString() ?? null,
})),
...(_reqId ? { _reqId } : {}),
};
conn.ws.send(JSON.stringify(resp));
break;
}
case "topic_history": {
const th = msg as Extract<WSClientMessage, { type: "topic_history" }>;
const topicId = await resolveTopicId(conn.meshId, th.topic);
if (!topicId) { sendError(ws, "topic_not_found", `topic "${th.topic}" not found`, _reqId); break; }
const history = await topicHistory({
topicId,
limit: th.limit,
beforeId: th.beforeId,
});
const resp: WSServerMessage = {
type: "topic_history_response",
topic: th.topic,
messages: history.map((h) => ({
id: h.id,
senderPubkey: h.senderPubkey,
nonce: h.nonce,
ciphertext: h.ciphertext,
createdAt: h.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
};
conn.ws.send(JSON.stringify(resp));
break;
}
case "topic_mark_read": {
const tr = msg as Extract<WSClientMessage, { type: "topic_mark_read" }>;
const topicId = await resolveTopicId(conn.meshId, tr.topic);
if (!topicId) { sendError(ws, "topic_not_found", `topic "${tr.topic}" not found`, _reqId); break; }
await markTopicRead({ topicId, memberId: conn.memberId });
break;
}
case "set_state": {
const ss = msg as Extract<WSClientMessage, { type: "set_state" }>;
// Look up the display name for attribution.