21 Commits

Author SHA1 Message Date
Alejandro Gutiérrez
59848f0d3e chore(cli): v0.6.6 — correlation ID refactor (resolver Maps + _reqId)
Some checks failed
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
CI / Lint (push) Has been cancelled
2026-04-07 14:31:29 +01:00
Alejandro Gutiérrez
d0fa1c028f fix(broker): echo _reqId in all WS responses for correlation ID routing
Extract _reqId from incoming WS messages and include it in every direct
response sendToPeer call and sendError call. Clients can now match
responses to requests by ID instead of relying on FIFO ordering.
Old clients without _reqId are unaffected (field simply omitted).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:28:30 +01:00
Alejandro Gutiérrez
8f925d9a9e fix(cli): correlation ID refactor — resolver Maps with _reqId + FIFO fallback
Replace all 22 resolver Array<fn> patterns with Map<reqId, {resolve, timer}>.
Outgoing messages now include _reqId; on response the broker's echoed _reqId
is used for exact matching, with FIFO fallback for brokers that don't echo it.
Add makeReqId() helper and resolveFromMap() utility. Error propagation block
updated to iterate Maps and pop the oldest entry across all queues.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:25:51 +01:00
Alejandro Gutiérrez
4ce1034dcd chore(cli): v0.6.5 — all bug fixes batch
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
2026-04-07 13:12:29 +01:00
Alejandro Gutiérrez
e26a36e543 fix(broker): vector_stored type, set_state no-resp, subscribe ack
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- vector_store sends {type:"vector_stored",id}; wrapped in try/catch
- set_state no longer sends state_result (fire-and-forget)
- subscribe sends {type:"subscribed",stream} confirmation
- remove broken myPresence lookup in mesh_info
- add WSVectorStoredMessage + WSSubscribedMessage to types union

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:08:06 +01:00
Alejandro Gutiérrez
60c74d9463 fix(broker): shareContext stable upsert key + createStream atomic upsert
- shareContext: adds optional memberId param; when provided, upserts on
  (meshId, memberId) instead of (meshId, presenceId) — prevents stale
  context rows accumulating on every reconnect. Falls back to presenceId
  for legacy/anonymous connections. Also refreshes presenceId on update
  so it stays current.
- schema: adds member_id column + unique index context_mesh_member_idx
  on mesh.context table; new migration 0013_context-stable-member-key.sql.
- index.ts call site updated to pass conn.memberId as the stable key.
- createStream: replaces SELECT-then-INSERT TOCTOU race with atomic
  INSERT ... ON CONFLICT DO NOTHING RETURNING, followed by SELECT on miss.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:07:58 +01:00
Alejandro Gutiérrez
6fba9bd4eb feat(cli): fix field mismatches + error propagation
- claim_task/complete_task: send taskId not id
- graph_result: read msg.records not msg.rows
- message_status: try all mesh clients, not only first
- broker: omit state_result for set_state (fixes get_state cross-contamination)
- error handler: unblock first pending resolver on unmatched broker errors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:07:25 +01:00
Alejandro Gutiérrez
5bcc1fe323 chore(cli): bump to v0.6.4 — fix get_file sealedKey bug
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
2026-04-07 12:57:20 +01:00
Alejandro Gutiérrez
e70f0ed1ff fix(broker/cli): e2e get_file owner sealedKey bug
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
broker: owner also fetches sealedKey from mesh.file_key (not skipped),
  only non-owners are blocked when key is missing
cli: explicit error when encrypted file has no sealedKey (no silent raw download)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:56:36 +01:00
Alejandro Gutiérrez
5f696f47ea feat(cli): v0.6.3 — e2e file crypto module + encrypted share_file
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- add crypto/file-crypto.ts: encryptFile, decryptFile, sealKeyForPeer, openSealedKey
- update share_file: when to= set, encrypts file + seals key per recipient
- update get_file: decrypts if encrypted + sealedKey present
- add grant_file_access tool: re-seals Kf for a new peer without re-upload
- add getSessionPubkey/getSessionSecretKey getters on BrokerClient
- add grantFileAccess WS method on BrokerClient
- bump version to 0.6.3

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:53:13 +01:00
Alejandro Gutiérrez
ccb9fb2a68 feat(broker/db): e2e file encryption schema + db functions
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- add mesh.file_key table (fileId, peerPubkey, sealedKey, grantedByPubkey)
- add encrypted + ownerPubkey columns to mesh.file
- export insertFileKeys, getFileKey, grantFileKey from broker.ts
- update uploadFile/getFile/listFiles to include encrypted/ownerPubkey
- migration 0012_add-file-encryption applied to prod

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:43:57 +01:00
Alejandro Gutiérrez
898c061089 feat(cli): e2e file encryption — file-crypto.ts + client + MCP tools
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:33:39 +01:00
Alejandro Gutiérrez
f7a6559429 feat(broker): add E2E file encryption to HTTP upload and WS handlers
- parse x-encrypted/x-owner-pubkey/x-file-keys headers in handleUploadPost
- pass encrypted and ownerPubkey to uploadFile, call insertFileKeys after
- get_file: fetch sealedKey for non-owners, block if missing, include in response
- list_files: include encrypted field per file
- add grant_file_access WS handler so owners can seal keys for peers
- update types.ts with new message interfaces and union members

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:32:46 +01:00
Alejandro Gutiérrez
579d0c3d3e chore: bump version to 0.6.0
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:21:03 +01:00
Alejandro Gutiérrez
190f5a958e refactor(cli): migrate to citty — --help generated from flag definitions
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Replace manual switch + HELP string with citty defineCommand/runMain.
Flag definitions in index.ts are now the single source of truth for
--help output. Remove parseArgs() from launch.ts; accept citty-parsed
flags + rawArgs (-- passthrough to claude preserved).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:19:16 +01:00
Alejandro Gutiérrez
03661e1b68 docs(cli): expand --help with all launch flags, groups hierarchy, env vars
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:16:04 +01:00
Alejandro Gutiérrez
d451fc296e feat: hierarchical group routing + role wiring
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
broker: expand member groups to ancestor paths at drain time (pull model)
- @flexicar message reaches peers in @flexicar/core, @flexicar/output, etc.
- Resolved at drainForMember — no DB changes, fully backward-compatible
- Any depth: flexicar/team/backend also matches @flexicar and @flexicar/team

cli: wire --role all the way through to session config + env
- Config.role field added
- launch.ts stores role in sessionConfig, passes CLAUDEMESH_ROLE env var
- mcp/server.ts includes role in identity string
- manager.ts auto-joins groups from config on WS connect (--groups flag now works)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:09:37 +01:00
Alejandro Gutiérrez
3da5d71275 fix(broker): fix share_file DB insert failures
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- Normalise tags to Array before Drizzle insert (PgArray mapper calls
  .map() and throws if value is not a standard JS Array)
- Use uploadedByName instead of uploadedByMember FK — the X-Member-Id
  header carries the mesh slug, not a mesh.member primary key

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 08:56:43 +01:00
Alejandro Gutiérrez
cdf335f609 fix(broker): fix MINIO_USE_SSL env coercion
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
z.coerce.boolean() treats any non-empty string as true, so MINIO_USE_SSL="false" → true.
Switch to explicit enum+transform.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 08:38:06 +01:00
Alejandro Gutiérrez
0cd16ff358 fix: exclude sender only for broadcasts, not direct messages
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
The sender exclusion filter (excludeSenderSessionPubkey) was blocking
delivery of ALL messages from the sender, including direct messages
to other peers. Now only excludes on broadcast (target_spec = '*').

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 18:34:09 +01:00
Alejandro Gutiérrez
3e9707276d fix: add diagnostic logging to maybePushQueuedMessages
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 18:21:29 +01:00
17 changed files with 10186 additions and 522 deletions

View File

@@ -34,6 +34,7 @@ import {
mesh, mesh,
meshFile, meshFile,
meshFileAccess, meshFileAccess,
meshFileKey,
meshContext, meshContext,
meshMember as memberTable, meshMember as memberTable,
meshMemory, meshMemory,
@@ -717,6 +718,8 @@ export async function uploadFile(args: {
uploadedByMember?: string; uploadedByMember?: string;
targetSpec?: string; targetSpec?: string;
expiresAt?: Date; expiresAt?: Date;
encrypted?: boolean;
ownerPubkey?: string;
}): Promise<string> { }): Promise<string> {
const [row] = await db const [row] = await db
.insert(meshFile) .insert(meshFile)
@@ -732,6 +735,8 @@ export async function uploadFile(args: {
uploadedByMember: args.uploadedByMember ?? null, uploadedByMember: args.uploadedByMember ?? null,
targetSpec: args.targetSpec ?? null, targetSpec: args.targetSpec ?? null,
expiresAt: args.expiresAt ?? null, expiresAt: args.expiresAt ?? null,
encrypted: args.encrypted ?? false,
ownerPubkey: args.ownerPubkey ?? null,
}) })
.returning({ id: meshFile.id }); .returning({ id: meshFile.id });
if (!row) throw new Error("failed to insert file row"); if (!row) throw new Error("failed to insert file row");
@@ -755,6 +760,8 @@ export async function getFile(
uploadedByName: string | null; uploadedByName: string | null;
targetSpec: string | null; targetSpec: string | null;
uploadedAt: Date; uploadedAt: Date;
encrypted: boolean;
ownerPubkey: string | null;
} | null> { } | null> {
const [row] = await db const [row] = await db
.select({ .select({
@@ -768,6 +775,8 @@ export async function getFile(
uploadedByName: meshFile.uploadedByName, uploadedByName: meshFile.uploadedByName,
targetSpec: meshFile.targetSpec, targetSpec: meshFile.targetSpec,
uploadedAt: meshFile.uploadedAt, uploadedAt: meshFile.uploadedAt,
encrypted: meshFile.encrypted,
ownerPubkey: meshFile.ownerPubkey,
}) })
.from(meshFile) .from(meshFile)
.where( .where(
@@ -782,6 +791,8 @@ export async function getFile(
return { return {
...row, ...row,
tags: (row.tags ?? []) as string[], tags: (row.tags ?? []) as string[],
encrypted: row.encrypted,
ownerPubkey: row.ownerPubkey,
}; };
} }
@@ -801,6 +812,7 @@ export async function listFiles(
uploadedBy: string; uploadedBy: string;
uploadedAt: Date; uploadedAt: Date;
persistent: boolean; persistent: boolean;
encrypted: boolean;
}> }>
> { > {
const conditions = [ const conditions = [
@@ -822,6 +834,7 @@ export async function listFiles(
uploadedByName: meshFile.uploadedByName, uploadedByName: meshFile.uploadedByName,
uploadedAt: meshFile.uploadedAt, uploadedAt: meshFile.uploadedAt,
persistent: meshFile.persistent, persistent: meshFile.persistent,
encrypted: meshFile.encrypted,
}) })
.from(meshFile) .from(meshFile)
.where(and(...conditions)) .where(and(...conditions))
@@ -835,6 +848,7 @@ export async function listFiles(
uploadedBy: r.uploadedByName ?? "unknown", uploadedBy: r.uploadedByName ?? "unknown",
uploadedAt: r.uploadedAt, uploadedAt: r.uploadedAt,
persistent: r.persistent, persistent: r.persistent,
encrypted: r.encrypted,
})); }));
} }
@@ -892,11 +906,62 @@ export async function deleteFile(
); );
} }
/** Insert encrypted key blobs for a newly uploaded E2E file. */
export async function insertFileKeys(
fileId: string,
keys: Array<{ peerPubkey: string; sealedKey: string; grantedByPubkey?: string }>,
): Promise<void> {
if (keys.length === 0) return;
await db.insert(meshFileKey).values(
keys.map((k) => ({
fileId,
peerPubkey: k.peerPubkey,
sealedKey: k.sealedKey,
grantedByPubkey: k.grantedByPubkey ?? null,
})),
);
}
/** Get the sealed key for a specific peer, or null if not authorized. */
export async function getFileKey(
fileId: string,
peerPubkey: string,
): Promise<string | null> {
const [row] = await db
.select({ sealedKey: meshFileKey.sealedKey })
.from(meshFileKey)
.where(
and(eq(meshFileKey.fileId, fileId), eq(meshFileKey.peerPubkey, peerPubkey)),
);
return row?.sealedKey ?? null;
}
/** Grant a peer access to an encrypted file (upsert their key blob). */
export async function grantFileKey(
fileId: string,
peerPubkey: string,
sealedKey: string,
grantedByPubkey: string,
): Promise<void> {
await db
.insert(meshFileKey)
.values({ fileId, peerPubkey, sealedKey, grantedByPubkey })
.onConflictDoUpdate({
target: [meshFileKey.fileId, meshFileKey.peerPubkey],
set: { sealedKey, grantedByPubkey, grantedAt: new Date() },
});
}
// --- Context sharing --- // --- Context sharing ---
/** /**
* Upsert a context snapshot for a peer. Each (meshId, presenceId) pair * Upsert a context snapshot for a peer. When `memberId` is provided the
* has at most one context row — repeated calls update it in place. * row is keyed on (meshId, memberId) — a stable identifier that survives
* reconnects. This prevents stale rows from accumulating every time a
* session reconnects with a fresh ephemeral presenceId.
*
* Falls back to (meshId, presenceId) lookup when memberId is absent
* (e.g. legacy callers or anonymous connections).
*/ */
export async function shareContext( export async function shareContext(
meshId: string, meshId: string,
@@ -906,24 +971,27 @@ export async function shareContext(
filesRead?: string[], filesRead?: string[],
keyFindings?: string[], keyFindings?: string[],
tags?: string[], tags?: string[],
memberId?: string,
): Promise<string> { ): Promise<string> {
const now = new Date(); const now = new Date();
// Try to find existing context for this presence in this mesh.
// Build the WHERE clause: prefer stable memberId, fall back to presenceId.
const lookupWhere = memberId
? and(eq(meshContext.meshId, meshId), eq(meshContext.memberId, memberId))
: and(eq(meshContext.meshId, meshId), eq(meshContext.presenceId, presenceId));
const [existing] = await db const [existing] = await db
.select({ id: meshContext.id }) .select({ id: meshContext.id })
.from(meshContext) .from(meshContext)
.where( .where(lookupWhere)
and(
eq(meshContext.meshId, meshId),
eq(meshContext.presenceId, presenceId),
),
)
.limit(1); .limit(1);
if (existing) { if (existing) {
await db await db
.update(meshContext) .update(meshContext)
.set({ .set({
// Keep presenceId current so it reflects the latest connection.
presenceId,
peerName: peerName ?? null, peerName: peerName ?? null,
summary, summary,
filesRead: filesRead ?? [], filesRead: filesRead ?? [],
@@ -939,6 +1007,7 @@ export async function shareContext(
.insert(meshContext) .insert(meshContext)
.values({ .values({
meshId, meshId,
memberId: memberId ?? null,
presenceId, presenceId,
peerName: peerName ?? null, peerName: peerName ?? null,
summary, summary,
@@ -1188,16 +1257,22 @@ export async function createStream(
name: string, name: string,
createdByName: string, createdByName: string,
): Promise<string> { ): Promise<string> {
const existing = await db // Atomic upsert: INSERT ... ON CONFLICT DO NOTHING to avoid TOCTOU race
// when two callers concurrently attempt to create the same stream.
const [inserted] = await db
.insert(meshStream)
.values({ meshId, name, createdByName })
.onConflictDoNothing()
.returning({ id: meshStream.id });
if (inserted) return inserted.id;
// Row already existed — fetch the id.
const [existing] = await db
.select({ id: meshStream.id }) .select({ id: meshStream.id })
.from(meshStream) .from(meshStream)
.where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name))); .where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name)));
if (existing.length > 0) return existing[0]!.id; return existing!.id;
const [row] = await db
.insert(meshStream)
.values({ meshId, name, createdByName })
.returning({ id: meshStream.id });
return row!.id;
} }
/** /**
@@ -1302,11 +1377,28 @@ export async function drainForMember(
); );
// Build group target matching: @all (broadcast alias) + @<groupname> // Build group target matching: @all (broadcast alias) + @<groupname>
// for each group the peer belongs to. // for each group the peer belongs to, expanded to all ancestor paths.
//
// Hierarchical routing (downward propagation):
// A peer in "flexicar/core" also matches messages sent to "@flexicar".
// A peer in "flexicar/core/backend" matches "@flexicar/core" and "@flexicar".
// This lets leads send to a parent group and reach all sub-teams.
//
// Resolution happens at drain time (pull model) — no duplicates stored,
// no schema changes, fully backward-compatible.
const groupTargets = ["@all"]; const groupTargets = ["@all"];
if (memberGroups) { if (memberGroups) {
const seen = new Set<string>();
for (const g of memberGroups) { for (const g of memberGroups) {
groupTargets.push(`@${g}`); const parts = g.split("/");
// Add the group itself + every ancestor prefix.
for (let depth = parts.length; depth > 0; depth--) {
const ancestor = parts.slice(0, depth).join("/");
if (!seen.has(ancestor)) {
seen.add(ancestor);
groupTargets.push(`@${ancestor}`);
}
}
} }
} }
const groupTargetList = sql.raw( const groupTargetList = sql.raw(
@@ -1337,7 +1429,7 @@ export async function drainForMember(
AND delivered_at IS NULL AND delivered_at IS NULL
AND priority::text IN (${priorityList}) AND priority::text IN (${priorityList})
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList})) AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList}))
${excludeSenderSessionPubkey ? sql`AND (sender_session_pubkey IS NULL OR sender_session_pubkey != ${excludeSenderSessionPubkey})` : sql``} ${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``}
ORDER BY created_at ASC, id ASC ORDER BY created_at ASC, id ASC
FOR UPDATE SKIP LOCKED FOR UPDATE SKIP LOCKED
) )

View File

@@ -23,7 +23,7 @@ const envSchema = z.object({
MINIO_ENDPOINT: z.string().default("minio:9000"), MINIO_ENDPOINT: z.string().default("minio:9000"),
MINIO_ACCESS_KEY: z.string().default("claudemesh"), MINIO_ACCESS_KEY: z.string().default("claudemesh"),
MINIO_SECRET_KEY: z.string().default("changeme"), MINIO_SECRET_KEY: z.string().default("changeme"),
MINIO_USE_SSL: z.coerce.boolean().default(false), MINIO_USE_SSL: z.enum(["true", "false", ""]).transform(v => v === "true").default("false"),
QDRANT_URL: z.string().default("http://qdrant:6333"), QDRANT_URL: z.string().default("http://qdrant:6333"),
NEO4J_URL: z.string().default("bolt://neo4j:7687"), NEO4J_URL: z.string().default("bolt://neo4j:7687"),
NEO4J_USER: z.string().default("neo4j"), NEO4J_USER: z.string().default("neo4j"),

View File

@@ -31,10 +31,13 @@ import {
forgetMemory, forgetMemory,
getContext, getContext,
getFile, getFile,
getFileKey,
getFileStatus, getFileStatus,
getState, getState,
grantFileKey,
handleHookSetStatus, handleHookSetStatus,
heartbeat, heartbeat,
insertFileKeys,
joinGroup, joinGroup,
joinMesh, joinMesh,
leaveGroup, leaveGroup,
@@ -123,7 +126,10 @@ async function maybePushQueuedMessages(
excludeSenderSessionPubkey?: string, excludeSenderSessionPubkey?: string,
): Promise<void> { ): Promise<void> {
const conn = connections.get(presenceId); const conn = connections.get(presenceId);
if (!conn) return; if (!conn) {
log.debug("maybePush: no connection for presence", { presence_id: presenceId });
return;
}
const status = await refreshStatusFromJsonl( const status = await refreshStatusFromJsonl(
presenceId, presenceId,
conn.cwd, conn.cwd,
@@ -138,6 +144,13 @@ async function maybePushQueuedMessages(
excludeSenderSessionPubkey, excludeSenderSessionPubkey,
conn.groups.map((g) => g.name), conn.groups.map((g) => g.name),
); );
log.info("maybePush", {
presence_id: presenceId,
status,
session_pubkey: conn.sessionPubkey?.slice(0, 12),
exclude: excludeSenderSessionPubkey?.slice(0, 12),
drained: messages.length,
});
for (const m of messages) { for (const m of messages) {
const push: WSPushMessage = { const push: WSPushMessage = {
type: "push", type: "push",
@@ -368,6 +381,9 @@ function handleUploadPost(
const tagsRaw = req.headers["x-tags"] as string | undefined; const tagsRaw = req.headers["x-tags"] as string | undefined;
const persistentRaw = req.headers["x-persistent"] as string | undefined; const persistentRaw = req.headers["x-persistent"] as string | undefined;
const targetSpec = req.headers["x-target-spec"] as string | undefined; const targetSpec = req.headers["x-target-spec"] as string | undefined;
const encryptedRaw = req.headers["x-encrypted"] as string | undefined;
const ownerPubkey = req.headers["x-owner-pubkey"] as string | undefined;
const fileKeysRaw = req.headers["x-file-keys"] as string | undefined;
if (!meshId || !memberId || !fileName) { if (!meshId || !memberId || !fileName) {
writeJson(res, 400, { writeJson(res, 400, {
@@ -435,19 +451,44 @@ function handleUploadPost(
: undefined, : undefined,
); );
// Insert DB row // Insert DB row — normalise tags to a real JS Array (Drizzle PgArray
// mapper calls .map() on the value; non-Array iterables break it).
// Skip uploadedByMember FK — memberId from the client header is the
// mesh slug, not a mesh.member primary key.
const encrypted = encryptedRaw === "true";
let fileKeys: Array<{ peerPubkey: string; sealedKey: string }> = [];
if (encrypted && fileKeysRaw) {
try {
fileKeys = JSON.parse(fileKeysRaw);
} catch { /* ignore */ }
}
const dbFileId = await uploadFile({ const dbFileId = await uploadFile({
meshId, meshId,
name: fileName, name: fileName,
sizeBytes: body.length, sizeBytes: body.length,
mimeType: (req.headers["content-type"] as string) || undefined, mimeType: (req.headers["content-type"] as string) || undefined,
minioKey, minioKey,
tags, tags: Array.isArray(tags) ? tags : [],
persistent, persistent,
uploadedByMember: memberId, uploadedByName: memberId || undefined,
uploadedByMember: undefined,
targetSpec: targetSpec || undefined, targetSpec: targetSpec || undefined,
encrypted: encrypted || false,
ownerPubkey: ownerPubkey || undefined,
}); });
if (encrypted && fileKeys.length > 0) {
await insertFileKeys(
dbFileId,
fileKeys.map((k) => ({
peerPubkey: k.peerPubkey,
sealedKey: k.sealedKey,
grantedByPubkey: ownerPubkey,
})),
);
}
writeJson(res, 200, { ok: true, fileId: dbFileId }); writeJson(res, 200, { ok: true, fileId: dbFileId });
log.info("upload", { log.info("upload", {
route: "POST /upload", route: "POST /upload",
@@ -506,8 +547,9 @@ function sendError(
code: string, code: string,
message: string, message: string,
id?: string, id?: string,
reqId?: string,
): void { ): void {
const err: WSServerMessage = { type: "error", code, message, id }; const err: WSServerMessage = { type: "error", code, message, id, ...(reqId ? { _reqId: reqId } : {}) };
try { try {
ws.send(JSON.stringify(err)); ws.send(JSON.stringify(err));
} catch { } catch {
@@ -686,6 +728,7 @@ function handleConnection(ws: WebSocket): void {
ws.on("message", async (raw) => { ws.on("message", async (raw) => {
try { try {
const msg = JSON.parse(raw.toString()) as WSClientMessage; const msg = JSON.parse(raw.toString()) as WSClientMessage;
const _reqId = (msg as any)._reqId as string | undefined;
if (msg.type === "hello") { if (msg.type === "hello") {
const result = await handleHello(ws, msg); const result = await handleHello(ws, msg);
if (!result) return; if (!result) return;
@@ -735,6 +778,7 @@ function handleConnection(ws: WebSocket): void {
sessionId: p.sessionId, sessionId: p.sessionId,
connectedAt: p.connectedAt.toISOString(), connectedAt: p.connectedAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}; };
conn.ws.send(JSON.stringify(resp)); conn.ws.send(JSON.stringify(resp));
log.info("ws list_peers", { log.info("ws list_peers", {
@@ -803,14 +847,8 @@ function handleConnection(ws: WebSocket): void {
updatedBy: stateRow.updatedBy, updatedBy: stateRow.updatedBy,
}); });
} }
// Send confirmation back to sender as state_result. // Fire-and-forget: no state_result sent back to sender.
sendToPeer(presenceId, { // The client (server.ts) returns success immediately without waiting.
type: "state_result",
key: stateRow.key,
value: stateRow.value,
updatedBy: stateRow.updatedBy,
updatedAt: stateRow.updatedAt.toISOString(),
});
log.info("ws set_state", { log.info("ws set_state", {
presence_id: presenceId, presence_id: presenceId,
key: ss.key, key: ss.key,
@@ -827,6 +865,7 @@ function handleConnection(ws: WebSocket): void {
value: stateEntry.value, value: stateEntry.value,
updatedBy: stateEntry.updatedBy, updatedBy: stateEntry.updatedBy,
updatedAt: stateEntry.updatedAt.toISOString(), updatedAt: stateEntry.updatedAt.toISOString(),
...(_reqId ? { _reqId } : {}),
}); });
} else { } else {
sendToPeer(presenceId, { sendToPeer(presenceId, {
@@ -835,6 +874,7 @@ function handleConnection(ws: WebSocket): void {
value: null, value: null,
updatedBy: "", updatedBy: "",
updatedAt: "", updatedAt: "",
...(_reqId ? { _reqId } : {}),
}); });
} }
log.info("ws get_state", { log.info("ws get_state", {
@@ -854,6 +894,7 @@ function handleConnection(ws: WebSocket): void {
updatedBy: e.updatedBy, updatedBy: e.updatedBy,
updatedAt: e.updatedAt.toISOString(), updatedAt: e.updatedAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws list_state", { log.info("ws list_state", {
presence_id: presenceId, presence_id: presenceId,
@@ -876,6 +917,7 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "memory_stored", type: "memory_stored",
id: memoryId, id: memoryId,
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws remember", { log.info("ws remember", {
presence_id: presenceId, presence_id: presenceId,
@@ -895,6 +937,7 @@ function handleConnection(ws: WebSocket): void {
rememberedBy: m.rememberedBy, rememberedBy: m.rememberedBy,
rememberedAt: m.rememberedAt.toISOString(), rememberedAt: m.rememberedAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws recall", { log.info("ws recall", {
presence_id: presenceId, presence_id: presenceId,
@@ -911,6 +954,7 @@ function handleConnection(ws: WebSocket): void {
id: fg.memoryId, id: fg.memoryId,
messageId: fg.memoryId, messageId: fg.memoryId,
queued: false, queued: false,
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws forget", { log.info("ws forget", {
presence_id: presenceId, presence_id: presenceId,
@@ -922,7 +966,7 @@ function handleConnection(ws: WebSocket): void {
const gf = msg as Extract<WSClientMessage, { type: "get_file" }>; const gf = msg as Extract<WSClientMessage, { type: "get_file" }>;
const file = await getFile(conn.meshId, gf.fileId); const file = await getFile(conn.meshId, gf.fileId);
if (!file) { if (!file) {
sendError(conn.ws, "not_found", "file not found"); sendError(conn.ws, "not_found", "file not found", undefined, _reqId);
break; break;
} }
// Access control: if targetSpec is set, verify peer matches // Access control: if targetSpec is set, verify peer matches
@@ -932,7 +976,20 @@ function handleConnection(ws: WebSocket): void {
file.targetSpec === conn.sessionPubkey || file.targetSpec === conn.sessionPubkey ||
file.targetSpec === "*"; file.targetSpec === "*";
if (!matches) { if (!matches) {
sendError(conn.ws, "forbidden", "file not targeted at you"); sendError(conn.ws, "forbidden", "file not targeted at you", undefined, _reqId);
break;
}
}
// E2E: for encrypted files, fetch the sealed key for this peer.
// Owners are not blocked if their key is missing (edge case), but
// they still get it returned so the CLI can decrypt normally.
let sealedKey: string | null = null;
if (file.encrypted) {
const peerPubkey = conn.sessionPubkey ?? conn.memberPubkey;
const isOwner = !!(file.ownerPubkey && peerPubkey === file.ownerPubkey);
sealedKey = peerPubkey ? await getFileKey(gf.fileId, peerPubkey) : null;
if (!sealedKey && !isOwner) {
sendError(conn.ws, "forbidden", "no decryption key for this file", undefined, _reqId);
break; break;
} }
} }
@@ -957,6 +1014,9 @@ function handleConnection(ws: WebSocket): void {
fileId: gf.fileId, fileId: gf.fileId,
url: presignedUrl, url: presignedUrl,
name: file.name, name: file.name,
encrypted: file.encrypted,
sealedKey: sealedKey ?? undefined,
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws get_file", { log.info("ws get_file", {
presence_id: presenceId, presence_id: presenceId,
@@ -977,7 +1037,9 @@ function handleConnection(ws: WebSocket): void {
uploadedBy: f.uploadedBy, uploadedBy: f.uploadedBy,
uploadedAt: f.uploadedAt.toISOString(), uploadedAt: f.uploadedAt.toISOString(),
persistent: f.persistent, persistent: f.persistent,
encrypted: f.encrypted,
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws list_files", { log.info("ws list_files", {
presence_id: presenceId, presence_id: presenceId,
@@ -996,6 +1058,7 @@ function handleConnection(ws: WebSocket): void {
peerName: a.peerName, peerName: a.peerName,
accessedAt: a.accessedAt.toISOString(), accessedAt: a.accessedAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws file_status", { log.info("ws file_status", {
presence_id: presenceId, presence_id: presenceId,
@@ -1003,6 +1066,23 @@ function handleConnection(ws: WebSocket): void {
}); });
break; break;
} }
case "grant_file_access": {
const gfa = msg as { type: "grant_file_access"; fileId: string; peerPubkey: string; sealedKey: string };
const file = await getFile(conn.meshId, gfa.fileId);
if (!file) {
sendError(conn.ws, "not_found", "file not found", undefined, _reqId);
break;
}
const requestorPubkey = conn.sessionPubkey ?? conn.memberPubkey;
if (file.ownerPubkey && file.ownerPubkey !== requestorPubkey) {
sendError(conn.ws, "forbidden", "only the file owner can grant access", undefined, _reqId);
break;
}
await grantFileKey(gfa.fileId, gfa.peerPubkey, gfa.sealedKey, requestorPubkey ?? undefined);
sendToPeer(presenceId, { type: "grant_file_access_ok", fileId: gfa.fileId, peerPubkey: gfa.peerPubkey, ...(_reqId ? { _reqId } : {}) });
log.info("ws grant_file_access", { presence_id: presenceId, file_id: gfa.fileId, peer: gfa.peerPubkey });
break;
}
case "delete_file": { case "delete_file": {
const df = msg as Extract<WSClientMessage, { type: "delete_file" }>; const df = msg as Extract<WSClientMessage, { type: "delete_file" }>;
await deleteFile(conn.meshId, df.fileId); await deleteFile(conn.meshId, df.fileId);
@@ -1011,6 +1091,7 @@ function handleConnection(ws: WebSocket): void {
id: df.fileId, id: df.fileId,
messageId: df.fileId, messageId: df.fileId,
queued: false, queued: false,
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws delete_file", { log.info("ws delete_file", {
presence_id: presenceId, presence_id: presenceId,
@@ -1031,7 +1112,7 @@ function handleConnection(ws: WebSocket): void {
.from(messageQueue) .from(messageQueue)
.where(eq(messageQueue.id, ms.messageId)); .where(eq(messageQueue.id, ms.messageId));
if (!mqRow || mqRow.meshId !== conn.meshId) { if (!mqRow || mqRow.meshId !== conn.meshId) {
sendError(conn.ws, "not_found", "message not found"); sendError(conn.ws, "not_found", "message not found", undefined, _reqId);
break; break;
} }
// Build per-recipient status from connected peers. // Build per-recipient status from connected peers.
@@ -1075,6 +1156,7 @@ function handleConnection(ws: WebSocket): void {
delivered: !!mqRow.deliveredAt, delivered: !!mqRow.deliveredAt,
deliveredAt: mqRow.deliveredAt?.toISOString() ?? null, deliveredAt: mqRow.deliveredAt?.toISOString() ?? null,
recipients, recipients,
...(_reqId ? { _reqId } : {}),
}; };
sendToPeer(presenceId, resp); sendToPeer(presenceId, resp);
log.info("ws message_status", { log.info("ws message_status", {
@@ -1097,6 +1179,7 @@ function handleConnection(ws: WebSocket): void {
sc.filesRead, sc.filesRead,
sc.keyFindings, sc.keyFindings,
sc.tags, sc.tags,
conn.memberId,
); );
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "context_shared", type: "context_shared",
@@ -1132,6 +1215,7 @@ function handleConnection(ws: WebSocket): void {
tags: c.tags, tags: c.tags,
updatedAt: c.updatedAt.toISOString(), updatedAt: c.updatedAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws get_context", { log.info("ws get_context", {
presence_id: presenceId, presence_id: presenceId,
@@ -1150,6 +1234,7 @@ function handleConnection(ws: WebSocket): void {
tags: c.tags, tags: c.tags,
updatedAt: c.updatedAt.toISOString(), updatedAt: c.updatedAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws list_contexts", { log.info("ws list_contexts", {
presence_id: presenceId, presence_id: presenceId,
@@ -1174,6 +1259,7 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "task_created", type: "task_created",
id: taskId, id: taskId,
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws create_task", { log.info("ws create_task", {
presence_id: presenceId, presence_id: presenceId,
@@ -1194,7 +1280,7 @@ function handleConnection(ws: WebSocket): void {
memberInfo?.displayName, memberInfo?.displayName,
); );
if (!claimed) { if (!claimed) {
sendError(conn.ws, "task_not_claimable", "task is not open or does not exist"); sendError(conn.ws, "task_not_claimable", "task is not open or does not exist", undefined, _reqId);
break; break;
} }
// Return updated task list so caller sees the change. // Return updated task list so caller sees the change.
@@ -1212,6 +1298,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags, tags: t.tags,
createdAt: t.createdAt.toISOString(), createdAt: t.createdAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws claim_task", { log.info("ws claim_task", {
presence_id: presenceId, presence_id: presenceId,
@@ -1227,7 +1314,7 @@ function handleConnection(ws: WebSocket): void {
cpt.result, cpt.result,
); );
if (!completed) { if (!completed) {
sendError(conn.ws, "task_not_found", "task not found in this mesh"); sendError(conn.ws, "task_not_found", "task not found in this mesh", undefined, _reqId);
break; break;
} }
// Return updated task list. // Return updated task list.
@@ -1245,6 +1332,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags, tags: t.tags,
createdAt: t.createdAt.toISOString(), createdAt: t.createdAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws complete_task", { log.info("ws complete_task", {
presence_id: presenceId, presence_id: presenceId,
@@ -1268,6 +1356,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags, tags: t.tags,
createdAt: t.createdAt.toISOString(), createdAt: t.createdAt.toISOString(),
})), })),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws list_tasks", { log.info("ws list_tasks", {
presence_id: presenceId, presence_id: presenceId,
@@ -1293,6 +1382,7 @@ function handleConnection(ws: WebSocket): void {
type: "stream_created", type: "stream_created",
id: streamId, id: streamId,
name: cs.name, name: cs.name,
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws create_stream", { log.info("ws create_stream", {
presence_id: presenceId, presence_id: presenceId,
@@ -1307,6 +1397,7 @@ function handleConnection(ws: WebSocket): void {
if (!streamSubscriptions.has(key)) if (!streamSubscriptions.has(key))
streamSubscriptions.set(key, new Set()); streamSubscriptions.set(key, new Set());
streamSubscriptions.get(key)!.add(presenceId); streamSubscriptions.get(key)!.add(presenceId);
sendToPeer(presenceId, { type: "subscribed", stream: sub.stream, ...(_reqId ? { _reqId } : {}) });
log.info("ws subscribe", { log.info("ws subscribe", {
presence_id: presenceId, presence_id: presenceId,
stream: sub.stream, stream: sub.stream,
@@ -1365,6 +1456,7 @@ function handleConnection(ws: WebSocket): void {
subscriberCount: streamSubscriptions.get(key)?.size ?? 0, subscriberCount: streamSubscriptions.get(key)?.size ?? 0,
}; };
}), }),
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws list_streams", { log.info("ws list_streams", {
presence_id: presenceId, presence_id: presenceId,
@@ -1379,40 +1471,43 @@ function handleConnection(ws: WebSocket): void {
case "vector_store": { case "vector_store": {
const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>; const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>;
const collName = meshCollectionName(conn.meshId, vs.collection); const collName = meshCollectionName(conn.meshId, vs.collection);
await ensureCollection(collName); try {
const { generateId } = await import("@turbostarter/shared/utils"); await ensureCollection(collName);
const pointId = generateId(); const { generateId } = await import("@turbostarter/shared/utils");
// Store text + metadata as payload. Use a zero vector as placeholder const pointId = generateId();
// — real embeddings should be computed client-side and sent directly // Store text + metadata as payload. Use a zero vector as placeholder
// to Qdrant in a future version. // — real embeddings should be computed client-side and sent directly
const zeroVector = new Array(1536).fill(0) as number[]; // to Qdrant in a future version.
await qdrant.upsert(collName, { const zeroVector = new Array(1536).fill(0) as number[];
wait: true, await qdrant.upsert(collName, {
points: [ wait: true,
{ points: [
id: pointId, {
vector: zeroVector, id: pointId,
payload: { vector: zeroVector,
text: vs.text, payload: {
mesh_id: conn.meshId, text: vs.text,
stored_by: conn.memberPubkey, mesh_id: conn.meshId,
stored_at: new Date().toISOString(), stored_by: conn.memberPubkey,
...(vs.metadata ?? {}), stored_at: new Date().toISOString(),
...(vs.metadata ?? {}),
},
}, },
}, ],
], });
}); sendToPeer(presenceId, {
sendToPeer(presenceId, { type: "vector_stored",
type: "ack" as const, id: pointId,
id: pointId, ...(_reqId ? { _reqId } : {}),
messageId: pointId, });
queued: false, log.info("ws vector_store", {
}); presence_id: presenceId,
log.info("ws vector_store", { collection: vs.collection,
presence_id: presenceId, point_id: pointId,
collection: vs.collection, });
point_id: pointId, } catch (e) {
}); sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e), undefined, _reqId);
}
break; break;
} }
case "vector_search": { case "vector_search": {
@@ -1446,12 +1541,14 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "vector_results", type: "vector_results",
results: matches, results: matches,
...(_reqId ? { _reqId } : {}),
}); });
} catch { } catch {
// Collection may not exist yet — return empty results. // Collection may not exist yet — return empty results.
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "vector_results", type: "vector_results",
results: [], results: [],
...(_reqId ? { _reqId } : {}),
}); });
} }
log.info("ws vector_search", { log.info("ws vector_search", {
@@ -1477,6 +1574,7 @@ function handleConnection(ws: WebSocket): void {
id: vd.id, id: vd.id,
messageId: vd.id, messageId: vd.id,
queued: false, queued: false,
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws vector_delete", { log.info("ws vector_delete", {
presence_id: presenceId, presence_id: presenceId,
@@ -1496,11 +1594,13 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "collection_list", type: "collection_list",
collections: meshCollections, collections: meshCollections,
...(_reqId ? { _reqId } : {}),
}); });
} catch { } catch {
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "collection_list", type: "collection_list",
collections: [], collections: [],
...(_reqId ? { _reqId } : {}),
}); });
} }
log.info("ws list_collections", { log.info("ws list_collections", {
@@ -1535,9 +1635,10 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "graph_result", type: "graph_result",
records: gqRecords, records: gqRecords,
...(_reqId ? { _reqId } : {}),
}); });
} catch (gqErr) { } catch (gqErr) {
sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr)); sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr), undefined, _reqId);
} finally { } finally {
await gqSession.close(); await gqSession.close();
} }
@@ -1569,9 +1670,10 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, { sendToPeer(presenceId, {
type: "graph_result", type: "graph_result",
records: geRecords, records: geRecords,
...(_reqId ? { _reqId } : {}),
}); });
} catch (geErr) { } catch (geErr) {
sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr)); sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr), undefined, _reqId);
} finally { } finally {
await geSession.close(); await geSession.close();
} }
@@ -1588,12 +1690,14 @@ function handleConnection(ws: WebSocket): void {
const mq = msg as Extract<WSClientMessage, { type: "mesh_query" }>; const mq = msg as Extract<WSClientMessage, { type: "mesh_query" }>;
try { try {
const result = await meshQuery(conn.meshId, mq.sql); const result = await meshQuery(conn.meshId, mq.sql);
sendToPeer(presenceId, { type: "mesh_query_result", ...result }); sendToPeer(presenceId, { type: "mesh_query_result", ...result, ...(_reqId ? { _reqId } : {}) });
} catch (e) { } catch (e) {
sendError( sendError(
conn.ws, conn.ws,
"query_error", "query_error",
e instanceof Error ? e.message : String(e), e instanceof Error ? e.message : String(e),
undefined,
_reqId,
); );
} }
log.info("ws mesh_query", { log.info("ws mesh_query", {
@@ -1611,12 +1715,15 @@ function handleConnection(ws: WebSocket): void {
columns: [], columns: [],
rows: [], rows: [],
rowCount: result.rowCount, rowCount: result.rowCount,
...(_reqId ? { _reqId } : {}),
}); });
} catch (e) { } catch (e) {
sendError( sendError(
conn.ws, conn.ws,
"execute_error", "execute_error",
e instanceof Error ? e.message : String(e), e instanceof Error ? e.message : String(e),
undefined,
_reqId,
); );
} }
log.info("ws mesh_execute", { log.info("ws mesh_execute", {
@@ -1628,12 +1735,14 @@ function handleConnection(ws: WebSocket): void {
case "mesh_schema": { case "mesh_schema": {
try { try {
const tables = await meshSchema(conn.meshId); const tables = await meshSchema(conn.meshId);
sendToPeer(presenceId, { type: "mesh_schema_result", tables }); sendToPeer(presenceId, { type: "mesh_schema_result", tables, ...(_reqId ? { _reqId } : {}) });
} catch (e) { } catch (e) {
sendError( sendError(
conn.ws, conn.ws,
"schema_error", "schema_error",
e instanceof Error ? e.message : String(e), e instanceof Error ? e.message : String(e),
undefined,
_reqId,
); );
} }
log.info("ws mesh_schema", { presence_id: presenceId }); log.info("ws mesh_schema", { presence_id: presenceId });
@@ -1656,7 +1765,6 @@ function handleConnection(ws: WebSocket): void {
]); ]);
const allGroups = new Set<string>(); const allGroups = new Set<string>();
for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`); for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`);
const myPresence = peers.find(p => p.sessionId === [...connections.entries()].find(([pid]) => pid === presenceId)?.[1]?.sessionPubkey);
const peerConn = connections.get(presenceId); const peerConn = connections.get(presenceId);
// Find own display name: match sessionPubkey from the peer list // Find own display name: match sessionPubkey from the peer list
const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey; const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey;
@@ -1675,6 +1783,7 @@ function handleConnection(ws: WebSocket): void {
collections: [], collections: [],
yourName: selfPeer?.displayName ?? "unknown", yourName: selfPeer?.displayName ?? "unknown",
yourGroups: peerConn?.groups ?? [], yourGroups: peerConn?.groups ?? [],
...(_reqId ? { _reqId } : {}),
}); });
log.info("ws mesh_info", { presence_id: presenceId }); log.info("ws mesh_info", { presence_id: presenceId });
break; break;

View File

@@ -161,6 +161,7 @@ export interface WSAckMessage {
id: string; // echoes client-side correlation id id: string; // echoes client-side correlation id
messageId: string; messageId: string;
queued: boolean; queued: boolean;
_reqId?: string;
} }
/** Broker → client: hello handshake acknowledgement. */ /** Broker → client: hello handshake acknowledgement. */
@@ -182,6 +183,7 @@ export interface WSPeersListMessage {
sessionId: string; sessionId: string;
connectedAt: string; connectedAt: string;
}>; }>;
_reqId?: string;
} }
/** Broker → client: a state key was changed by another peer. */ /** Broker → client: a state key was changed by another peer. */
@@ -199,6 +201,7 @@ export interface WSStateResultMessage {
value: unknown; value: unknown;
updatedAt: string; updatedAt: string;
updatedBy: string; updatedBy: string;
_reqId?: string;
} }
/** Broker → client: response to list_state. */ /** Broker → client: response to list_state. */
@@ -210,12 +213,14 @@ export interface WSStateListMessage {
updatedBy: string; updatedBy: string;
updatedAt: string; updatedAt: string;
}>; }>;
_reqId?: string;
} }
/** Broker → client: acknowledgement for a remember. */ /** Broker → client: acknowledgement for a remember. */
export interface WSMemoryStoredMessage { export interface WSMemoryStoredMessage {
type: "memory_stored"; type: "memory_stored";
id: string; id: string;
_reqId?: string;
} }
/** Broker → client: response to recall. */ /** Broker → client: response to recall. */
@@ -228,6 +233,7 @@ export interface WSMemoryResultsMessage {
rememberedBy: string; rememberedBy: string;
rememberedAt: string; rememberedAt: string;
}>; }>;
_reqId?: string;
} }
// --- Vector storage messages --- // --- Vector storage messages ---
@@ -295,6 +301,13 @@ export interface WSMeshSchemaMessage {
// --- Vector/Graph response messages --- // --- Vector/Graph response messages ---
/** Broker → client: confirmation that a vector point was stored. */
export interface WSVectorStoredMessage {
type: "vector_stored";
id: string;
_reqId?: string;
}
/** Broker → client: vector search results. */ /** Broker → client: vector search results. */
export interface WSVectorResultsMessage { export interface WSVectorResultsMessage {
type: "vector_results"; type: "vector_results";
@@ -304,18 +317,21 @@ export interface WSVectorResultsMessage {
score: number; score: number;
metadata?: Record<string, unknown>; metadata?: Record<string, unknown>;
}>; }>;
_reqId?: string;
} }
/** Broker → client: list of vector collections. */ /** Broker → client: list of vector collections. */
export interface WSCollectionListMessage { export interface WSCollectionListMessage {
type: "collection_list"; type: "collection_list";
collections: string[]; collections: string[];
_reqId?: string;
} }
/** Broker → client: graph query results. */ /** Broker → client: graph query results. */
export interface WSGraphResultMessage { export interface WSGraphResultMessage {
type: "graph_result"; type: "graph_result";
records: Array<Record<string, unknown>>; records: Array<Record<string, unknown>>;
_reqId?: string;
} }
/** Broker → client: mesh SQL query results. */ /** Broker → client: mesh SQL query results. */
@@ -324,6 +340,7 @@ export interface WSMeshQueryResultMessage {
columns: string[]; columns: string[];
rows: Array<Record<string, unknown>>; rows: Array<Record<string, unknown>>;
rowCount: number; rowCount: number;
_reqId?: string;
} }
/** Broker → client: mesh schema introspection results. */ /** Broker → client: mesh schema introspection results. */
@@ -333,6 +350,7 @@ export interface WSMeshSchemaResultMessage {
name: string; name: string;
columns: Array<{ name: string; type: string; nullable: boolean }>; columns: Array<{ name: string; type: string; nullable: boolean }>;
}>; }>;
_reqId?: string;
} }
/** Client → broker: get full mesh overview. */ /** Client → broker: get full mesh overview. */
@@ -355,6 +373,7 @@ export interface WSMeshInfoResultMessage {
collections: string[]; collections: string[];
yourName: string; yourName: string;
yourGroups: Array<{ name: string; role?: string }>; yourGroups: Array<{ name: string; role?: string }>;
_reqId?: string;
} }
/** Client → broker: check delivery status of a message. */ /** Client → broker: check delivery status of a message. */
@@ -375,6 +394,7 @@ export interface WSMessageStatusResultMessage {
pubkey: string; pubkey: string;
status: "delivered" | "held" | "disconnected"; status: "delivered" | "held" | "disconnected";
}>; }>;
_reqId?: string;
} }
// --- File sharing messages --- // --- File sharing messages ---
@@ -404,12 +424,23 @@ export interface WSDeleteFileMessage {
fileId: string; fileId: string;
} }
/** Client → broker: grant a peer access to an encrypted file. */
export interface WSGrantFileAccessMessage {
type: "grant_file_access";
fileId: string;
peerPubkey: string;
sealedKey: string;
}
/** Broker → client: presigned URL for downloading a file. */ /** Broker → client: presigned URL for downloading a file. */
export interface WSFileUrlMessage { export interface WSFileUrlMessage {
type: "file_url"; type: "file_url";
fileId: string; fileId: string;
url: string; url: string;
name: string; name: string;
encrypted?: boolean;
sealedKey?: string;
_reqId?: string;
} }
/** Broker → client: list of files in the mesh. */ /** Broker → client: list of files in the mesh. */
@@ -423,7 +454,17 @@ export interface WSFileListMessage {
uploadedBy: string; uploadedBy: string;
uploadedAt: string; uploadedAt: string;
persistent: boolean; persistent: boolean;
encrypted: boolean;
}>; }>;
_reqId?: string;
}
/** Broker → client: acknowledgement for grant_file_access. */
export interface WSGrantFileAccessOkMessage {
type: "grant_file_access_ok";
fileId: string;
peerPubkey: string;
_reqId?: string;
} }
/** Broker → client: access log for a file. */ /** Broker → client: access log for a file. */
@@ -434,6 +475,7 @@ export interface WSFileStatusResultMessage {
peerName: string; peerName: string;
accessedAt: string; accessedAt: string;
}>; }>;
_reqId?: string;
} }
// --- Context sharing messages --- // --- Context sharing messages ---
@@ -475,6 +517,7 @@ export interface WSContextResultsMessage {
tags: string[]; tags: string[];
updatedAt: string; updatedAt: string;
}>; }>;
_reqId?: string;
} }
/** Broker → client: response to list_contexts. */ /** Broker → client: response to list_contexts. */
@@ -486,6 +529,7 @@ export interface WSContextListMessage {
tags: string[]; tags: string[];
updatedAt: string; updatedAt: string;
}>; }>;
_reqId?: string;
} }
// --- Task messages --- // --- Task messages ---
@@ -523,6 +567,7 @@ export interface WSListTasksMessage {
export interface WSTaskCreatedMessage { export interface WSTaskCreatedMessage {
type: "task_created"; type: "task_created";
id: string; id: string;
_reqId?: string;
} }
/** Broker → client: response to list_tasks, claim_task, complete_task. */ /** Broker → client: response to list_tasks, claim_task, complete_task. */
@@ -539,6 +584,7 @@ export interface WSTaskListMessage {
tags: string[]; tags: string[];
createdAt: string; createdAt: string;
}>; }>;
_reqId?: string;
} }
// --- Stream messages --- // --- Stream messages ---
@@ -578,6 +624,7 @@ export interface WSStreamCreatedMessage {
type: "stream_created"; type: "stream_created";
id: string; id: string;
name: string; name: string;
_reqId?: string;
} }
/** Broker → client: real-time data pushed from a stream. */ /** Broker → client: real-time data pushed from a stream. */
@@ -588,6 +635,13 @@ export interface WSStreamDataMessage {
publishedBy: string; publishedBy: string;
} }
/** Broker → client: confirmation that a stream subscription was registered. */
export interface WSSubscribedMessage {
type: "subscribed";
stream: string;
_reqId?: string;
}
/** Broker → client: response to list_streams. */ /** Broker → client: response to list_streams. */
export interface WSStreamListMessage { export interface WSStreamListMessage {
type: "stream_list"; type: "stream_list";
@@ -598,6 +652,7 @@ export interface WSStreamListMessage {
createdAt: string; createdAt: string;
subscriberCount: number; subscriberCount: number;
}>; }>;
_reqId?: string;
} }
/** Broker → client: structured error. */ /** Broker → client: structured error. */
@@ -606,6 +661,7 @@ export interface WSErrorMessage {
code: string; code: string;
message: string; message: string;
id?: string; id?: string;
_reqId?: string;
} }
export type WSClientMessage = export type WSClientMessage =
@@ -627,6 +683,7 @@ export type WSClientMessage =
| WSListFilesMessage | WSListFilesMessage
| WSFileStatusMessage | WSFileStatusMessage
| WSDeleteFileMessage | WSDeleteFileMessage
| WSGrantFileAccessMessage
| WSShareContextMessage | WSShareContextMessage
| WSGetContextMessage | WSGetContextMessage
| WSListContextsMessage | WSListContextsMessage
@@ -664,11 +721,13 @@ export type WSServerMessage =
| WSFileUrlMessage | WSFileUrlMessage
| WSFileListMessage | WSFileListMessage
| WSFileStatusResultMessage | WSFileStatusResultMessage
| WSGrantFileAccessOkMessage
| WSContextSharedMessage | WSContextSharedMessage
| WSContextResultsMessage | WSContextResultsMessage
| WSContextListMessage | WSContextListMessage
| WSTaskCreatedMessage | WSTaskCreatedMessage
| WSTaskListMessage | WSTaskListMessage
| WSVectorStoredMessage
| WSVectorResultsMessage | WSVectorResultsMessage
| WSCollectionListMessage | WSCollectionListMessage
| WSGraphResultMessage | WSGraphResultMessage
@@ -676,6 +735,7 @@ export type WSServerMessage =
| WSMeshSchemaResultMessage | WSMeshSchemaResultMessage
| WSStreamCreatedMessage | WSStreamCreatedMessage
| WSStreamDataMessage | WSStreamDataMessage
| WSSubscribedMessage
| WSStreamListMessage | WSStreamListMessage
| WSMeshInfoResultMessage | WSMeshInfoResultMessage
| WSErrorMessage; | WSErrorMessage;

View File

@@ -1,6 +1,6 @@
{ {
"name": "claudemesh-cli", "name": "claudemesh-cli",
"version": "0.5.9", "version": "0.6.6",
"description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.", "description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.",
"keywords": [ "keywords": [
"claude-code", "claude-code",
@@ -47,6 +47,7 @@
}, },
"dependencies": { "dependencies": {
"@modelcontextprotocol/sdk": "1.27.1", "@modelcontextprotocol/sdk": "1.27.1",
"citty": "0.2.2",
"libsodium-wrappers": "0.7.15", "libsodium-wrappers": "0.7.15",
"ws": "8.20.0", "ws": "8.20.0",
"zod": "4.1.13" "zod": "4.1.13"

View File

@@ -1,9 +1,12 @@
/** /**
* `claudemesh launch` — spawn `claude` with peer mesh identity. * `claudemesh launch` — spawn `claude` with peer mesh identity.
* *
* Flags are defined in index.ts (citty command) — that is the source of
* truth. This file receives already-parsed flags and rawArgs.
*
* Flow: * Flow:
* 1. Parse --name, --join, --mesh, --quiet flags * 1. Receive parsed flags from citty + rawArgs for -- passthrough
* 2. If --join: run join flow first (accepts token or URL) * 2. If --join: run join flow first
* 3. Load config → pick mesh (auto if 1, interactive picker if >1) * 3. Load config → pick mesh (auto if 1, interactive picker if >1)
* 4. Write per-session config to tmpdir (isolates mesh selection) * 4. Write per-session config to tmpdir (isolates mesh selection)
* 5. Spawn claude with CLAUDEMESH_CONFIG_DIR + CLAUDEMESH_DISPLAY_NAME * 5. Spawn claude with CLAUDEMESH_CONFIG_DIR + CLAUDEMESH_DISPLAY_NAME
@@ -18,73 +21,17 @@ import { createInterface } from "node:readline";
import { loadConfig, getConfigPath } from "../state/config"; import { loadConfig, getConfigPath } from "../state/config";
import type { Config, JoinedMesh, GroupEntry } from "../state/config"; import type { Config, JoinedMesh, GroupEntry } from "../state/config";
// --- Arg parsing --- // Flags as parsed by citty (index.ts is the source of truth for definitions).
export interface LaunchFlags {
interface LaunchArgs { name?: string;
name: string | null; role?: string;
role: string | null; groups?: string;
groups: string | null; // comma-separated, e.g. "frontend:lead,reviewers:member" join?: string;
joinLink: string | null; mesh?: string;
meshSlug: string | null; "message-mode"?: string;
messageMode: "push" | "inbox" | "off" | null; "system-prompt"?: string;
quiet: boolean; yes?: boolean;
skipPermConfirm: boolean; quiet?: boolean;
claudeArgs: string[];
}
function parseArgs(argv: string[]): LaunchArgs {
const result: LaunchArgs = {
name: null,
role: null,
groups: null,
joinLink: null,
meshSlug: null,
messageMode: null,
quiet: false,
skipPermConfirm: false,
claudeArgs: [],
};
let i = 0;
while (i < argv.length) {
const arg = argv[i]!;
if (arg === "--name" && i + 1 < argv.length) {
result.name = argv[++i]!;
} else if (arg.startsWith("--name=")) {
result.name = arg.slice("--name=".length);
} else if (arg === "--role" && i + 1 < argv.length) {
result.role = argv[++i]!;
} else if (arg.startsWith("--role=")) {
result.role = arg.slice("--role=".length);
} else if (arg === "--groups" && i + 1 < argv.length) {
result.groups = argv[++i]!;
} else if (arg.startsWith("--groups=")) {
result.groups = arg.slice("--groups=".length);
} else if (arg === "--join" && i + 1 < argv.length) {
result.joinLink = argv[++i]!;
} else if (arg.startsWith("--join=")) {
result.joinLink = arg.slice("--join=".length);
} else if (arg === "--mesh" && i + 1 < argv.length) {
result.meshSlug = argv[++i]!;
} else if (arg.startsWith("--mesh=")) {
result.meshSlug = arg.slice("--mesh=".length);
} else if (arg === "--inbox") {
result.messageMode = "inbox";
} else if (arg === "--no-messages") {
result.messageMode = "off";
} else if (arg === "--quiet") {
result.quiet = true;
} else if (arg === "-y" || arg === "--yes") {
result.skipPermConfirm = true;
} else if (arg === "--") {
result.claudeArgs.push(...argv.slice(i + 1));
break;
} else {
result.claudeArgs.push(arg);
}
i++;
}
return result;
} }
// --- Interactive mesh picker --- // --- Interactive mesh picker ---
@@ -206,8 +153,26 @@ function printBanner(name: string, meshSlug: string, role: string | null, groups
// --- Main --- // --- Main ---
export async function runLaunch(extraArgs: string[]): Promise<void> { export async function runLaunch(flags: LaunchFlags, rawArgs: string[]): Promise<void> {
const args = parseArgs(extraArgs); // Extract args that follow "--" — passed straight through to claude.
const dashIdx = rawArgs.indexOf("--");
const claudePassthrough = dashIdx >= 0 ? rawArgs.slice(dashIdx + 1) : [];
// Normalise flags into the internal shape used below.
const args = {
name: flags.name ?? null,
role: flags.role ?? null,
groups: flags.groups ?? null,
joinLink: flags.join ?? null,
meshSlug: flags.mesh ?? null,
messageMode: (["push", "inbox", "off"].includes(flags["message-mode"] ?? "")
? flags["message-mode"] as "push" | "inbox" | "off"
: null),
systemPrompt: flags["system-prompt"] ?? null,
quiet: flags.quiet ?? false,
skipPermConfirm: flags.yes ?? false,
claudeArgs: claudePassthrough,
};
// 1. If --join, run join flow first. // 1. If --join, run join flow first.
if (args.joinLink) { if (args.joinLink) {
@@ -318,6 +283,7 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
version: 1, version: 1,
meshes: [mesh], meshes: [mesh],
displayName, displayName,
...(role ? { role } : {}),
...(parsedGroups.length > 0 ? { groups: parsedGroups } : {}), ...(parsedGroups.length > 0 ? { groups: parsedGroups } : {}),
messageMode, messageMode,
}; };
@@ -351,6 +317,7 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
"--dangerously-load-development-channels", "--dangerously-load-development-channels",
"server:claudemesh", "server:claudemesh",
"--dangerously-skip-permissions", "--dangerously-skip-permissions",
...(args.systemPrompt ? ["--system-prompt", args.systemPrompt] : []),
...filtered, ...filtered,
]; ];
@@ -362,6 +329,7 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
...process.env, ...process.env,
CLAUDEMESH_CONFIG_DIR: tmpDir, CLAUDEMESH_CONFIG_DIR: tmpDir,
CLAUDEMESH_DISPLAY_NAME: displayName, CLAUDEMESH_DISPLAY_NAME: displayName,
...(role ? { CLAUDEMESH_ROLE: role } : {}),
}, },
}); });

View File

@@ -0,0 +1,90 @@
/**
* File encryption for claudemesh E2E file sharing.
*
* Symmetric: crypto_secretbox_easy with random Kf (32-byte key).
* Key wrapping: crypto_box_seal to recipient's X25519 pub (converted from ed25519).
* Key opening: crypto_box_seal_open with own X25519 keypair.
*/
import { ensureSodium } from "./keypair";
export interface EncryptedFile {
ciphertext: Uint8Array; // secretbox ciphertext (includes MAC)
nonce: string; // base64 24-byte nonce
key: Uint8Array; // 32-byte symmetric Kf (keep in memory only)
}
/**
* Encrypt file bytes with a fresh random symmetric key.
* Returns ciphertext, nonce (base64), and the plaintext Kf.
*/
export async function encryptFile(plaintext: Uint8Array): Promise<EncryptedFile> {
const sodium = await ensureSodium();
const key = sodium.randombytes_buf(sodium.crypto_secretbox_KEYBYTES);
const nonce = sodium.randombytes_buf(sodium.crypto_secretbox_NONCEBYTES);
const ciphertext = sodium.crypto_secretbox_easy(plaintext, nonce, key);
return {
ciphertext,
nonce: sodium.to_base64(nonce, sodium.base64_variants.ORIGINAL),
key,
};
}
/**
* Decrypt file bytes with the symmetric key Kf.
* Returns null if decryption fails.
*/
export async function decryptFile(
ciphertext: Uint8Array,
nonceB64: string,
key: Uint8Array,
): Promise<Uint8Array | null> {
const sodium = await ensureSodium();
try {
const nonce = sodium.from_base64(nonceB64, sodium.base64_variants.ORIGINAL);
return sodium.crypto_secretbox_open_easy(ciphertext, nonce, key);
} catch {
return null;
}
}
/**
* Seal Kf for a recipient using crypto_box_seal (ephemeral sender key).
* recipientPubkeyHex: ed25519 pubkey of recipient (64 hex chars).
* Returns base64 sealed box.
*/
export async function sealKeyForPeer(
kf: Uint8Array,
recipientPubkeyHex: string,
): Promise<string> {
const sodium = await ensureSodium();
const recipientCurve = sodium.crypto_sign_ed25519_pk_to_curve25519(
sodium.from_hex(recipientPubkeyHex),
);
const sealed = sodium.crypto_box_seal(kf, recipientCurve);
return sodium.to_base64(sealed, sodium.base64_variants.ORIGINAL);
}
/**
* Open a sealed key blob using own ed25519 keypair (converted to X25519).
* Returns the 32-byte Kf or null if decryption fails.
*/
export async function openSealedKey(
sealedB64: string,
myPubkeyHex: string,
mySecretKeyHex: string,
): Promise<Uint8Array | null> {
const sodium = await ensureSodium();
try {
const myCurvePub = sodium.crypto_sign_ed25519_pk_to_curve25519(
sodium.from_hex(myPubkeyHex),
);
const myCurveSec = sodium.crypto_sign_ed25519_sk_to_curve25519(
sodium.from_hex(mySecretKeyHex),
);
const sealed = sodium.from_base64(sealedB64, sodium.base64_variants.ORIGINAL);
return sodium.crypto_box_seal_open(sealed, myCurvePub, myCurveSec);
} catch {
return null;
}
}

View File

@@ -1,13 +1,15 @@
/** /**
* claudemesh-cli entry point. * claudemesh-cli entry point.
* *
* Uses citty to define commands and flags. --help is generated from
* the command definitions — the flag list here IS the documentation.
*
* Dispatches between two modes: * Dispatches between two modes:
* - `claudemesh mcp` → MCP server (stdio transport) * - `claudemesh mcp` → MCP server (stdio transport)
* - `claudemesh <subcommand>` → CLI subcommand * - `claudemesh <subcommand>` → CLI subcommand
*
* Claude Code invokes the `mcp` mode via stdio. Humans use all others.
*/ */
import { defineCommand, runMain } from "citty";
import { startMcpServer } from "./mcp/server"; import { startMcpServer } from "./mcp/server";
import { runInstall, runUninstall } from "./commands/install"; import { runInstall, runUninstall } from "./commands/install";
import { runJoin } from "./commands/join"; import { runJoin } from "./commands/join";
@@ -21,96 +23,152 @@ import { runDoctor } from "./commands/doctor";
import { runWelcome } from "./commands/welcome"; import { runWelcome } from "./commands/welcome";
import { VERSION } from "./version"; import { VERSION } from "./version";
const HELP = `claudemesh v${VERSION} — peer mesh for Claude Code sessions const launch = defineCommand({
meta: {
Usage: name: "launch",
claudemesh <command> [args] description: "Launch Claude Code connected to a mesh with real-time peer messaging",
},
Commands: args: {
install Register MCP + Stop/UserPromptSubmit status hooks name: {
(add --no-hooks for bare MCP registration) type: "string",
uninstall Remove MCP server + hooks description: "Display name for this session",
launch [opts] Launch Claude Code with real-time push messages },
--name <name> Display name for this session role: {
--mesh <slug> Select mesh (picker if >1, omitted) type: "string",
--join <url> Join a mesh before launching description: "Role tag (dev, lead, analyst — free-form)",
--quiet Skip the info banner },
-- <args> Pass remaining args to claude groups: {
join <url> Join a mesh via https://claudemesh.com/join/... URL type: "string",
list Show all joined meshes description: 'Groups to join: "group:role,group2" — colon sets role. Hierarchy via slash: "eng/frontend:lead"',
leave <slug> Leave a joined mesh },
status Health report: broker reachability per joined mesh mesh: {
doctor Diagnostic checks (install, config, keypairs, PATH) type: "string",
seed-test-mesh Dev-only: inject a mesh into config (skips invite flow) description: "Select mesh by slug (interactive picker if omitted and >1 joined)",
mcp Start MCP server (stdio) — invoked by Claude Code },
--help, -h Show this help join: {
--version, -v Show the CLI version type: "string",
description: "Join a mesh via invite URL before launching",
Environment: },
CLAUDEMESH_BROKER_URL Override broker URL (default: wss://ic.claudemesh.com/ws) "message-mode": {
CLAUDEMESH_CONFIG_DIR Override config directory (default: ~/.claudemesh/) type: "string",
CLAUDEMESH_DEBUG=1 Verbose logging description: "push (default) | inbox | off — controls how peer messages are delivered",
`; },
"system-prompt": {
const cmd = process.argv[2]; type: "string",
const args = process.argv.slice(3); description: "Set Claude's system prompt for this session",
},
async function main(): Promise<void> { yes: {
switch (cmd) { type: "boolean",
case "mcp": alias: "y",
await startMcpServer(); description: "Skip permission confirmation",
return; default: false,
case "install": },
runInstall(args); quiet: {
return; type: "boolean",
case "uninstall": description: "Skip banner and all interactive prompts",
runUninstall(); default: false,
return; },
case "hook": },
await runHook(args); run({ args, rawArgs }) {
return; // Forward to the existing launch runner, preserving -- passthrough to claude.
case "launch": return runLaunch(args, rawArgs);
await runLaunch(args); },
return;
case "join":
await runJoin(args);
return;
case "list":
runList();
return;
case "leave":
runLeave(args);
return;
case "status":
await runStatus();
return;
case "doctor":
await runDoctor();
return;
case "seed-test-mesh":
runSeedTestMesh(args);
return;
case "--version":
case "-v":
case "version":
console.log(VERSION);
return;
case "--help":
case "-h":
case "help":
console.log(HELP);
return;
case undefined:
runWelcome();
return;
default:
console.error(`Unknown command: ${cmd}`);
console.error("Run `claudemesh --help` for usage.");
process.exit(1);
}
}
main().catch((e) => {
console.error(`claudemesh: ${e instanceof Error ? e.message : String(e)}`);
process.exit(1);
}); });
const install = defineCommand({
meta: {
name: "install",
description: "Register MCP server + status hooks with Claude Code",
},
args: {
"no-hooks": {
type: "boolean",
description: "Register MCP server only, skip hooks",
default: false,
},
},
run({ rawArgs }) {
runInstall(rawArgs);
},
});
const join = defineCommand({
meta: {
name: "join",
description: "Join a mesh via invite URL",
},
args: {
url: {
type: "positional",
description: "Invite URL (https://claudemesh.com/join/...)",
required: true,
},
},
run({ args }) {
return runJoin([args.url]);
},
});
const leave = defineCommand({
meta: {
name: "leave",
description: "Leave a joined mesh",
},
args: {
slug: {
type: "positional",
description: "Mesh slug to leave",
required: true,
},
},
run({ args }) {
runLeave([args.slug]);
},
});
const main = defineCommand({
meta: {
name: "claudemesh",
version: VERSION,
description: "Peer mesh for Claude Code sessions",
},
subCommands: {
launch,
install,
uninstall: defineCommand({
meta: { name: "uninstall", description: "Remove MCP server and hooks" },
run() { runUninstall(); },
}),
join,
list: defineCommand({
meta: { name: "list", description: "Show joined meshes and identities" },
run() { runList(); },
}),
leave,
status: defineCommand({
meta: { name: "status", description: "Check broker reachability for each joined mesh" },
async run() { await runStatus(); },
}),
doctor: defineCommand({
meta: { name: "doctor", description: "Diagnose install, config, keypairs, and PATH" },
async run() { await runDoctor(); },
}),
mcp: defineCommand({
meta: { name: "mcp", description: "Start MCP server (stdio — invoked by Claude Code, not users)" },
async run() { await startMcpServer(); },
}),
"seed-test-mesh": defineCommand({
meta: { name: "seed-test-mesh", description: "Dev only: inject a mesh into config (skips invite flow)" },
run({ rawArgs }) { runSeedTestMesh(rawArgs); },
}),
hook: defineCommand({
meta: { name: "hook", description: "Internal hook handler (invoked by Claude Code hooks)" },
async run({ rawArgs }) { await runHook(rawArgs); },
}),
},
run() {
runWelcome();
},
});
runMain(main);

View File

@@ -130,6 +130,7 @@ export async function startMcpServer(): Promise<void> {
const config = loadConfig(); const config = loadConfig();
const myName = config.displayName ?? "unnamed"; const myName = config.displayName ?? "unnamed";
const myRole = config.role ?? process.env.CLAUDEMESH_ROLE ?? null;
const myGroups = (config.groups ?? []).map(g => `@${g.name}${g.role ? ':' + g.role : ''}`).join(', ') || "none"; const myGroups = (config.groups ?? []).map(g => `@${g.name}${g.role ? ':' + g.role : ''}`).join(', ') || "none";
const messageMode = config.messageMode ?? "push"; const messageMode = config.messageMode ?? "push";
@@ -141,7 +142,7 @@ export async function startMcpServer(): Promise<void> {
tools: {}, tools: {},
}, },
instructions: `## Identity instructions: `## Identity
You are "${myName}" — a peer in the claudemesh network. Your groups: ${myGroups}. You are one of several Claude Code sessions connected to the same mesh. No orchestrator exists — peers are equals. Your identity comes from your name and group roles, not from a central authority. You are "${myName}"${myRole ? ` (${myRole})` : ""} — a peer in the claudemesh network. Your groups: ${myGroups}. You are one of several Claude Code sessions connected to the same mesh. No orchestrator exists — peers are equals. Your identity comes from your name and group roles, not from a central authority.
## Responding to messages ## Responding to messages
When you receive a <channel source="claudemesh" ...> message, RESPOND IMMEDIATELY. Pause your current task, reply via send_message, then resume. Read from_name, mesh_slug, and priority from the channel attributes. Reply by setting \`to\` to the sender's from_name (display name). Stay in character per your system prompt. Do not ignore low-priority messages — acknowledge them briefly even if you defer action. When you receive a <channel source="claudemesh" ...> message, RESPOND IMMEDIATELY. Pause your current task, reply via send_message, then resume. Read from_name, mesh_slug, and priority from the channel attributes. Reply by setting \`to\` to the sender's from_name (display name). Stay in character per your system prompt. Do not ignore low-priority messages — acknowledge them briefly even if you defer action.
@@ -326,9 +327,15 @@ Your message mode is "${messageMode}".
case "message_status": { case "message_status": {
const { id } = (args ?? {}) as { id?: string }; const { id } = (args ?? {}) as { id?: string };
if (!id) return text("message_status: `id` required", true); if (!id) return text("message_status: `id` required", true);
const client = allClients()[0]; const clients = allClients();
if (!client) return text("message_status: not connected", true); if (!clients.length) return text("message_status: not connected", true);
const result = await client.messageStatus(id); // Try each connected mesh client — we don't know which mesh the
// messageId belongs to, so query all and return the first hit.
let result = null;
for (const c of clients) {
result = await c.messageStatus(id);
if (result) break;
}
if (!result) return text(`Message ${id} not found or timed out.`); if (!result) return text(`Message ${id} not found or timed out.`);
const recipientLines = result.recipients.map( const recipientLines = result.recipients.map(
(r: { name: string; pubkey: string; status: string }) => (r: { name: string; pubkey: string; status: string }) =>
@@ -439,12 +446,75 @@ Your message mode is "${messageMode}".
// --- Files --- // --- Files ---
case "share_file": { case "share_file": {
const { path: filePath, name: fileName, tags } = (args ?? {}) as { path?: string; name?: string; tags?: string[] }; const { path: filePath, name: fileName, tags, to: fileTo } = (args ?? {}) as { path?: string; name?: string; tags?: string[]; to?: string };
if (!filePath) return text("share_file: `path` required", true); if (!filePath) return text("share_file: `path` required", true);
const { existsSync } = await import("node:fs"); const { existsSync } = await import("node:fs");
if (!existsSync(filePath)) return text(`share_file: file not found: ${filePath}`, true); if (!existsSync(filePath)) return text(`share_file: file not found: ${filePath}`, true);
const client = allClients()[0]; const client = allClients()[0];
if (!client) return text("share_file: not connected", true); if (!client) return text("share_file: not connected", true);
// If 'to' specified, do E2E encryption
if (fileTo) {
const { encryptFile, sealKeyForPeer } = await import("../crypto/file-crypto");
const { readFileSync, writeFileSync, mkdtempSync, unlinkSync, rmdirSync } = await import("node:fs");
const { tmpdir } = await import("node:os");
const { join, basename } = await import("node:path");
// Resolve target peer pubkey
const peers = await client.listPeers();
const targetPeer = peers.find(p => p.pubkey === fileTo || p.displayName === fileTo);
if (!targetPeer) {
return text(`share_file: peer not found: ${fileTo}`, true);
}
// Read and encrypt file
const plaintext = readFileSync(filePath);
const { ciphertext, nonce, key } = await encryptFile(new Uint8Array(plaintext));
// Seal Kf for target peer
const sealedForTarget = await sealKeyForPeer(key, targetPeer.pubkey);
// Seal Kf for ourselves (owner)
const myPubkey = client.getSessionPubkey();
const sealedForSelf = myPubkey ? await sealKeyForPeer(key, myPubkey) : null;
const fileKeys = [
{ peerPubkey: targetPeer.pubkey, sealedKey: sealedForTarget },
...(sealedForSelf && myPubkey ? [{ peerPubkey: myPubkey, sealedKey: sealedForSelf }] : []),
];
// Build combined buffer: nonce (24 bytes) + ciphertext
const { ensureSodium } = await import("../crypto/keypair");
const sodium = await ensureSodium();
const nonceBytes = sodium.from_base64(nonce, sodium.base64_variants.ORIGINAL);
const combined = new Uint8Array(nonceBytes.length + ciphertext.length);
combined.set(nonceBytes, 0);
combined.set(ciphertext, nonceBytes.length);
const baseName = fileName ?? basename(filePath);
const tmpDir = mkdtempSync(join(tmpdir(), "cm-"));
const tmpPath = join(tmpDir, baseName);
writeFileSync(tmpPath, combined);
try {
const fileId = await client.uploadFile(tmpPath, client.meshId, client.meshSlug, {
name: baseName,
tags,
persistent: true,
encrypted: true,
ownerPubkey: myPubkey ?? undefined,
fileKeys,
});
return text(`Shared (E2E encrypted): ${baseName}${targetPeer.displayName} (${fileId})`);
} catch (e) {
return text(`share_file: upload failed — ${e instanceof Error ? e.message : String(e)}`, true);
} finally {
try { unlinkSync(tmpPath); } catch { /* ignore */ }
try { rmdirSync(tmpDir); } catch { /* ignore */ }
}
}
// Plain (unencrypted) upload — existing code
try { try {
const fileId = await client.uploadFile(filePath, client.meshId, client.meshSlug, { const fileId = await client.uploadFile(filePath, client.meshId, client.meshSlug, {
name: fileName, tags, persistent: true, name: fileName, tags, persistent: true,
@@ -462,6 +532,43 @@ Your message mode is "${messageMode}".
if (!client) return text("get_file: not connected", true); if (!client) return text("get_file: not connected", true);
const result = await client.getFile(id); const result = await client.getFile(id);
if (!result) return text(`get_file: file ${id} not found`, true); if (!result) return text(`get_file: file ${id} not found`, true);
if (result.encrypted) {
if (!result.sealedKey) return text("get_file: encrypted file — no decryption key available for your session", true);
const { openSealedKey, decryptFile } = await import("../crypto/file-crypto");
const { ensureSodium } = await import("../crypto/keypair");
const myPubkey = client.getSessionPubkey();
const mySecret = client.getSessionSecretKey();
if (!myPubkey || !mySecret) {
return text("get_file: no session keypair — cannot decrypt", true);
}
const kf = await openSealedKey(result.sealedKey, myPubkey, mySecret);
if (!kf) return text("get_file: failed to open sealed key", true);
// Download file bytes from presigned URL
const resp = await fetch(result.url, { signal: AbortSignal.timeout(30_000) });
if (!resp.ok) return text(`get_file: download failed (${resp.status})`, true);
const buf = new Uint8Array(await resp.arrayBuffer());
// Wire format: first 24 bytes = nonce, rest = ciphertext
const sodium = await ensureSodium();
const NONCE_BYTES = sodium.crypto_secretbox_NONCEBYTES; // 24
const nonce = sodium.to_base64(buf.slice(0, NONCE_BYTES), sodium.base64_variants.ORIGINAL);
const ciphertext = buf.slice(NONCE_BYTES);
const plaintext = await decryptFile(ciphertext, nonce, kf);
if (!plaintext) return text("get_file: decryption failed", true);
const { writeFileSync, mkdirSync } = await import("node:fs");
const { dirname } = await import("node:path");
mkdirSync(dirname(save_to), { recursive: true });
writeFileSync(save_to, plaintext);
return text(`Downloaded and decrypted: ${result.name}${save_to}`);
}
// Unencrypted — existing download logic
const res = await fetch(result.url, { signal: AbortSignal.timeout(30_000) }); const res = await fetch(result.url, { signal: AbortSignal.timeout(30_000) });
if (!res.ok) return text(`get_file: download failed (${res.status})`, true); if (!res.ok) return text(`get_file: download failed (${res.status})`, true);
const { writeFileSync, mkdirSync } = await import("node:fs"); const { writeFileSync, mkdirSync } = await import("node:fs");
@@ -760,6 +867,36 @@ Your message mode is "${messageMode}".
return text(results.join("\n")); return text(results.join("\n"));
} }
case "grant_file_access": {
const { fileId, to: grantTo } = (args ?? {}) as { fileId?: string; to?: string };
if (!fileId || !grantTo) return text("grant_file_access: `fileId` and `to` required", true);
const client = allClients()[0];
if (!client) return text("grant_file_access: not connected", true);
const peers = await client.listPeers();
const targetPeer = peers.find(p => p.pubkey === grantTo || p.displayName === grantTo);
if (!targetPeer) return text(`grant_file_access: peer not found: ${grantTo}`, true);
const result = await client.getFile(fileId);
if (!result) return text("grant_file_access: file not found", true);
if (!result.encrypted) return text("grant_file_access: file is not encrypted", true);
if (!result.sealedKey) return text("grant_file_access: no key available (are you the owner?)", true);
const { openSealedKey, sealKeyForPeer } = await import("../crypto/file-crypto");
const myPubkey = client.getSessionPubkey();
const mySecret = client.getSessionSecretKey();
if (!myPubkey || !mySecret) return text("grant_file_access: no session keypair", true);
const kf = await openSealedKey(result.sealedKey, myPubkey, mySecret);
if (!kf) return text("grant_file_access: cannot decrypt your own key", true);
const sealedForPeer = await sealKeyForPeer(kf, targetPeer.pubkey);
const ok = await client.grantFileAccess(fileId, targetPeer.pubkey, sealedForPeer);
if (!ok) return text("grant_file_access: broker did not confirm", true);
return text(`Access granted: ${targetPeer.displayName} can now download file ${fileId}`);
}
default: default:
return text(`Unknown tool: ${name}`, true); return text(`Unknown tool: ${name}`, true);
} }

View File

@@ -203,7 +203,7 @@ export const TOOLS: Tool[] = [
{ {
name: "share_file", name: "share_file",
description: description:
"Share a persistent file with the mesh. All current and future peers can access it.", "Share a persistent file with the mesh. All current and future peers can access it. If `to` is specified, the file is E2E encrypted and only accessible to that peer (and you).",
inputSchema: { inputSchema: {
type: "object", type: "object",
properties: { properties: {
@@ -217,6 +217,10 @@ export const TOOLS: Tool[] = [
items: { type: "string" }, items: { type: "string" },
description: "Tags for categorization", description: "Tags for categorization",
}, },
to: {
type: "string",
description: "Peer display name or pubkey hex — if set, file is E2E encrypted for this peer only",
},
}, },
required: ["path"], required: ["path"],
}, },
@@ -269,6 +273,18 @@ export const TOOLS: Tool[] = [
required: ["id"], required: ["id"],
}, },
}, },
{
name: "grant_file_access",
description: "Grant a peer access to an E2E encrypted file you shared. You must be the owner.",
inputSchema: {
type: "object",
properties: {
fileId: { type: "string", description: "File ID" },
to: { type: "string", description: "Peer display name or pubkey hex to grant access to" },
},
required: ["fileId", "to"],
},
},
// --- Vector tools --- // --- Vector tools ---
{ {

View File

@@ -37,6 +37,7 @@ export interface Config {
version: 1; version: 1;
meshes: JoinedMesh[]; meshes: JoinedMesh[];
displayName?: string; // per-session override, written by `claudemesh launch --name` displayName?: string; // per-session override, written by `claudemesh launch --name`
role?: string; // per-session role tag (display + hello)
groups?: GroupEntry[]; groups?: GroupEntry[];
messageMode?: "push" | "inbox" | "off"; messageMode?: "push" | "inbox" | "off";
} }
@@ -54,7 +55,7 @@ export function loadConfig(): Config {
if (!parsed || !Array.isArray(parsed.meshes)) { if (!parsed || !Array.isArray(parsed.meshes)) {
return { version: 1, meshes: [] }; return { version: 1, meshes: [] };
} }
return { version: 1, meshes: parsed.meshes, displayName: parsed.displayName, groups: parsed.groups, messageMode: parsed.messageMode }; return { version: 1, meshes: parsed.meshes, displayName: parsed.displayName, role: parsed.role, groups: parsed.groups, messageMode: parsed.messageMode };
} catch (e) { } catch (e) {
throw new Error( throw new Error(
`Failed to load ${CONFIG_PATH}: ${e instanceof Error ? e.message : String(e)}`, `Failed to load ${CONFIG_PATH}: ${e instanceof Error ? e.message : String(e)}`,

View File

@@ -75,14 +75,15 @@ export class BrokerClient {
private outbound: Array<() => void> = []; // closures that send once ws is open private outbound: Array<() => void> = []; // closures that send once ws is open
private pushHandlers = new Set<PushHandler>(); private pushHandlers = new Set<PushHandler>();
private pushBuffer: InboundPush[] = []; private pushBuffer: InboundPush[] = [];
private listPeersResolvers: Array<(peers: PeerInfo[]) => void> = []; private listPeersResolvers = new Map<string, { resolve: (peers: PeerInfo[]) => void; timer: NodeJS.Timeout }>();
private stateResolvers: Array<(result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void> = []; private stateResolvers = new Map<string, { resolve: (result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void; timer: NodeJS.Timeout }>();
private stateListResolvers: Array<(entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void> = []; private stateListResolvers = new Map<string, { resolve: (entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private memoryStoreResolvers: Array<(id: string | null) => void> = []; private memoryStoreResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private memoryRecallResolvers: Array<(memories: Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) => void> = []; private memoryRecallResolvers = new Map<string, { resolve: (memories: Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) => void; timer: NodeJS.Timeout }>();
private stateChangeHandlers = new Set<(change: { key: string; value: unknown; updatedBy: string }) => void>(); private stateChangeHandlers = new Set<(change: { key: string; value: unknown; updatedBy: string }) => void>();
private sessionPubkey: string | null = null; private sessionPubkey: string | null = null;
private sessionSecretKey: string | null = null; private sessionSecretKey: string | null = null;
private grantFileAccessResolvers = new Map<string, { resolve: (ok: boolean) => void; timer: NodeJS.Timeout }>();
private closed = false; private closed = false;
private reconnectAttempt = 0; private reconnectAttempt = 0;
private helloTimer: NodeJS.Timeout | null = null; private helloTimer: NodeJS.Timeout | null = null;
@@ -110,6 +111,15 @@ export class BrokerClient {
return this.pushBuffer; return this.pushBuffer;
} }
/** Session public key hex (null before first connection). */
getSessionPubkey(): string | null { return this.sessionPubkey; }
/** Session secret key hex (null before first connection). */
getSessionSecretKey(): string | null { return this.sessionSecretKey; }
private makeReqId(): string {
return Math.random().toString(36).slice(2) + Date.now().toString(36);
}
/** Open WS, send hello, resolve when hello_ack received. */ /** Open WS, send hello, resolve when hello_ack received. */
async connect(): Promise<void> { async connect(): Promise<void> {
if (this.closed) throw new Error("client is closed"); if (this.closed) throw new Error("client is closed");
@@ -299,16 +309,11 @@ export class BrokerClient {
async listPeers(): Promise<PeerInfo[]> { async listPeers(): Promise<PeerInfo[]> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.listPeersResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "list_peers" })); this.listPeersResolvers.set(reqId, { resolve, timer: setTimeout(() => {
// Timeout after 5s — return empty list rather than hang. if (this.listPeersResolvers.delete(reqId)) resolve([]);
setTimeout(() => { }, 5_000) });
const idx = this.listPeersResolvers.indexOf(resolve); this.ws!.send(JSON.stringify({ type: "list_peers", _reqId: reqId }));
if (idx !== -1) {
this.listPeersResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
}); });
} }
@@ -342,15 +347,11 @@ export class BrokerClient {
async getState(key: string): Promise<{ key: string; value: unknown; updatedBy: string; updatedAt: string } | null> { async getState(key: string): Promise<{ key: string; value: unknown; updatedBy: string; updatedAt: string } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.stateResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "get_state", key })); this.stateResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.stateResolvers.delete(reqId)) resolve(null);
const idx = this.stateResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.ws!.send(JSON.stringify({ type: "get_state", key, _reqId: reqId }));
this.stateResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
}); });
} }
@@ -358,15 +359,11 @@ export class BrokerClient {
async listState(): Promise<Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>> { async listState(): Promise<Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.stateListResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "list_state" })); this.stateListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.stateListResolvers.delete(reqId)) resolve([]);
const idx = this.stateListResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.ws!.send(JSON.stringify({ type: "list_state", _reqId: reqId }));
this.stateListResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
}); });
} }
@@ -376,15 +373,11 @@ export class BrokerClient {
async remember(content: string, tags?: string[]): Promise<string | null> { async remember(content: string, tags?: string[]): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.memoryStoreResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "remember", content, tags })); this.memoryStoreResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.memoryStoreResolvers.delete(reqId)) resolve(null);
const idx = this.memoryStoreResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.ws!.send(JSON.stringify({ type: "remember", content, tags, _reqId: reqId }));
this.memoryStoreResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
}); });
} }
@@ -392,15 +385,11 @@ export class BrokerClient {
async recall(query: string): Promise<Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>> { async recall(query: string): Promise<Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.memoryRecallResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "recall", query })); this.memoryRecallResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.memoryRecallResolvers.delete(reqId)) resolve([]);
const idx = this.memoryRecallResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.ws!.send(JSON.stringify({ type: "recall", query, _reqId: reqId }));
this.memoryRecallResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
}); });
} }
@@ -411,51 +400,46 @@ export class BrokerClient {
} }
/** Check delivery status of a sent message. */ /** Check delivery status of a sent message. */
private messageStatusResolvers: Array<(result: { messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null) => void> = []; private messageStatusResolvers = new Map<string, { resolve: (result: { messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null) => void; timer: NodeJS.Timeout }>();
private fileUrlResolvers: Array<(result: { url: string; name: string } | null) => void> = []; private fileUrlResolvers = new Map<string, { resolve: (result: { url: string; name: string; encrypted?: boolean; sealedKey?: string } | null) => void; timer: NodeJS.Timeout }>();
private fileListResolvers: Array<(files: Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) => void> = []; private fileListResolvers = new Map<string, { resolve: (files: Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) => void; timer: NodeJS.Timeout }>();
private fileStatusResolvers: Array<(accesses: Array<{ peerName: string; accessedAt: string }>) => void> = []; private fileStatusResolvers = new Map<string, { resolve: (accesses: Array<{ peerName: string; accessedAt: string }>) => void; timer: NodeJS.Timeout }>();
private vectorStoredResolvers: Array<(id: string | null) => void> = []; private vectorStoredResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private vectorResultsResolvers: Array<(results: Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) => void> = []; private vectorResultsResolvers = new Map<string, { resolve: (results: Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) => void; timer: NodeJS.Timeout }>();
private collectionListResolvers: Array<(collections: string[]) => void> = []; private collectionListResolvers = new Map<string, { resolve: (collections: string[]) => void; timer: NodeJS.Timeout }>();
private graphResultResolvers: Array<(rows: Array<Record<string, unknown>>) => void> = []; private graphResultResolvers = new Map<string, { resolve: (rows: Array<Record<string, unknown>>) => void; timer: NodeJS.Timeout }>();
private contextListResolvers: Array<(contexts: Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) => void> = []; private contextListResolvers = new Map<string, { resolve: (contexts: Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private contextResultsResolvers: Array<(contexts: Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) => void> = []; private contextResultsResolvers = new Map<string, { resolve: (contexts: Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private taskCreatedResolvers: Array<(id: string | null) => void> = []; private taskCreatedResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private taskListResolvers: Array<(tasks: Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) => void> = []; private taskListResolvers = new Map<string, { resolve: (tasks: Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) => void; timer: NodeJS.Timeout }>();
private meshQueryResolvers: Array<(result: { columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null) => void> = []; private meshQueryResolvers = new Map<string, { resolve: (result: { columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null) => void; timer: NodeJS.Timeout }>();
private meshSchemaResolvers: Array<(tables: Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) => void> = []; private meshSchemaResolvers = new Map<string, { resolve: (tables: Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) => void; timer: NodeJS.Timeout }>();
private streamCreatedResolvers: Array<(id: string | null) => void> = []; private streamCreatedResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private streamListResolvers: Array<(streams: Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) => void> = []; private streamListResolvers = new Map<string, { resolve: (streams: Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) => void; timer: NodeJS.Timeout }>();
private streamDataHandlers = new Set<(data: { stream: string; data: unknown; publishedBy: string }) => void>(); private streamDataHandlers = new Set<(data: { stream: string; data: unknown; publishedBy: string }) => void>();
async messageStatus(messageId: string): Promise<{ messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null> { async messageStatus(messageId: string): Promise<{ messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.messageStatusResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "message_status", messageId })); this.messageStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.messageStatusResolvers.delete(reqId)) resolve(null);
const idx = this.messageStatusResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.messageStatusResolvers.splice(idx, 1); resolve(null); } this.ws!.send(JSON.stringify({ type: "message_status", messageId, _reqId: reqId }));
}, 5_000);
}); });
} }
// --- Files --- // --- Files ---
/** Get a download URL for a shared file. */ /** Get a download URL for a shared file. */
async getFile(fileId: string): Promise<{ url: string; name: string } | null> { async getFile(fileId: string): Promise<{ url: string; name: string; encrypted?: boolean; sealedKey?: string } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.fileUrlResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "get_file", fileId })); this.fileUrlResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.fileUrlResolvers.delete(reqId)) resolve(null);
const idx = this.fileUrlResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.ws!.send(JSON.stringify({ type: "get_file", fileId, _reqId: reqId }));
this.fileUrlResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
}); });
} }
@@ -463,15 +447,11 @@ export class BrokerClient {
async listFiles(query?: string, from?: string): Promise<Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>> { async listFiles(query?: string, from?: string): Promise<Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.fileListResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "list_files", query, from })); this.fileListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.fileListResolvers.delete(reqId)) resolve([]);
const idx = this.fileListResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.ws!.send(JSON.stringify({ type: "list_files", query, from, _reqId: reqId }));
this.fileListResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
}); });
} }
@@ -479,15 +459,11 @@ export class BrokerClient {
async fileStatus(fileId: string): Promise<Array<{ peerName: string; accessedAt: string }>> { async fileStatus(fileId: string): Promise<Array<{ peerName: string; accessedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.fileStatusResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "file_status", fileId })); this.fileStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.fileStatusResolvers.delete(reqId)) resolve([]);
const idx = this.fileStatusResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.ws!.send(JSON.stringify({ type: "file_status", fileId, _reqId: reqId }));
this.fileStatusResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
}); });
} }
@@ -497,10 +473,11 @@ export class BrokerClient {
this.ws.send(JSON.stringify({ type: "delete_file", fileId })); this.ws.send(JSON.stringify({ type: "delete_file", fileId }));
} }
/** Upload a file to the broker via HTTP POST. Returns file ID or null. */ /** Upload a file to the broker via HTTP POST. Returns file ID. */
async uploadFile(filePath: string, meshId: string, memberId: string, opts: { async uploadFile(filePath: string, meshId: string, memberId: string, opts: {
name?: string; tags?: string[]; persistent?: boolean; targetSpec?: string; name?: string; tags?: string[]; persistent?: boolean; targetSpec?: string;
}): Promise<string | null> { encrypted?: boolean; ownerPubkey?: string; fileKeys?: Array<{ peerPubkey: string; sealedKey: string }>;
}): Promise<string> {
const { readFileSync } = await import("node:fs"); const { readFileSync } = await import("node:fs");
const { basename } = await import("node:path"); const { basename } = await import("node:path");
const data = readFileSync(filePath); const data = readFileSync(filePath);
@@ -522,6 +499,9 @@ export class BrokerClient {
"X-Tags": JSON.stringify(opts.tags ?? []), "X-Tags": JSON.stringify(opts.tags ?? []),
"X-Persistent": String(opts.persistent ?? true), "X-Persistent": String(opts.persistent ?? true),
"X-Target-Spec": opts.targetSpec ?? "", "X-Target-Spec": opts.targetSpec ?? "",
...(opts.encrypted ? { "X-Encrypted": "true" } : {}),
...(opts.ownerPubkey ? { "X-Owner-Pubkey": opts.ownerPubkey } : {}),
...(opts.fileKeys?.length ? { "X-File-Keys": JSON.stringify(opts.fileKeys) } : {}),
}, },
body: data, body: data,
signal: AbortSignal.timeout(30_000), signal: AbortSignal.timeout(30_000),
@@ -533,18 +513,29 @@ export class BrokerClient {
return body.fileId; return body.fileId;
} }
/** Grant a peer access to an encrypted file (owner only). */
async grantFileAccess(fileId: string, peerPubkey: string, sealedKey: string): Promise<boolean> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return false;
return new Promise((resolve) => {
const reqId = this.makeReqId();
this.grantFileAccessResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.grantFileAccessResolvers.delete(reqId)) resolve(false);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "grant_file_access", fileId, peerPubkey, sealedKey, _reqId: reqId }));
});
}
// --- Vectors --- // --- Vectors ---
/** Store an embedding in a per-mesh Qdrant collection. */ /** Store an embedding in a per-mesh Qdrant collection. */
async vectorStore(collection: string, text: string, metadata?: Record<string, unknown>): Promise<string | null> { async vectorStore(collection: string, text: string, metadata?: Record<string, unknown>): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.vectorStoredResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "vector_store", collection, text, metadata })); this.vectorStoredResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.vectorStoredResolvers.delete(reqId)) resolve(null);
const idx = this.vectorStoredResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.vectorStoredResolvers.splice(idx, 1); resolve(null); } this.ws!.send(JSON.stringify({ type: "vector_store", collection, text, metadata, _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -552,12 +543,11 @@ export class BrokerClient {
async vectorSearch(collection: string, query: string, limit?: number): Promise<Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>> { async vectorSearch(collection: string, query: string, limit?: number): Promise<Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.vectorResultsResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "vector_search", collection, query, limit })); this.vectorResultsResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.vectorResultsResolvers.delete(reqId)) resolve([]);
const idx = this.vectorResultsResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.vectorResultsResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "vector_search", collection, query, limit, _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -571,12 +561,11 @@ export class BrokerClient {
async listCollections(): Promise<string[]> { async listCollections(): Promise<string[]> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.collectionListResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "list_collections" })); this.collectionListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.collectionListResolvers.delete(reqId)) resolve([]);
const idx = this.collectionListResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.collectionListResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "list_collections", _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -586,12 +575,11 @@ export class BrokerClient {
async graphQuery(cypher: string): Promise<Array<Record<string, unknown>>> { async graphQuery(cypher: string): Promise<Array<Record<string, unknown>>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.graphResultResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "graph_query", cypher })); this.graphResultResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.graphResultResolvers.delete(reqId)) resolve([]);
const idx = this.graphResultResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.graphResultResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "graph_query", cypher, _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -599,12 +587,11 @@ export class BrokerClient {
async graphExecute(cypher: string): Promise<Array<Record<string, unknown>>> { async graphExecute(cypher: string): Promise<Array<Record<string, unknown>>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.graphResultResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "graph_execute", cypher })); this.graphResultResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.graphResultResolvers.delete(reqId)) resolve([]);
const idx = this.graphResultResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.graphResultResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "graph_execute", cypher, _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -620,12 +607,11 @@ export class BrokerClient {
async getContext(query: string): Promise<Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>> { async getContext(query: string): Promise<Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.contextResultsResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "get_context", query })); this.contextResultsResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.contextResultsResolvers.delete(reqId)) resolve([]);
const idx = this.contextResultsResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.contextResultsResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "get_context", query, _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -633,12 +619,11 @@ export class BrokerClient {
async listContexts(): Promise<Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>> { async listContexts(): Promise<Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.contextListResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "list_contexts" })); this.contextListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.contextListResolvers.delete(reqId)) resolve([]);
const idx = this.contextListResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.contextListResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "list_contexts", _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -648,37 +633,35 @@ export class BrokerClient {
async createTask(title: string, assignee?: string, priority?: string, tags?: string[]): Promise<string | null> { async createTask(title: string, assignee?: string, priority?: string, tags?: string[]): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.taskCreatedResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "create_task", title, assignee, priority, tags })); this.taskCreatedResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.taskCreatedResolvers.delete(reqId)) resolve(null);
const idx = this.taskCreatedResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.taskCreatedResolvers.splice(idx, 1); resolve(null); } this.ws!.send(JSON.stringify({ type: "create_task", title, assignee, priority, tags, _reqId: reqId }));
}, 5_000);
}); });
} }
/** Claim an unclaimed task. */ /** Claim an unclaimed task. */
async claimTask(id: string): Promise<void> { async claimTask(id: string): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "claim_task", id })); this.ws.send(JSON.stringify({ type: "claim_task", taskId: id }));
} }
/** Mark a task done with optional result. */ /** Mark a task done with optional result. */
async completeTask(id: string, result?: string): Promise<void> { async completeTask(id: string, result?: string): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "complete_task", id, result })); this.ws.send(JSON.stringify({ type: "complete_task", taskId: id, result }));
} }
/** List tasks filtered by status/assignee. */ /** List tasks filtered by status/assignee. */
async listTasks(status?: string, assignee?: string): Promise<Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>> { async listTasks(status?: string, assignee?: string): Promise<Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.taskListResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "list_tasks", status, assignee })); this.taskListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.taskListResolvers.delete(reqId)) resolve([]);
const idx = this.taskListResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.taskListResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "list_tasks", status, assignee, _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -688,12 +671,11 @@ export class BrokerClient {
async meshQuery(sql: string): Promise<{ columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null> { async meshQuery(sql: string): Promise<{ columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.meshQueryResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "mesh_query", sql })); this.meshQueryResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.meshQueryResolvers.delete(reqId)) resolve(null);
const idx = this.meshQueryResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.meshQueryResolvers.splice(idx, 1); resolve(null); } this.ws!.send(JSON.stringify({ type: "mesh_query", sql, _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -707,12 +689,11 @@ export class BrokerClient {
async meshSchema(): Promise<Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>> { async meshSchema(): Promise<Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.meshSchemaResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "mesh_schema" })); this.meshSchemaResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.meshSchemaResolvers.delete(reqId)) resolve([]);
const idx = this.meshSchemaResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.meshSchemaResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "mesh_schema", _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -722,12 +703,11 @@ export class BrokerClient {
async createStream(name: string): Promise<string | null> { async createStream(name: string): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.streamCreatedResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "create_stream", name })); this.streamCreatedResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.streamCreatedResolvers.delete(reqId)) resolve(null);
const idx = this.streamCreatedResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.streamCreatedResolvers.splice(idx, 1); resolve(null); } this.ws!.send(JSON.stringify({ type: "create_stream", name, _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -753,12 +733,11 @@ export class BrokerClient {
async listStreams(): Promise<Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>> { async listStreams(): Promise<Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => { return new Promise((resolve) => {
this.streamListResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "list_streams" })); this.streamListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.streamListResolvers.delete(reqId)) resolve([]);
const idx = this.streamListResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.streamListResolvers.splice(idx, 1); resolve([]); } this.ws!.send(JSON.stringify({ type: "list_streams", _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -775,17 +754,16 @@ export class BrokerClient {
} }
// --- Mesh info --- // --- Mesh info ---
private meshInfoResolvers: Array<(result: Record<string, unknown> | null) => void> = []; private meshInfoResolvers = new Map<string, { resolve: (result: Record<string, unknown> | null) => void; timer: NodeJS.Timeout }>();
async meshInfo(): Promise<Record<string, unknown> | null> { async meshInfo(): Promise<Record<string, unknown> | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => { return new Promise((resolve) => {
this.meshInfoResolvers.push(resolve); const reqId = this.makeReqId();
this.ws!.send(JSON.stringify({ type: "mesh_info" })); this.meshInfoResolvers.set(reqId, { resolve, timer: setTimeout(() => {
setTimeout(() => { if (this.meshInfoResolvers.delete(reqId)) resolve(null);
const idx = this.meshInfoResolvers.indexOf(resolve); }, 5_000) });
if (idx !== -1) { this.meshInfoResolvers.splice(idx, 1); resolve(null); } this.ws!.send(JSON.stringify({ type: "mesh_info", _reqId: reqId }));
}, 5_000);
}); });
} }
@@ -805,7 +783,33 @@ export class BrokerClient {
// --- Internals --- // --- Internals ---
private resolveFromMap<T>(
map: Map<string, { resolve: (v: T) => void; timer: NodeJS.Timeout }>,
reqId: string | undefined,
value: T,
): boolean {
let entry = reqId ? map.get(reqId) : undefined;
if (!entry) {
// Fallback: oldest pending (FIFO, for brokers that don't echo _reqId)
const first = map.entries().next().value as [string, { resolve: (v: T) => void; timer: NodeJS.Timeout }] | undefined;
if (first) {
entry = first[1];
map.delete(first[0]);
}
} else {
map.delete(reqId!);
}
if (entry) {
clearTimeout(entry.timer);
entry.resolve(value);
return true;
}
return false;
}
private handleServerMessage(msg: Record<string, unknown>): void { private handleServerMessage(msg: Record<string, unknown>): void {
const msgReqId = msg._reqId as string | undefined;
if (msg.type === "ack") { if (msg.type === "ack") {
const pending = this.pendingSends.get(String(msg.id ?? "")); const pending = this.pendingSends.get(String(msg.id ?? ""));
if (pending) { if (pending) {
@@ -819,8 +823,7 @@ export class BrokerClient {
} }
if (msg.type === "peers_list") { if (msg.type === "peers_list") {
const peers = (msg.peers as PeerInfo[]) ?? []; const peers = (msg.peers as PeerInfo[]) ?? [];
const resolver = this.listPeersResolvers.shift(); this.resolveFromMap(this.listPeersResolvers, msgReqId, peers);
if (resolver) resolver(peers);
return; return;
} }
if (msg.type === "push") { if (msg.type === "push") {
@@ -893,25 +896,26 @@ export class BrokerClient {
return; return;
} }
if (msg.type === "state_result") { if (msg.type === "state_result") {
const resolver = this.stateResolvers.shift(); // DEPENDENCY: The broker must NOT send state_result for set_state
if (resolver) { // operations (only for get_state). If the broker sends state_result for
if (msg.key) { // both, it would be consumed here by the next pending get_state resolver,
resolver({ // returning the wrong value (cross-contamination). The broker's set_state
key: String(msg.key), // handler was fixed to omit state_result; only get_state sends it.
value: msg.value, if (msg.key) {
updatedBy: String(msg.updatedBy ?? ""), this.resolveFromMap(this.stateResolvers, msgReqId, {
updatedAt: String(msg.updatedAt ?? ""), key: String(msg.key),
}); value: msg.value,
} else { updatedBy: String(msg.updatedBy ?? ""),
resolver(null); updatedAt: String(msg.updatedAt ?? ""),
} });
} else {
this.resolveFromMap(this.stateResolvers, msgReqId, null);
} }
return; return;
} }
if (msg.type === "state_list") { if (msg.type === "state_list") {
const entries = (msg.entries as Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) ?? []; const entries = (msg.entries as Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) ?? [];
const resolver = this.stateListResolvers.shift(); this.resolveFromMap(this.stateListResolvers, msgReqId, entries);
if (resolver) resolver(entries);
return; return;
} }
if (msg.type === "state_change") { if (msg.type === "state_change") {
@@ -926,120 +930,108 @@ export class BrokerClient {
return; return;
} }
if (msg.type === "memory_stored") { if (msg.type === "memory_stored") {
const resolver = this.memoryStoreResolvers.shift(); this.resolveFromMap(this.memoryStoreResolvers, msgReqId, msg.id ? String(msg.id) : null);
if (resolver) resolver(msg.id ? String(msg.id) : null);
return; return;
} }
if (msg.type === "memory_results") { if (msg.type === "memory_results") {
const memories = (msg.memories as Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) ?? []; const memories = (msg.memories as Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) ?? [];
const resolver = this.memoryRecallResolvers.shift(); this.resolveFromMap(this.memoryRecallResolvers, msgReqId, memories);
if (resolver) resolver(memories);
return; return;
} }
if (msg.type === "message_status_result") { if (msg.type === "message_status_result") {
const resolver = this.messageStatusResolvers.shift(); this.resolveFromMap(this.messageStatusResolvers, msgReqId, msg as any);
if (resolver) resolver(msg as any);
return; return;
} }
if (msg.type === "file_url") { if (msg.type === "file_url") {
const resolver = this.fileUrlResolvers.shift(); if (msg.url) {
if (resolver) { this.resolveFromMap(this.fileUrlResolvers, msgReqId, {
if (msg.url) { url: String(msg.url),
resolver({ url: String(msg.url), name: String(msg.name ?? "") }); name: String(msg.name ?? ""),
} else { encrypted: msg.encrypted ? true : undefined,
resolver(null); sealedKey: msg.sealedKey ? String(msg.sealedKey) : undefined,
} });
} else {
this.resolveFromMap(this.fileUrlResolvers, msgReqId, null);
} }
return; return;
} }
if (msg.type === "file_list") { if (msg.type === "file_list") {
const files = (msg.files as Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) ?? []; const files = (msg.files as Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) ?? [];
const resolver = this.fileListResolvers.shift(); this.resolveFromMap(this.fileListResolvers, msgReqId, files);
if (resolver) resolver(files);
return; return;
} }
if (msg.type === "file_status_result") { if (msg.type === "file_status_result") {
const accesses = (msg.accesses as Array<{ peerName: string; accessedAt: string }>) ?? []; const accesses = (msg.accesses as Array<{ peerName: string; accessedAt: string }>) ?? [];
const resolver = this.fileStatusResolvers.shift(); this.resolveFromMap(this.fileStatusResolvers, msgReqId, accesses);
if (resolver) resolver(accesses); return;
}
if (msg.type === "grant_file_access_ok") {
this.resolveFromMap(this.grantFileAccessResolvers, msgReqId, true);
return; return;
} }
if (msg.type === "vector_stored") { if (msg.type === "vector_stored") {
const resolver = this.vectorStoredResolvers.shift(); this.resolveFromMap(this.vectorStoredResolvers, msgReqId, msg.id ? String(msg.id) : null);
if (resolver) resolver(msg.id ? String(msg.id) : null);
return; return;
} }
if (msg.type === "vector_results") { if (msg.type === "vector_results") {
const results = (msg.results as Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) ?? []; const results = (msg.results as Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) ?? [];
const resolver = this.vectorResultsResolvers.shift(); this.resolveFromMap(this.vectorResultsResolvers, msgReqId, results);
if (resolver) resolver(results);
return; return;
} }
if (msg.type === "collection_list") { if (msg.type === "collection_list") {
const collections = (msg.collections as string[]) ?? []; const collections = (msg.collections as string[]) ?? [];
const resolver = this.collectionListResolvers.shift(); this.resolveFromMap(this.collectionListResolvers, msgReqId, collections);
if (resolver) resolver(collections);
return; return;
} }
if (msg.type === "graph_result") { if (msg.type === "graph_result") {
const rows = (msg.rows as Array<Record<string, unknown>>) ?? []; // Broker sends { type: "graph_result", records: [...] }
const resolver = this.graphResultResolvers.shift(); const rows = (msg.records as Array<Record<string, unknown>>) ?? [];
if (resolver) resolver(rows); this.resolveFromMap(this.graphResultResolvers, msgReqId, rows);
return; return;
} }
if (msg.type === "context_list") { if (msg.type === "context_list") {
const contexts = (msg.contexts as Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) ?? []; const contexts = (msg.contexts as Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) ?? [];
const resolver = this.contextListResolvers.shift(); this.resolveFromMap(this.contextListResolvers, msgReqId, contexts);
if (resolver) resolver(contexts);
return; return;
} }
if (msg.type === "context_results") { if (msg.type === "context_results") {
const contexts = (msg.contexts as Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) ?? []; const contexts = (msg.contexts as Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) ?? [];
const resolver = this.contextResultsResolvers.shift(); this.resolveFromMap(this.contextResultsResolvers, msgReqId, contexts);
if (resolver) resolver(contexts);
return; return;
} }
if (msg.type === "task_created") { if (msg.type === "task_created") {
const resolver = this.taskCreatedResolvers.shift(); this.resolveFromMap(this.taskCreatedResolvers, msgReqId, msg.id ? String(msg.id) : null);
if (resolver) resolver(msg.id ? String(msg.id) : null);
return; return;
} }
if (msg.type === "task_list") { if (msg.type === "task_list") {
const tasks = (msg.tasks as Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) ?? []; const tasks = (msg.tasks as Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) ?? [];
const resolver = this.taskListResolvers.shift(); this.resolveFromMap(this.taskListResolvers, msgReqId, tasks);
if (resolver) resolver(tasks);
return; return;
} }
if (msg.type === "mesh_query_result") { if (msg.type === "mesh_query_result") {
const resolver = this.meshQueryResolvers.shift(); if (msg.columns) {
if (resolver) { this.resolveFromMap(this.meshQueryResolvers, msgReqId, {
if (msg.columns) { columns: (msg.columns as string[]) ?? [],
resolver({ rows: (msg.rows as Array<Record<string, unknown>>) ?? [],
columns: (msg.columns as string[]) ?? [], rowCount: (msg.rowCount as number) ?? 0,
rows: (msg.rows as Array<Record<string, unknown>>) ?? [], });
rowCount: (msg.rowCount as number) ?? 0, } else {
}); this.resolveFromMap(this.meshQueryResolvers, msgReqId, null);
} else {
resolver(null);
}
} }
return; return;
} }
if (msg.type === "mesh_schema_result") { if (msg.type === "mesh_schema_result") {
const tables = (msg.tables as Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) ?? []; const tables = (msg.tables as Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) ?? [];
const resolver = this.meshSchemaResolvers.shift(); this.resolveFromMap(this.meshSchemaResolvers, msgReqId, tables);
if (resolver) resolver(tables);
return; return;
} }
if (msg.type === "stream_created") { if (msg.type === "stream_created") {
const resolver = this.streamCreatedResolvers.shift(); this.resolveFromMap(this.streamCreatedResolvers, msgReqId, msg.id ? String(msg.id) : null);
if (resolver) resolver(msg.id ? String(msg.id) : null);
return; return;
} }
if (msg.type === "stream_list") { if (msg.type === "stream_list") {
const streams = (msg.streams as Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) ?? []; const streams = (msg.streams as Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) ?? [];
const resolver = this.streamListResolvers.shift(); this.resolveFromMap(this.streamListResolvers, msgReqId, streams);
if (resolver) resolver(streams);
return; return;
} }
if (msg.type === "stream_data") { if (msg.type === "stream_data") {
@@ -1054,13 +1046,13 @@ export class BrokerClient {
return; return;
} }
if (msg.type === "mesh_info_result") { if (msg.type === "mesh_info_result") {
const resolver = this.meshInfoResolvers.shift(); this.resolveFromMap(this.meshInfoResolvers, msgReqId, msg as Record<string, unknown>);
if (resolver) resolver(msg as Record<string, unknown>);
return; return;
} }
if (msg.type === "error") { if (msg.type === "error") {
this.debug(`broker error: ${msg.code} ${msg.message}`); this.debug(`broker error: ${msg.code} ${msg.message}`);
const id = msg.id ? String(msg.id) : null; const id = msg.id ? String(msg.id) : null;
let handledByPendingSend = false;
if (id) { if (id) {
const pending = this.pendingSends.get(id); const pending = this.pendingSends.get(id);
if (pending) { if (pending) {
@@ -1069,6 +1061,46 @@ export class BrokerClient {
error: `${msg.code}: ${msg.message}`, error: `${msg.code}: ${msg.message}`,
}); });
this.pendingSends.delete(id); this.pendingSends.delete(id);
handledByPendingSend = true;
}
}
if (!handledByPendingSend) {
// Best-effort: unblock the first waiting resolver so callers don't
// hang for 5s. We don't know which tool triggered the error, so we
// pop the first non-empty resolver map in priority order.
const allMaps: Array<[Map<string, { resolve: (v: any) => void; timer: NodeJS.Timeout }>, unknown]> = [
[this.stateResolvers, null],
[this.stateListResolvers, []],
[this.memoryStoreResolvers, null],
[this.memoryRecallResolvers, []],
[this.fileUrlResolvers, null],
[this.fileListResolvers, []],
[this.fileStatusResolvers, []],
[this.graphResultResolvers, []],
[this.vectorStoredResolvers, null],
[this.vectorResultsResolvers, []],
[this.taskListResolvers, []],
[this.meshQueryResolvers, null],
[this.contextResultsResolvers, []],
[this.contextListResolvers, []],
[this.streamListResolvers, []],
[this.messageStatusResolvers, null],
[this.grantFileAccessResolvers, false],
[this.collectionListResolvers, []],
[this.meshSchemaResolvers, []],
[this.taskCreatedResolvers, null],
[this.streamCreatedResolvers, null],
[this.listPeersResolvers, []],
[this.meshInfoResolvers, null],
];
for (const [map, defaultVal] of allMaps) {
const first = (map as Map<string, any>).entries().next().value as [string, { resolve: (v: unknown) => void; timer: NodeJS.Timeout }] | undefined;
if (first) {
(map as Map<string, any>).delete(first[0]);
clearTimeout(first[1].timer);
first[1].resolve(defaultVal);
break; // only pop one
}
} }
} }
} }

View File

@@ -12,6 +12,7 @@ import { env } from "../env";
const clients = new Map<string, BrokerClient>(); const clients = new Map<string, BrokerClient>();
let configDisplayName: string | undefined; let configDisplayName: string | undefined;
let configGroups: Config["groups"] = [];
/** Ensure a BrokerClient exists + is connecting/open for this mesh. */ /** Ensure a BrokerClient exists + is connecting/open for this mesh. */
export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> { export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> {
@@ -21,6 +22,10 @@ export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> {
clients.set(mesh.meshId, client); clients.set(mesh.meshId, client);
try { try {
await client.connect(); await client.connect();
// Auto-join groups declared at launch time (--groups flag or config).
for (const g of configGroups ?? []) {
try { await client.joinGroup(g.name, g.role); } catch { /* best effort */ }
}
} catch { } catch {
// Connect failed → client is in "reconnecting" state, leave it // Connect failed → client is in "reconnecting" state, leave it
// wired so tool calls can surface the status. // wired so tool calls can surface the status.
@@ -31,6 +36,7 @@ export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> {
/** Start clients for every joined mesh. Called once on MCP server start. */ /** Start clients for every joined mesh. Called once on MCP server start. */
export async function startClients(config: Config): Promise<void> { export async function startClients(config: Config): Promise<void> {
configDisplayName = config.displayName; configDisplayName = config.displayName;
configGroups = config.groups ?? [];
await Promise.allSettled(config.meshes.map(ensureClient)); await Promise.allSettled(config.meshes.map(ensureClient));
} }

9036
bun.lock Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,13 @@
ALTER TABLE "mesh"."file" ADD COLUMN "encrypted" boolean DEFAULT false NOT NULL;--> statement-breakpoint
ALTER TABLE "mesh"."file" ADD COLUMN "owner_pubkey" text;--> statement-breakpoint
CREATE TABLE "mesh"."file_key" (
"id" text PRIMARY KEY NOT NULL,
"file_id" text NOT NULL,
"peer_pubkey" text NOT NULL,
"sealed_key" text NOT NULL,
"granted_at" timestamp DEFAULT now() NOT NULL,
"granted_by_pubkey" text
);
--> statement-breakpoint
ALTER TABLE "mesh"."file_key" ADD CONSTRAINT "file_key_file_id_fkey" FOREIGN KEY ("file_id") REFERENCES "mesh"."file"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE UNIQUE INDEX "file_key_file_peer_idx" ON "mesh"."file_key" ("file_id","peer_pubkey");

View File

@@ -0,0 +1,3 @@
ALTER TABLE "mesh"."context" ADD COLUMN "member_id" text;--> statement-breakpoint
ALTER TABLE "mesh"."context" ADD CONSTRAINT "context_member_id_member_id_fk" FOREIGN KEY ("member_id") REFERENCES "mesh"."member"("id") ON DELETE cascade ON UPDATE cascade;--> statement-breakpoint
CREATE UNIQUE INDEX "context_mesh_member_idx" ON "mesh"."context" ("mesh_id","member_id");

View File

@@ -305,6 +305,8 @@ export const meshFile = meshSchema.table("file", {
minioKey: text().notNull(), minioKey: text().notNull(),
tags: text().array().default([]), tags: text().array().default([]),
persistent: boolean().notNull().default(true), persistent: boolean().notNull().default(true),
encrypted: boolean().notNull().default(false),
ownerPubkey: text(),
uploadedByName: text(), uploadedByName: text(),
uploadedByMember: text().references(() => meshMember.id), uploadedByMember: text().references(() => meshMember.id),
targetSpec: text(), // null = entire mesh targetSpec: text(), // null = entire mesh
@@ -328,24 +330,60 @@ export const meshFileAccess = meshSchema.table("file_access", {
}); });
/** /**
* Per-peer context snapshot. Each peer (presence) has at most one context * Per-peer encrypted symmetric keys for E2E encrypted files.
* The file body is encrypted with a random key (Kf); Kf is sealed
* (crypto_box_seal) to each authorized peer's X25519 pubkey and stored here.
*/
export const meshFileKey = meshSchema.table("file_key", {
id: text().primaryKey().notNull().$defaultFn(generateId),
fileId: text()
.references(() => meshFile.id, { onDelete: "cascade" })
.notNull(),
peerPubkey: text().notNull(),
sealedKey: text().notNull(),
grantedAt: timestamp().defaultNow().notNull(),
grantedByPubkey: text(),
});
export const meshFileKeyRelations = relations(meshFileKey, ({ one }) => ({
file: one(meshFile, {
fields: [meshFileKey.fileId],
references: [meshFile.id],
}),
}));
/**
* Per-peer context snapshot. Each peer (member) has at most one context
* entry per mesh, upserted on each share_context call. Allows peers to * entry per mesh, upserted on each share_context call. Allows peers to
* discover what others are working on, which files they've read, and * discover what others are working on, which files they've read, and
* key findings — without sending a direct message. * key findings — without sending a direct message.
*
* `memberId` is the stable upsert key (survives reconnects). `presenceId`
* is kept for backwards-compat but is nullable — new rows should always
* populate `memberId`. The unique index on (meshId, memberId) prevents
* stale rows from accumulating when a session reconnects with a new
* ephemeral presenceId.
*/ */
export const meshContext = meshSchema.table("context", { export const meshContext = meshSchema.table(
id: text().primaryKey().notNull().$defaultFn(generateId), "context",
meshId: text() {
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) id: text().primaryKey().notNull().$defaultFn(generateId),
.notNull(), meshId: text()
presenceId: text().references(() => presence.id, { onDelete: "cascade" }), .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
peerName: text(), .notNull(),
summary: text().notNull(), memberId: text().references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }),
filesRead: text().array().default([]), presenceId: text().references(() => presence.id, { onDelete: "cascade" }),
keyFindings: text().array().default([]), peerName: text(),
tags: text().array().default([]), summary: text().notNull(),
updatedAt: timestamp().defaultNow().notNull(), filesRead: text().array().default([]),
}); keyFindings: text().array().default([]),
tags: text().array().default([]),
updatedAt: timestamp().defaultNow().notNull(),
},
(table) => [
uniqueIndex("context_mesh_member_idx").on(table.meshId, table.memberId),
],
);
/** /**
* Mesh-scoped task board. Peers can create tasks, claim them, and mark * Mesh-scoped task board. Peers can create tasks, claim them, and mark
@@ -531,6 +569,10 @@ export type SelectMeshFile = typeof meshFile.$inferSelect;
export type InsertMeshFile = typeof meshFile.$inferInsert; export type InsertMeshFile = typeof meshFile.$inferInsert;
export type SelectMeshFileAccess = typeof meshFileAccess.$inferSelect; export type SelectMeshFileAccess = typeof meshFileAccess.$inferSelect;
export type InsertMeshFileAccess = typeof meshFileAccess.$inferInsert; export type InsertMeshFileAccess = typeof meshFileAccess.$inferInsert;
export const selectMeshFileKeySchema = createSelectSchema(meshFileKey);
export const insertMeshFileKeySchema = createInsertSchema(meshFileKey);
export type SelectMeshFileKey = typeof meshFileKey.$inferSelect;
export type InsertMeshFileKey = typeof meshFileKey.$inferInsert;
export const selectMeshContextSchema = createSelectSchema(meshContext); export const selectMeshContextSchema = createSelectSchema(meshContext);
export const insertMeshContextSchema = createInsertSchema(meshContext); export const insertMeshContextSchema = createInsertSchema(meshContext);
export const selectMeshTaskSchema = createSelectSchema(meshTask); export const selectMeshTaskSchema = createSelectSchema(meshTask);