feat(web): restore payload CMS (cuidecar pattern + importMap)
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Release / Publish multi-arch images (push) Has been cancelled

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-06 14:30:16 +01:00
parent 7a5f786e0c
commit 0b4e389f2b
22 changed files with 9651 additions and 4 deletions

View File

@@ -15,11 +15,13 @@
},
"prettier": "@turbostarter/prettier-config",
"dependencies": {
"@qdrant/js-client-rest": "1.17.0",
"@turbostarter/db": "workspace:*",
"@turbostarter/shared": "workspace:*",
"drizzle-orm": "0.44.7",
"libsodium-wrappers": "0.7.15",
"minio": "8.0.7",
"neo4j-driver": "6.0.1",
"ws": "8.20.0",
"zod": "catalog:"
},

View File

@@ -34,9 +34,12 @@ import {
mesh,
meshFile,
meshFileAccess,
meshContext,
meshMember as memberTable,
meshMemory,
meshState,
meshStream,
meshTask,
messageQueue,
pendingStatus,
presence,
@@ -889,6 +892,334 @@ export async function deleteFile(
);
}
// --- Context sharing ---
/**
* Upsert a context snapshot for a peer. Each (meshId, presenceId) pair
* has at most one context row — repeated calls update it in place.
*/
export async function shareContext(
meshId: string,
presenceId: string,
peerName: string | undefined,
summary: string,
filesRead?: string[],
keyFindings?: string[],
tags?: string[],
): Promise<string> {
const now = new Date();
// Try to find existing context for this presence in this mesh.
const [existing] = await db
.select({ id: meshContext.id })
.from(meshContext)
.where(
and(
eq(meshContext.meshId, meshId),
eq(meshContext.presenceId, presenceId),
),
)
.limit(1);
if (existing) {
await db
.update(meshContext)
.set({
peerName: peerName ?? null,
summary,
filesRead: filesRead ?? [],
keyFindings: keyFindings ?? [],
tags: tags ?? [],
updatedAt: now,
})
.where(eq(meshContext.id, existing.id));
return existing.id;
}
const [row] = await db
.insert(meshContext)
.values({
meshId,
presenceId,
peerName: peerName ?? null,
summary,
filesRead: filesRead ?? [],
keyFindings: keyFindings ?? [],
tags: tags ?? [],
updatedAt: now,
})
.returning({ id: meshContext.id });
if (!row) throw new Error("failed to insert context");
return row.id;
}
/**
* Search contexts by tag match or summary ILIKE.
*/
export async function getContext(
meshId: string,
query: string,
): Promise<
Array<{
peerName: string;
summary: string;
filesRead: string[];
keyFindings: string[];
tags: string[];
updatedAt: Date;
}>
> {
const result = await db.execute<{
peer_name: string | null;
summary: string;
files_read: string[] | null;
key_findings: string[] | null;
tags: string[] | null;
updated_at: string | Date;
}>(sql`
SELECT peer_name, summary, files_read, key_findings, tags, updated_at
FROM mesh.context
WHERE mesh_id = ${meshId}
AND (
summary ILIKE ${"%" + query + "%"}
OR ${query} = ANY(tags)
)
ORDER BY updated_at DESC
LIMIT 20
`);
const rows = (result.rows ?? result) as Array<{
peer_name: string | null;
summary: string;
files_read: string[] | null;
key_findings: string[] | null;
tags: string[] | null;
updated_at: string | Date;
}>;
return rows.map((r) => ({
peerName: r.peer_name ?? "unknown",
summary: r.summary,
filesRead: r.files_read ?? [],
keyFindings: r.key_findings ?? [],
tags: r.tags ?? [],
updatedAt:
r.updated_at instanceof Date ? r.updated_at : new Date(r.updated_at),
}));
}
/**
* List all contexts for a mesh, ordered by most recently updated.
*/
export async function listContexts(
meshId: string,
): Promise<
Array<{
peerName: string;
summary: string;
tags: string[];
updatedAt: Date;
}>
> {
const rows = await db
.select({
peerName: meshContext.peerName,
summary: meshContext.summary,
tags: meshContext.tags,
updatedAt: meshContext.updatedAt,
})
.from(meshContext)
.where(eq(meshContext.meshId, meshId))
.orderBy(desc(meshContext.updatedAt));
return rows.map((r) => ({
peerName: r.peerName ?? "unknown",
summary: r.summary,
tags: (r.tags ?? []) as string[],
updatedAt: r.updatedAt,
}));
}
// --- Tasks ---
/**
* Create a new task in a mesh. Returns the generated id.
*/
export async function createTask(
meshId: string,
title: string,
assignee?: string,
priority?: string,
tags?: string[],
createdByName?: string,
): Promise<string> {
const [row] = await db
.insert(meshTask)
.values({
meshId,
title,
assignee: assignee ?? null,
priority: priority ?? "normal",
status: "open",
tags: tags ?? [],
createdByName: createdByName ?? null,
})
.returning({ id: meshTask.id });
if (!row) throw new Error("failed to insert task");
return row.id;
}
/**
* Claim an open task. Sets status to 'claimed' and records who claimed it.
* Only succeeds if the task is currently 'open'.
*/
export async function claimTask(
meshId: string,
taskId: string,
presenceId: string,
peerName?: string,
): Promise<boolean> {
const now = new Date();
const result = await db
.update(meshTask)
.set({
status: "claimed",
claimedByPresence: presenceId,
claimedByName: peerName ?? null,
claimedAt: now,
})
.where(
and(
eq(meshTask.id, taskId),
eq(meshTask.meshId, meshId),
eq(meshTask.status, "open"),
),
)
.returning({ id: meshTask.id });
return result.length > 0;
}
/**
* Complete a task. Sets status to 'done', records the result and timestamp.
*/
export async function completeTask(
meshId: string,
taskId: string,
result?: string,
): Promise<boolean> {
const now = new Date();
const rows = await db
.update(meshTask)
.set({
status: "done",
result: result ?? null,
completedAt: now,
})
.where(
and(
eq(meshTask.id, taskId),
eq(meshTask.meshId, meshId),
),
)
.returning({ id: meshTask.id });
return rows.length > 0;
}
/**
* List tasks in a mesh with optional status and assignee filters.
*/
export async function listTasks(
meshId: string,
status?: string,
assignee?: string,
): Promise<
Array<{
id: string;
title: string;
assignee: string | null;
claimedBy: string | null;
status: string;
priority: string;
createdBy: string | null;
tags: string[];
createdAt: Date;
}>
> {
const conditions = [eq(meshTask.meshId, meshId)];
if (status) {
conditions.push(eq(meshTask.status, status));
}
if (assignee) {
conditions.push(eq(meshTask.assignee, assignee));
}
const rows = await db
.select({
id: meshTask.id,
title: meshTask.title,
assignee: meshTask.assignee,
claimedByName: meshTask.claimedByName,
status: meshTask.status,
priority: meshTask.priority,
createdByName: meshTask.createdByName,
tags: meshTask.tags,
createdAt: meshTask.createdAt,
})
.from(meshTask)
.where(and(...conditions))
.orderBy(desc(meshTask.createdAt))
.limit(100);
return rows.map((r) => ({
id: r.id,
title: r.title,
assignee: r.assignee,
claimedBy: r.claimedByName,
status: r.status,
priority: r.priority,
createdBy: r.createdByName,
tags: (r.tags ?? []) as string[],
createdAt: r.createdAt,
}));
}
// --- Streams ---
/**
* Create a named real-time stream in a mesh. Upsert semantics: if a
* stream with the same (meshId, name) already exists, return its id.
*/
export async function createStream(
meshId: string,
name: string,
createdByName: string,
): Promise<string> {
const existing = await db
.select({ id: meshStream.id })
.from(meshStream)
.where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name)));
if (existing.length > 0) return existing[0]!.id;
const [row] = await db
.insert(meshStream)
.values({ meshId, name, createdByName })
.returning({ id: meshStream.id });
return row!.id;
}
/**
* List all streams in a mesh, ordered by creation time.
*/
export async function listStreams(
meshId: string,
): Promise<
Array<{ id: string; name: string; createdBy: string | null; createdAt: Date }>
> {
return db
.select({
id: meshStream.id,
name: meshStream.name,
createdBy: meshStream.createdByName,
createdAt: meshStream.createdAt,
})
.from(meshStream)
.where(eq(meshStream.meshId, meshId))
.orderBy(asc(meshStream.createdAt));
}
// --- Message queueing + delivery ---
export interface QueueParams {
@@ -1239,3 +1570,118 @@ export async function findMemberByPubkey(
.limit(1);
return row ?? null;
}
// --- Mesh databases (per-mesh PostgreSQL schemas) ---
function meshSchemaName(meshId: string): string {
return `meshdb_${meshId.toLowerCase().replace(/[^a-z0-9]/g, "_")}`;
}
/** Validate that user-provided SQL doesn't contain dangerous operations. */
function validateMeshSql(userSql: string): void {
const upper = userSql.toUpperCase();
const forbidden = [
"DROP SCHEMA",
"CREATE SCHEMA",
"SET SEARCH_PATH",
"SET ROLE",
"SET SESSION",
"SET LOCAL",
"GRANT",
"REVOKE",
];
for (const f of forbidden) {
if (upper.includes(f))
throw new Error(`Forbidden SQL operation: ${f}`);
}
}
/** Ensure the per-mesh schema exists. */
export async function ensureMeshSchema(meshId: string): Promise<string> {
const schema = meshSchemaName(meshId);
await db.execute(
sql`CREATE SCHEMA IF NOT EXISTS ${sql.raw('"' + schema + '"')}`,
);
return schema;
}
/** Run a SELECT query in the mesh's schema. */
export async function meshQuery(
meshId: string,
query: string,
): Promise<{
columns: string[];
rows: Array<Record<string, unknown>>;
rowCount: number;
}> {
validateMeshSql(query);
const schema = await ensureMeshSchema(meshId);
// Use a transaction so SET LOCAL is scoped and automatically reset.
return await db.transaction(async (tx) => {
await tx.execute(
sql.raw(`SET LOCAL search_path TO "${schema}"`)
);
const result = await tx.execute(sql.raw(query));
const rows = (result.rows ?? []) as Array<Record<string, unknown>>;
const columns = rows.length > 0 ? Object.keys(rows[0]!) : [];
return { columns, rows, rowCount: rows.length };
});
}
/** Run a DDL/DML statement in the mesh's schema. */
export async function meshExecute(
meshId: string,
statement: string,
): Promise<{ rowCount: number }> {
validateMeshSql(statement);
const schema = await ensureMeshSchema(meshId);
return await db.transaction(async (tx) => {
await tx.execute(
sql.raw(`SET LOCAL search_path TO "${schema}"`)
);
const result = await tx.execute(sql.raw(statement));
return { rowCount: (result as any).rowCount ?? 0 };
});
}
/** List tables and columns in the mesh's schema. */
export async function meshSchema(
meshId: string,
): Promise<
Array<{
name: string;
columns: Array<{ name: string; type: string; nullable: boolean }>;
}>
> {
const schema = meshSchemaName(meshId);
const result = await db.execute<{
table_name: string;
column_name: string;
data_type: string;
is_nullable: string;
}>(sql`
SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = ${schema}
ORDER BY table_name, ordinal_position
`);
const rows = (result.rows ?? result) as Array<{
table_name: string;
column_name: string;
data_type: string;
is_nullable: string;
}>;
const tables = new Map<
string,
Array<{ name: string; type: string; nullable: boolean }>
>();
for (const r of rows) {
if (!tables.has(r.table_name)) tables.set(r.table_name, []);
tables.get(r.table_name)!.push({
name: r.column_name,
type: r.data_type,
nullable: r.is_nullable === "YES",
});
}
return [...tables.entries()].map(([name, columns]) => ({ name, columns }));
}

View File

@@ -24,6 +24,10 @@ const envSchema = z.object({
MINIO_ACCESS_KEY: z.string().default("claudemesh"),
MINIO_SECRET_KEY: z.string().default("changeme"),
MINIO_USE_SSL: z.coerce.boolean().default(false),
QDRANT_URL: z.string().default("http://qdrant:6333"),
NEO4J_URL: z.string().default("bolt://neo4j:7687"),
NEO4J_USER: z.string().default("neo4j"),
NEO4J_PASSWORD: z.string().default("changeme"),
NODE_ENV: z
.enum(["development", "production", "test"])
.default("development"),

View File

@@ -15,17 +15,21 @@
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
import type { Duplex } from "node:stream";
import { WebSocketServer, type WebSocket } from "ws";
import { eq } from "drizzle-orm";
import { eq, sql } from "drizzle-orm";
import { env } from "./env";
import { db } from "./db";
import { messageQueue } from "@turbostarter/db/schema/mesh";
import {
claimTask,
completeTask,
connectPresence,
createTask,
deleteFile,
disconnectPresence,
drainForMember,
findMemberByPubkey,
forgetMemory,
getContext,
getFile,
getFileStatus,
getState,
@@ -34,9 +38,11 @@ import {
joinGroup,
joinMesh,
leaveGroup,
listContexts,
listFiles,
listPeersInMesh,
listState,
listTasks,
queueMessage,
recallMemory,
recordFileAccess,
@@ -45,12 +51,21 @@ import {
rememberMemory,
setSummary,
setState,
shareContext,
startSweepers,
stopSweepers,
uploadFile,
writeStatus,
ensureMeshSchema,
meshQuery,
meshExecute,
meshSchema,
createStream,
listStreams,
} from "./broker";
import { ensureBucket, meshBucketName, minioClient } from "./minio";
import { qdrant, meshCollectionName, ensureCollection } from "./qdrant";
import { neo4jDriver, meshDbName, ensureDatabase } from "./neo4j-client";
import type {
HookSetStatusRequest,
WSClientMessage,
@@ -81,6 +96,9 @@ interface PeerConn {
const connections = new Map<string, PeerConn>();
const connectionsPerMesh = new Map<string, number>();
// Stream subscriptions: "meshId:streamName" → Set of presenceIds
const streamSubscriptions = new Map<string, Set<string>>();
const hookRateLimit = new TokenBucket(
env.HOOK_RATE_LIMIT_PER_MIN,
env.HOOK_RATE_LIMIT_PER_MIN,
@@ -1066,6 +1084,598 @@ function handleConnection(ws: WebSocket): void {
});
break;
}
case "share_context": {
const sc = msg as Extract<WSClientMessage, { type: "share_context" }>;
const memberInfo = conn.memberPubkey
? await findMemberByPubkey(conn.meshId, conn.memberPubkey)
: null;
const ctxId = await shareContext(
conn.meshId,
presenceId,
memberInfo?.displayName,
sc.summary,
sc.filesRead,
sc.keyFindings,
sc.tags,
);
sendToPeer(presenceId, {
type: "context_shared",
id: ctxId,
});
// Notify all other peers in the mesh that context was shared.
for (const [pid, peer] of connections) {
if (pid === presenceId) continue;
if (peer.meshId !== conn.meshId) continue;
sendToPeer(pid, {
type: "state_change",
key: `_context:${memberInfo?.displayName ?? "unknown"}`,
value: sc.summary,
updatedBy: memberInfo?.displayName ?? "unknown",
});
}
log.info("ws share_context", {
presence_id: presenceId,
context_id: ctxId,
});
break;
}
case "get_context": {
const gc = msg as Extract<WSClientMessage, { type: "get_context" }>;
const contexts = await getContext(conn.meshId, gc.query);
sendToPeer(presenceId, {
type: "context_results",
contexts: contexts.map((c) => ({
peerName: c.peerName,
summary: c.summary,
filesRead: c.filesRead,
keyFindings: c.keyFindings,
tags: c.tags,
updatedAt: c.updatedAt.toISOString(),
})),
});
log.info("ws get_context", {
presence_id: presenceId,
query: gc.query.slice(0, 80),
results: contexts.length,
});
break;
}
case "list_contexts": {
const allContexts = await listContexts(conn.meshId);
sendToPeer(presenceId, {
type: "context_list",
contexts: allContexts.map((c) => ({
peerName: c.peerName,
summary: c.summary,
tags: c.tags,
updatedAt: c.updatedAt.toISOString(),
})),
});
log.info("ws list_contexts", {
presence_id: presenceId,
mesh_id: conn.meshId,
count: allContexts.length,
});
break;
}
case "create_task": {
const ct = msg as Extract<WSClientMessage, { type: "create_task" }>;
const memberInfo = conn.memberPubkey
? await findMemberByPubkey(conn.meshId, conn.memberPubkey)
: null;
const taskId = await createTask(
conn.meshId,
ct.title,
ct.assignee,
ct.priority,
ct.tags,
memberInfo?.displayName,
);
sendToPeer(presenceId, {
type: "task_created",
id: taskId,
});
log.info("ws create_task", {
presence_id: presenceId,
task_id: taskId,
title: ct.title.slice(0, 80),
});
break;
}
case "claim_task": {
const clm = msg as Extract<WSClientMessage, { type: "claim_task" }>;
const memberInfo = conn.memberPubkey
? await findMemberByPubkey(conn.meshId, conn.memberPubkey)
: null;
const claimed = await claimTask(
conn.meshId,
clm.taskId,
presenceId,
memberInfo?.displayName,
);
if (!claimed) {
sendError(conn.ws, "task_not_claimable", "task is not open or does not exist");
break;
}
// Return updated task list so caller sees the change.
const tasksAfterClaim = await listTasks(conn.meshId);
sendToPeer(presenceId, {
type: "task_list",
tasks: tasksAfterClaim.map((t) => ({
id: t.id,
title: t.title,
assignee: t.assignee,
claimedBy: t.claimedBy,
status: t.status,
priority: t.priority,
createdBy: t.createdBy,
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
});
log.info("ws claim_task", {
presence_id: presenceId,
task_id: clm.taskId,
});
break;
}
case "complete_task": {
const cpt = msg as Extract<WSClientMessage, { type: "complete_task" }>;
const completed = await completeTask(
conn.meshId,
cpt.taskId,
cpt.result,
);
if (!completed) {
sendError(conn.ws, "task_not_found", "task not found in this mesh");
break;
}
// Return updated task list.
const tasksAfterComplete = await listTasks(conn.meshId);
sendToPeer(presenceId, {
type: "task_list",
tasks: tasksAfterComplete.map((t) => ({
id: t.id,
title: t.title,
assignee: t.assignee,
claimedBy: t.claimedBy,
status: t.status,
priority: t.priority,
createdBy: t.createdBy,
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
});
log.info("ws complete_task", {
presence_id: presenceId,
task_id: cpt.taskId,
});
break;
}
case "list_tasks": {
const lt = msg as Extract<WSClientMessage, { type: "list_tasks" }>;
const tasks = await listTasks(conn.meshId, lt.status, lt.assignee);
sendToPeer(presenceId, {
type: "task_list",
tasks: tasks.map((t) => ({
id: t.id,
title: t.title,
assignee: t.assignee,
claimedBy: t.claimedBy,
status: t.status,
priority: t.priority,
createdBy: t.createdBy,
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
});
log.info("ws list_tasks", {
presence_id: presenceId,
mesh_id: conn.meshId,
count: tasks.length,
});
break;
}
// --- Streams ---
case "create_stream": {
const cs = msg as Extract<WSClientMessage, { type: "create_stream" }>;
const memberInfo = conn.memberPubkey
? await findMemberByPubkey(conn.meshId, conn.memberPubkey)
: null;
const streamId = await createStream(
conn.meshId,
cs.name,
memberInfo?.displayName ?? "peer",
);
sendToPeer(presenceId, {
type: "stream_created",
id: streamId,
name: cs.name,
});
log.info("ws create_stream", {
presence_id: presenceId,
stream: cs.name,
});
break;
}
case "subscribe": {
const sub = msg as Extract<WSClientMessage, { type: "subscribe" }>;
const key = `${conn.meshId}:${sub.stream}`;
if (!streamSubscriptions.has(key))
streamSubscriptions.set(key, new Set());
streamSubscriptions.get(key)!.add(presenceId);
log.info("ws subscribe", {
presence_id: presenceId,
stream: sub.stream,
});
break;
}
case "unsubscribe": {
const unsub = msg as Extract<
WSClientMessage,
{ type: "unsubscribe" }
>;
const key = `${conn.meshId}:${unsub.stream}`;
streamSubscriptions.get(key)?.delete(presenceId);
log.info("ws unsubscribe", {
presence_id: presenceId,
stream: unsub.stream,
});
break;
}
case "publish": {
const pub = msg as Extract<WSClientMessage, { type: "publish" }>;
const key = `${conn.meshId}:${pub.stream}`;
const subs = streamSubscriptions.get(key);
if (subs) {
const memberInfo = conn.memberPubkey
? await findMemberByPubkey(conn.meshId, conn.memberPubkey)
: null;
const push: WSServerMessage = {
type: "stream_data",
stream: pub.stream,
data: pub.data,
publishedBy: memberInfo?.displayName ?? "peer",
};
for (const subPid of subs) {
if (subPid === presenceId) continue; // don't echo to publisher
sendToPeer(subPid, push);
}
}
metrics.messagesRoutedTotal.inc({ priority: "stream" });
break;
}
case "list_streams": {
const streams = await listStreams(conn.meshId);
sendToPeer(presenceId, {
type: "stream_list",
streams: streams.map((s) => {
const key = `${conn.meshId}:${s.name}`;
return {
id: s.id,
name: s.name,
createdBy: s.createdBy ?? "",
createdAt: s.createdAt.toISOString(),
subscriberCount: streamSubscriptions.get(key)?.size ?? 0,
};
}),
});
log.info("ws list_streams", {
presence_id: presenceId,
mesh_id: conn.meshId,
count: streams.length,
});
break;
}
// --- Vector storage ---
case "vector_store": {
const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>;
const collName = meshCollectionName(conn.meshId, vs.collection);
await ensureCollection(collName);
const { generateId } = await import("@turbostarter/shared/utils");
const pointId = generateId();
// Store text + metadata as payload. Use a zero vector as placeholder
// — real embeddings should be computed client-side and sent directly
// to Qdrant in a future version.
const zeroVector = new Array(1536).fill(0) as number[];
await qdrant.upsert(collName, {
wait: true,
points: [
{
id: pointId,
vector: zeroVector,
payload: {
text: vs.text,
mesh_id: conn.meshId,
stored_by: conn.memberPubkey,
stored_at: new Date().toISOString(),
...(vs.metadata ?? {}),
},
},
],
});
sendToPeer(presenceId, {
type: "ack" as const,
id: pointId,
messageId: pointId,
queued: false,
});
log.info("ws vector_store", {
presence_id: presenceId,
collection: vs.collection,
point_id: pointId,
});
break;
}
case "vector_search": {
const vq = msg as Extract<WSClientMessage, { type: "vector_search" }>;
const searchCollName = meshCollectionName(conn.meshId, vq.collection);
const searchLimit = vq.limit ?? 10;
try {
// Keyword search via payload scroll + filter.
// Full vector similarity requires client-computed embeddings (future).
const queryLower = vq.query.toLowerCase();
const scrollResult = await qdrant.scroll(searchCollName, {
limit: 100,
with_payload: true,
with_vector: false,
});
const matches = (scrollResult.points ?? [])
.filter((p) => {
const text = (p.payload as Record<string, unknown>)?.text;
return typeof text === "string" && text.toLowerCase().includes(queryLower);
})
.slice(0, searchLimit)
.map((p) => {
const payload = p.payload as Record<string, unknown>;
return {
id: String(p.id),
text: (payload.text as string) ?? "",
score: 1.0, // keyword match — no vector similarity score
metadata: payload,
};
});
sendToPeer(presenceId, {
type: "vector_results",
results: matches,
});
} catch {
// Collection may not exist yet — return empty results.
sendToPeer(presenceId, {
type: "vector_results",
results: [],
});
}
log.info("ws vector_search", {
presence_id: presenceId,
collection: vq.collection,
query: vq.query.slice(0, 80),
});
break;
}
case "vector_delete": {
const vd = msg as Extract<WSClientMessage, { type: "vector_delete" }>;
const deleteCollName = meshCollectionName(conn.meshId, vd.collection);
try {
await qdrant.delete(deleteCollName, {
wait: true,
points: [vd.id],
});
} catch {
/* collection or point may not exist — idempotent */
}
sendToPeer(presenceId, {
type: "ack" as const,
id: vd.id,
messageId: vd.id,
queued: false,
});
log.info("ws vector_delete", {
presence_id: presenceId,
collection: vd.collection,
point_id: vd.id,
});
break;
}
case "list_collections": {
try {
const qdrantResponse = await qdrant.getCollections();
const prefix = `mesh_${conn.meshId}_`.toLowerCase().replace(/[^a-z0-9_]/g, "_");
const meshCollections = (qdrantResponse.collections ?? [])
.map((c) => c.name)
.filter((name) => name.startsWith(prefix))
.map((name) => name.slice(prefix.length));
sendToPeer(presenceId, {
type: "collection_list",
collections: meshCollections,
});
} catch {
sendToPeer(presenceId, {
type: "collection_list",
collections: [],
});
}
log.info("ws list_collections", {
presence_id: presenceId,
mesh_id: conn.meshId,
});
break;
}
// --- Graph database ---
case "graph_query": {
const gq = msg as Extract<WSClientMessage, { type: "graph_query" }>;
const gqDbName = meshDbName(conn.meshId);
let gqSession;
try {
await ensureDatabase(gqDbName);
gqSession = neo4jDriver.session({ database: gqDbName });
} catch {
// Community edition — fall back to default db.
gqSession = neo4jDriver.session();
}
try {
const gqResult = await gqSession.run(gq.cypher);
const gqRecords = gqResult.records.map((r) => {
const obj: Record<string, unknown> = {};
for (const key of r.keys) {
obj[key] = r.get(key);
}
return obj;
});
sendToPeer(presenceId, {
type: "graph_result",
records: gqRecords,
});
} catch (gqErr) {
sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr));
} finally {
await gqSession.close();
}
log.info("ws graph_query", {
presence_id: presenceId,
cypher: gq.cypher.slice(0, 80),
});
break;
}
case "graph_execute": {
const ge = msg as Extract<WSClientMessage, { type: "graph_execute" }>;
const geDbName = meshDbName(conn.meshId);
let geSession;
try {
await ensureDatabase(geDbName);
geSession = neo4jDriver.session({ database: geDbName });
} catch {
geSession = neo4jDriver.session();
}
try {
const geResult = await geSession.run(ge.cypher);
const geRecords = geResult.records.map((r) => {
const obj: Record<string, unknown> = {};
for (const key of r.keys) {
obj[key] = r.get(key);
}
return obj;
});
sendToPeer(presenceId, {
type: "graph_result",
records: geRecords,
});
} catch (geErr) {
sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr));
} finally {
await geSession.close();
}
log.info("ws graph_execute", {
presence_id: presenceId,
cypher: ge.cypher.slice(0, 80),
});
break;
}
// --- Mesh database (per-mesh PostgreSQL schema) ---
case "mesh_query": {
const mq = msg as Extract<WSClientMessage, { type: "mesh_query" }>;
try {
const result = await meshQuery(conn.meshId, mq.sql);
sendToPeer(presenceId, { type: "mesh_query_result", ...result });
} catch (e) {
sendError(
conn.ws,
"query_error",
e instanceof Error ? e.message : String(e),
);
}
log.info("ws mesh_query", {
presence_id: presenceId,
sql: mq.sql.slice(0, 80),
});
break;
}
case "mesh_execute": {
const me = msg as Extract<WSClientMessage, { type: "mesh_execute" }>;
try {
const result = await meshExecute(conn.meshId, me.sql);
sendToPeer(presenceId, {
type: "mesh_query_result",
columns: [],
rows: [],
rowCount: result.rowCount,
});
} catch (e) {
sendError(
conn.ws,
"execute_error",
e instanceof Error ? e.message : String(e),
);
}
log.info("ws mesh_execute", {
presence_id: presenceId,
sql: me.sql.slice(0, 80),
});
break;
}
case "mesh_schema": {
try {
const tables = await meshSchema(conn.meshId);
sendToPeer(presenceId, { type: "mesh_schema_result", tables });
} catch (e) {
sendError(
conn.ws,
"schema_error",
e instanceof Error ? e.message : String(e),
);
}
log.info("ws mesh_schema", { presence_id: presenceId });
break;
}
case "mesh_info": {
const [peers, stateEntries, memCount, fileCount, taskCounts, streams, tables] = await Promise.all([
listPeersInMesh(conn.meshId),
listState(conn.meshId),
db.execute(sql`SELECT COUNT(*) as n FROM mesh.memory WHERE mesh_id = ${conn.meshId} AND forgotten_at IS NULL`).then(r => Number(((r.rows ?? r) as any[])[0]?.n ?? 0)),
db.execute(sql`SELECT COUNT(*) as n FROM mesh.file WHERE mesh_id = ${conn.meshId} AND deleted_at IS NULL`).then(r => Number(((r.rows ?? r) as any[])[0]?.n ?? 0)),
db.execute(sql`SELECT status, COUNT(*) as n FROM mesh.task WHERE mesh_id = ${conn.meshId} GROUP BY status`).then(r => {
const rows = (r.rows ?? r) as Array<{ status: string; n: string }>;
const counts = { open: 0, claimed: 0, done: 0 };
for (const row of rows) counts[row.status as keyof typeof counts] = Number(row.n);
return counts;
}),
listStreams(conn.meshId),
meshSchema(conn.meshId).catch(() => []),
]);
const allGroups = new Set<string>();
for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`);
const myPresence = peers.find(p => p.sessionId === [...connections.entries()].find(([pid]) => pid === presenceId)?.[1]?.sessionPubkey);
const peerConn = connections.get(presenceId);
sendToPeer(presenceId, {
type: "mesh_info_result",
mesh: conn.meshId,
peers: peers.length,
groups: [...allGroups],
stateKeys: stateEntries.map((e: any) => e.key),
memoryCount: memCount,
fileCount: fileCount,
tasks: taskCounts,
streams: streams.map(s => s.name),
tables: tables.map((t: any) => t.name),
collections: [],
yourName: peerConn?.groups?.[0]?.name ?? "unknown",
yourGroups: peerConn?.groups ?? [],
});
log.info("ws mesh_info", { presence_id: presenceId });
break;
}
}
} catch (e) {
metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });
@@ -1081,6 +1691,11 @@ function handleConnection(ws: WebSocket): void {
connections.delete(presenceId);
if (conn) decMeshCount(conn.meshId);
await disconnectPresence(presenceId);
// Clean up stream subscriptions for this peer
for (const [key, subs] of streamSubscriptions) {
subs.delete(presenceId);
if (subs.size === 0) streamSubscriptions.delete(key);
}
log.info("ws close", { presence_id: presenceId });
}
});

View File

@@ -0,0 +1,22 @@
import neo4j from "neo4j-driver";
import { env } from "./env";
export const neo4jDriver = neo4j.driver(
env.NEO4J_URL,
neo4j.auth.basic(env.NEO4J_USER, env.NEO4J_PASSWORD),
);
export function meshDbName(meshId: string): string {
return `mesh_${meshId.toLowerCase().replace(/[^a-z0-9]/g, "")}`;
}
export async function ensureDatabase(name: string): Promise<void> {
const session = neo4jDriver.session({ database: "system" });
try {
await session.run(`CREATE DATABASE $name IF NOT EXISTS`, { name });
} catch {
/* may not support multi-db in community edition — fall back to default */
} finally {
await session.close();
}
}

24
apps/broker/src/qdrant.ts Normal file
View File

@@ -0,0 +1,24 @@
import { QdrantClient } from "@qdrant/js-client-rest";
import { env } from "./env";
export const qdrant = new QdrantClient({ url: env.QDRANT_URL });
export function meshCollectionName(
meshId: string,
collection: string,
): string {
return `mesh_${meshId}_${collection}`.toLowerCase().replace(/[^a-z0-9_]/g, "_");
}
export async function ensureCollection(
name: string,
vectorSize = 1536,
): Promise<void> {
try {
await qdrant.getCollection(name);
} catch {
await qdrant.createCollection(name, {
vectors: { size: vectorSize, distance: "Cosine" },
});
}
}

View File

@@ -230,6 +230,133 @@ export interface WSMemoryResultsMessage {
}>;
}
// --- Vector storage messages ---
/** Client → broker: store a text document in a vector collection. */
export interface WSVectorStoreMessage {
type: "vector_store";
collection: string;
text: string;
metadata?: Record<string, unknown>;
}
/** Client → broker: search a vector collection. */
export interface WSVectorSearchMessage {
type: "vector_search";
collection: string;
query: string;
limit?: number;
}
/** Client → broker: delete a point from a vector collection. */
export interface WSVectorDeleteMessage {
type: "vector_delete";
collection: string;
id: string;
}
/** Client → broker: list all vector collections for this mesh. */
export interface WSListCollectionsMessage {
type: "list_collections";
}
// --- Graph database messages ---
/** Client → broker: run a read-only Cypher query. */
export interface WSGraphQueryMessage {
type: "graph_query";
cypher: string;
}
/** Client → broker: run a write Cypher statement. */
export interface WSGraphExecuteMessage {
type: "graph_execute";
cypher: string;
}
// --- Mesh database (per-mesh PostgreSQL schema) messages ---
/** Client → broker: run a SELECT query in the mesh's schema. */
export interface WSMeshQueryMessage {
type: "mesh_query";
sql: string;
}
/** Client → broker: run a DDL/DML statement in the mesh's schema. */
export interface WSMeshExecuteMessage {
type: "mesh_execute";
sql: string;
}
/** Client → broker: list tables and columns in the mesh's schema. */
export interface WSMeshSchemaMessage {
type: "mesh_schema";
}
// --- Vector/Graph response messages ---
/** Broker → client: vector search results. */
export interface WSVectorResultsMessage {
type: "vector_results";
results: Array<{
id: string;
text: string;
score: number;
metadata?: Record<string, unknown>;
}>;
}
/** Broker → client: list of vector collections. */
export interface WSCollectionListMessage {
type: "collection_list";
collections: string[];
}
/** Broker → client: graph query results. */
export interface WSGraphResultMessage {
type: "graph_result";
records: Array<Record<string, unknown>>;
}
/** Broker → client: mesh SQL query results. */
export interface WSMeshQueryResultMessage {
type: "mesh_query_result";
columns: string[];
rows: Array<Record<string, unknown>>;
rowCount: number;
}
/** Broker → client: mesh schema introspection results. */
export interface WSMeshSchemaResultMessage {
type: "mesh_schema_result";
tables: Array<{
name: string;
columns: Array<{ name: string; type: string; nullable: boolean }>;
}>;
}
/** Client → broker: get full mesh overview. */
export interface WSMeshInfoMessage {
type: "mesh_info";
}
/** Broker → client: aggregated mesh overview. */
export interface WSMeshInfoResultMessage {
type: "mesh_info_result";
mesh: string;
peers: number;
groups: string[];
stateKeys: string[];
memoryCount: number;
fileCount: number;
tasks: { open: number; claimed: number; done: number };
streams: string[];
tables: string[];
collections: string[];
yourName: string;
yourGroups: Array<{ name: string; role?: string }>;
}
/** Client → broker: check delivery status of a message. */
export interface WSMessageStatusMessage {
type: "message_status";
@@ -309,6 +436,170 @@ export interface WSFileStatusResultMessage {
}>;
}
// --- Context sharing messages ---
/** Client → broker: share current working context. */
export interface WSShareContextMessage {
type: "share_context";
summary: string;
filesRead?: string[];
keyFindings?: string[];
tags?: string[];
}
/** Client → broker: search contexts by query. */
export interface WSGetContextMessage {
type: "get_context";
query: string;
}
/** Client → broker: list all contexts in the mesh. */
export interface WSListContextsMessage {
type: "list_contexts";
}
/** Broker → client: acknowledgement for share_context. */
export interface WSContextSharedMessage {
type: "context_shared";
id: string;
}
/** Broker → client: response to get_context. */
export interface WSContextResultsMessage {
type: "context_results";
contexts: Array<{
peerName: string;
summary: string;
filesRead: string[];
keyFindings: string[];
tags: string[];
updatedAt: string;
}>;
}
/** Broker → client: response to list_contexts. */
export interface WSContextListMessage {
type: "context_list";
contexts: Array<{
peerName: string;
summary: string;
tags: string[];
updatedAt: string;
}>;
}
// --- Task messages ---
/** Client → broker: create a task. */
export interface WSCreateTaskMessage {
type: "create_task";
title: string;
assignee?: string;
priority?: string;
tags?: string[];
}
/** Client → broker: claim an open task. */
export interface WSClaimTaskMessage {
type: "claim_task";
taskId: string;
}
/** Client → broker: mark a task as done. */
export interface WSCompleteTaskMessage {
type: "complete_task";
taskId: string;
result?: string;
}
/** Client → broker: list tasks with optional filters. */
export interface WSListTasksMessage {
type: "list_tasks";
status?: string;
assignee?: string;
}
/** Broker → client: acknowledgement for create_task. */
export interface WSTaskCreatedMessage {
type: "task_created";
id: string;
}
/** Broker → client: response to list_tasks, claim_task, complete_task. */
export interface WSTaskListMessage {
type: "task_list";
tasks: Array<{
id: string;
title: string;
assignee: string | null;
claimedBy: string | null;
status: string;
priority: string;
createdBy: string | null;
tags: string[];
createdAt: string;
}>;
}
// --- Stream messages ---
/** Client → broker: create a named real-time stream. */
export interface WSCreateStreamMessage {
type: "create_stream";
name: string;
}
/** Client → broker: publish data to a stream. */
export interface WSPublishMessage {
type: "publish";
stream: string;
data: unknown;
}
/** Client → broker: subscribe to a stream. */
export interface WSSubscribeMessage {
type: "subscribe";
stream: string;
}
/** Client → broker: unsubscribe from a stream. */
export interface WSUnsubscribeMessage {
type: "unsubscribe";
stream: string;
}
/** Client → broker: list all streams in the mesh. */
export interface WSListStreamsMessage {
type: "list_streams";
}
/** Broker → client: acknowledgement for create_stream. */
export interface WSStreamCreatedMessage {
type: "stream_created";
id: string;
name: string;
}
/** Broker → client: real-time data pushed from a stream. */
export interface WSStreamDataMessage {
type: "stream_data";
stream: string;
data: unknown;
publishedBy: string;
}
/** Broker → client: response to list_streams. */
export interface WSStreamListMessage {
type: "stream_list";
streams: Array<{
id: string;
name: string;
createdBy: string;
createdAt: string;
subscriberCount: number;
}>;
}
/** Broker → client: structured error. */
export interface WSErrorMessage {
type: "error";
@@ -335,7 +626,29 @@ export type WSClientMessage =
| WSGetFileMessage
| WSListFilesMessage
| WSFileStatusMessage
| WSDeleteFileMessage;
| WSDeleteFileMessage
| WSShareContextMessage
| WSGetContextMessage
| WSListContextsMessage
| WSCreateTaskMessage
| WSClaimTaskMessage
| WSCompleteTaskMessage
| WSListTasksMessage
| WSVectorStoreMessage
| WSVectorSearchMessage
| WSVectorDeleteMessage
| WSListCollectionsMessage
| WSGraphQueryMessage
| WSGraphExecuteMessage
| WSMeshQueryMessage
| WSMeshExecuteMessage
| WSMeshSchemaMessage
| WSCreateStreamMessage
| WSPublishMessage
| WSSubscribeMessage
| WSUnsubscribeMessage
| WSListStreamsMessage
| WSMeshInfoMessage;
export type WSServerMessage =
| WSHelloAckMessage
@@ -351,4 +664,18 @@ export type WSServerMessage =
| WSFileUrlMessage
| WSFileListMessage
| WSFileStatusResultMessage
| WSContextSharedMessage
| WSContextResultsMessage
| WSContextListMessage
| WSTaskCreatedMessage
| WSTaskListMessage
| WSVectorResultsMessage
| WSCollectionListMessage
| WSGraphResultMessage
| WSMeshQueryResultMessage
| WSMeshSchemaResultMessage
| WSStreamCreatedMessage
| WSStreamDataMessage
| WSStreamListMessage
| WSMeshInfoResultMessage
| WSErrorMessage;