28 Commits

Author SHA1 Message Date
Alejandro Gutiérrez
59848f0d3e chore(cli): v0.6.6 — correlation ID refactor (resolver Maps + _reqId)
Some checks failed
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
CI / Lint (push) Has been cancelled
2026-04-07 14:31:29 +01:00
Alejandro Gutiérrez
d0fa1c028f fix(broker): echo _reqId in all WS responses for correlation ID routing
Extract _reqId from incoming WS messages and include it in every direct
response sendToPeer call and sendError call. Clients can now match
responses to requests by ID instead of relying on FIFO ordering.
Old clients without _reqId are unaffected (field simply omitted).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:28:30 +01:00
Alejandro Gutiérrez
8f925d9a9e fix(cli): correlation ID refactor — resolver Maps with _reqId + FIFO fallback
Replace all 22 resolver Array<fn> patterns with Map<reqId, {resolve, timer}>.
Outgoing messages now include _reqId; on response the broker's echoed _reqId
is used for exact matching, with FIFO fallback for brokers that don't echo it.
Add makeReqId() helper and resolveFromMap() utility. Error propagation block
updated to iterate Maps and pop the oldest entry across all queues.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 14:25:51 +01:00
Alejandro Gutiérrez
4ce1034dcd chore(cli): v0.6.5 — all bug fixes batch
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
2026-04-07 13:12:29 +01:00
Alejandro Gutiérrez
e26a36e543 fix(broker): vector_stored type, set_state no-resp, subscribe ack
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- vector_store sends {type:"vector_stored",id}; wrapped in try/catch
- set_state no longer sends state_result (fire-and-forget)
- subscribe sends {type:"subscribed",stream} confirmation
- remove broken myPresence lookup in mesh_info
- add WSVectorStoredMessage + WSSubscribedMessage to types union

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:08:06 +01:00
Alejandro Gutiérrez
60c74d9463 fix(broker): shareContext stable upsert key + createStream atomic upsert
- shareContext: adds optional memberId param; when provided, upserts on
  (meshId, memberId) instead of (meshId, presenceId) — prevents stale
  context rows accumulating on every reconnect. Falls back to presenceId
  for legacy/anonymous connections. Also refreshes presenceId on update
  so it stays current.
- schema: adds member_id column + unique index context_mesh_member_idx
  on mesh.context table; new migration 0013_context-stable-member-key.sql.
- index.ts call site updated to pass conn.memberId as the stable key.
- createStream: replaces SELECT-then-INSERT TOCTOU race with atomic
  INSERT ... ON CONFLICT DO NOTHING RETURNING, followed by SELECT on miss.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:07:58 +01:00
Alejandro Gutiérrez
6fba9bd4eb feat(cli): fix field mismatches + error propagation
- claim_task/complete_task: send taskId not id
- graph_result: read msg.records not msg.rows
- message_status: try all mesh clients, not only first
- broker: omit state_result for set_state (fixes get_state cross-contamination)
- error handler: unblock first pending resolver on unmatched broker errors

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 13:07:25 +01:00
Alejandro Gutiérrez
5bcc1fe323 chore(cli): bump to v0.6.4 — fix get_file sealedKey bug
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
2026-04-07 12:57:20 +01:00
Alejandro Gutiérrez
e70f0ed1ff fix(broker/cli): e2e get_file owner sealedKey bug
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
broker: owner also fetches sealedKey from mesh.file_key (not skipped),
  only non-owners are blocked when key is missing
cli: explicit error when encrypted file has no sealedKey (no silent raw download)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:56:36 +01:00
Alejandro Gutiérrez
5f696f47ea feat(cli): v0.6.3 — e2e file crypto module + encrypted share_file
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- add crypto/file-crypto.ts: encryptFile, decryptFile, sealKeyForPeer, openSealedKey
- update share_file: when to= set, encrypts file + seals key per recipient
- update get_file: decrypts if encrypted + sealedKey present
- add grant_file_access tool: re-seals Kf for a new peer without re-upload
- add getSessionPubkey/getSessionSecretKey getters on BrokerClient
- add grantFileAccess WS method on BrokerClient
- bump version to 0.6.3

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:53:13 +01:00
Alejandro Gutiérrez
ccb9fb2a68 feat(broker/db): e2e file encryption schema + db functions
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- add mesh.file_key table (fileId, peerPubkey, sealedKey, grantedByPubkey)
- add encrypted + ownerPubkey columns to mesh.file
- export insertFileKeys, getFileKey, grantFileKey from broker.ts
- update uploadFile/getFile/listFiles to include encrypted/ownerPubkey
- migration 0012_add-file-encryption applied to prod

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:43:57 +01:00
Alejandro Gutiérrez
898c061089 feat(cli): e2e file encryption — file-crypto.ts + client + MCP tools
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:33:39 +01:00
Alejandro Gutiérrez
f7a6559429 feat(broker): add E2E file encryption to HTTP upload and WS handlers
- parse x-encrypted/x-owner-pubkey/x-file-keys headers in handleUploadPost
- pass encrypted and ownerPubkey to uploadFile, call insertFileKeys after
- get_file: fetch sealedKey for non-owners, block if missing, include in response
- list_files: include encrypted field per file
- add grant_file_access WS handler so owners can seal keys for peers
- update types.ts with new message interfaces and union members

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:32:46 +01:00
Alejandro Gutiérrez
579d0c3d3e chore: bump version to 0.6.0
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:21:03 +01:00
Alejandro Gutiérrez
190f5a958e refactor(cli): migrate to citty — --help generated from flag definitions
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Replace manual switch + HELP string with citty defineCommand/runMain.
Flag definitions in index.ts are now the single source of truth for
--help output. Remove parseArgs() from launch.ts; accept citty-parsed
flags + rawArgs (-- passthrough to claude preserved).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:19:16 +01:00
Alejandro Gutiérrez
03661e1b68 docs(cli): expand --help with all launch flags, groups hierarchy, env vars
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:16:04 +01:00
Alejandro Gutiérrez
d451fc296e feat: hierarchical group routing + role wiring
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
broker: expand member groups to ancestor paths at drain time (pull model)
- @flexicar message reaches peers in @flexicar/core, @flexicar/output, etc.
- Resolved at drainForMember — no DB changes, fully backward-compatible
- Any depth: flexicar/team/backend also matches @flexicar and @flexicar/team

cli: wire --role all the way through to session config + env
- Config.role field added
- launch.ts stores role in sessionConfig, passes CLAUDEMESH_ROLE env var
- mcp/server.ts includes role in identity string
- manager.ts auto-joins groups from config on WS connect (--groups flag now works)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 12:09:37 +01:00
Alejandro Gutiérrez
3da5d71275 fix(broker): fix share_file DB insert failures
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
- Normalise tags to Array before Drizzle insert (PgArray mapper calls
  .map() and throws if value is not a standard JS Array)
- Use uploadedByName instead of uploadedByMember FK — the X-Member-Id
  header carries the mesh slug, not a mesh.member primary key

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 08:56:43 +01:00
Alejandro Gutiérrez
cdf335f609 fix(broker): fix MINIO_USE_SSL env coercion
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
z.coerce.boolean() treats any non-empty string as true, so MINIO_USE_SSL="false" → true.
Switch to explicit enum+transform.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-07 08:38:06 +01:00
Alejandro Gutiérrez
0cd16ff358 fix: exclude sender only for broadcasts, not direct messages
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
The sender exclusion filter (excludeSenderSessionPubkey) was blocking
delivery of ALL messages from the sender, including direct messages
to other peers. Now only excludes on broadcast (target_spec = '*').

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 18:34:09 +01:00
Alejandro Gutiérrez
3e9707276d fix: add diagnostic logging to maybePushQueuedMessages
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 18:21:29 +01:00
Alejandro Gutiérrez
82cfee315c fix: v0.5.9 — mesh_info returns correct display name
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Release / Publish multi-arch images (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 18:10:30 +01:00
Alejandro Gutiérrez
ab08be04a5 feat(cli): v0.5.8 — welcome notification on connect
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Release / Publish multi-arch images (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 18:07:08 +01:00
Alejandro Gutiérrez
ee585a8370 fix(cli): v0.5.7 — event loop keepalive for stdout flush
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Release / Publish multi-arch images (push) Has been cancelled
Node.js stdout to a pipe is buffered. Without periodic event loop
activity, WS callback → server.notification() → stdout.write() may
not flush until the next I/O event. A 1s setInterval (NOT unref'd)
keeps the event loop ticking so notifications flush immediately.

This is why claude-intercom worked: its 1s HTTP poll kept the event
loop active as a side effect. Claudemesh's passive WS listener let
the event loop settle, causing stdout to buffer indefinitely.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 17:48:41 +01:00
Alejandro Gutiérrez
1f078bf0c8 fix(web): --no-turbopack for prod build (payload CSS)
Some checks failed
CI / Typecheck (push) Has been cancelled
CI / Lint (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 17:32:24 +01:00
Alejandro Gutiérrez
2372032a68 fix(cli): v0.5.6 — fix ping_mesh self-send + add diagnostics
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Release / Publish multi-arch images (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 17:32:03 +01:00
Alejandro Gutiérrez
a70c5fd124 feat(cli): v0.5.5 — ping_mesh diagnostic tool
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Release / Publish multi-arch images (push) Has been cancelled
Sends test messages to self through the full pipeline per priority
and measures round-trip timing. Reports send→ack and send→receive
latency. Detects broker priority gating (status=working holds next/low).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 17:27:00 +01:00
Alejandro Gutiérrez
5c62d287cf fix(cli): v0.5.4 — revert to event-driven push, add Claude Code integration spec
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
Release / Publish multi-arch images (push) Has been cancelled
Revert poll-based drain (v0.5.2 overcorrection). Claude Code source
confirms notifications are processed event-driven via React
useEffect, not polled. The WS onPush → server.notification() path
is correct.

Added section 13 to SPEC.md documenting the full Claude Code
notification pipeline, feature gates, priority gating, and common
push delivery issues.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 17:04:05 +01:00
19 changed files with 10411 additions and 586 deletions

58
SPEC.md
View File

@@ -855,7 +855,63 @@ The broker:
---
## 13. Encryption
## 13. Claude Code Integration — How Push Delivery Works
Understanding how Claude Code processes channel notifications is critical for claudemesh reliability.
### The notification pipeline
```
MCP server (claudemesh-cli)
└─ server.notification("notifications/claude/channel", { content, meta })
└─ writes JSON-RPC to stdout
└─ Claude Code reads from MCP process stdout
└─ setNotificationHandler fires
└─ enqueue({ mode: "prompt", value: wrappedContent, origin: { kind: "channel" } })
└─ React useSyncExternalStore triggers re-render
└─ useQueueProcessor effect fires
└─ processQueueIfReady() → executeInput()
└─ Claude sees ← claudemesh: ...
```
### Key requirements (from Claude Code source)
1. **Feature gate**: `feature('KAIROS') || feature('KAIROS_CHANNELS')` must be true. `KAIROS_CHANNELS` is external (GrowthBook). `--dangerously-load-development-channels` sets `entry.dev = true` which bypasses the allowlist check but still requires the feature gate.
2. **OAuth auth required**: Channel notifications require `claude.ai` authentication (OAuth tokens). API key users are blocked. This means `claude login --for-claude-ai` must have been run.
3. **Server name must match**: The MCP server's declared name (`new Server({ name: "claudemesh" })`) must match the channel entry from `--dangerously-load-development-channels server:claudemesh`.
4. **Meta keys**: Must match `/^[a-zA-Z_][a-zA-Z0-9_]*$/`. No hyphens. All values must be strings.
5. **Capability declaration**: Server must declare `experimental: { "claude/channel": {} }` in capabilities.
6. **Queue processing is event-driven**: `enqueue()` triggers a React store update → `useEffect` fires → processes immediately. No polling needed on the Claude Code side. The 1s poll timer in claudemesh is for draining the WS push buffer into notifications — Claude Code handles the rest instantly.
### Priority gating on the broker
The broker holds `"next"` and `"low"` priority messages when the peer's status is `"working"`. Only `"now"` messages deliver immediately regardless of status. This is by design — but can cause perceived "push not working" when the hook reports `working` status.
```
Status: idle → delivers: now, next, low
Status: working → delivers: now only
Status: dnd → delivers: now only
```
If a peer appears to not receive messages, check their status in `list_peers`. A peer stuck in `"working"` (e.g., stale hook) will only receive `"now"` priority messages.
### Common issues
| Symptom | Likely cause |
|---------|-------------|
| Messages never arrive | Session started before CLI update — restart with `claudemesh launch` |
| Messages arrive with 5+ minute delay | Peer status stuck on `"working"``next` messages held until idle |
| `← claudemesh:` never appears in idle session | Feature gate `KAIROS_CHANNELS` not enabled, or not OAuth-authenticated |
| Messages arrive only on `check_messages` | Channel handler not registered — check `--dangerously-load-development-channels` flag |
---
## 14. Encryption
### Direct messages

View File

@@ -34,6 +34,7 @@ import {
mesh,
meshFile,
meshFileAccess,
meshFileKey,
meshContext,
meshMember as memberTable,
meshMemory,
@@ -717,6 +718,8 @@ export async function uploadFile(args: {
uploadedByMember?: string;
targetSpec?: string;
expiresAt?: Date;
encrypted?: boolean;
ownerPubkey?: string;
}): Promise<string> {
const [row] = await db
.insert(meshFile)
@@ -732,6 +735,8 @@ export async function uploadFile(args: {
uploadedByMember: args.uploadedByMember ?? null,
targetSpec: args.targetSpec ?? null,
expiresAt: args.expiresAt ?? null,
encrypted: args.encrypted ?? false,
ownerPubkey: args.ownerPubkey ?? null,
})
.returning({ id: meshFile.id });
if (!row) throw new Error("failed to insert file row");
@@ -755,6 +760,8 @@ export async function getFile(
uploadedByName: string | null;
targetSpec: string | null;
uploadedAt: Date;
encrypted: boolean;
ownerPubkey: string | null;
} | null> {
const [row] = await db
.select({
@@ -768,6 +775,8 @@ export async function getFile(
uploadedByName: meshFile.uploadedByName,
targetSpec: meshFile.targetSpec,
uploadedAt: meshFile.uploadedAt,
encrypted: meshFile.encrypted,
ownerPubkey: meshFile.ownerPubkey,
})
.from(meshFile)
.where(
@@ -782,6 +791,8 @@ export async function getFile(
return {
...row,
tags: (row.tags ?? []) as string[],
encrypted: row.encrypted,
ownerPubkey: row.ownerPubkey,
};
}
@@ -801,6 +812,7 @@ export async function listFiles(
uploadedBy: string;
uploadedAt: Date;
persistent: boolean;
encrypted: boolean;
}>
> {
const conditions = [
@@ -822,6 +834,7 @@ export async function listFiles(
uploadedByName: meshFile.uploadedByName,
uploadedAt: meshFile.uploadedAt,
persistent: meshFile.persistent,
encrypted: meshFile.encrypted,
})
.from(meshFile)
.where(and(...conditions))
@@ -835,6 +848,7 @@ export async function listFiles(
uploadedBy: r.uploadedByName ?? "unknown",
uploadedAt: r.uploadedAt,
persistent: r.persistent,
encrypted: r.encrypted,
}));
}
@@ -892,11 +906,62 @@ export async function deleteFile(
);
}
/** Insert encrypted key blobs for a newly uploaded E2E file. */
export async function insertFileKeys(
fileId: string,
keys: Array<{ peerPubkey: string; sealedKey: string; grantedByPubkey?: string }>,
): Promise<void> {
if (keys.length === 0) return;
await db.insert(meshFileKey).values(
keys.map((k) => ({
fileId,
peerPubkey: k.peerPubkey,
sealedKey: k.sealedKey,
grantedByPubkey: k.grantedByPubkey ?? null,
})),
);
}
/** Get the sealed key for a specific peer, or null if not authorized. */
export async function getFileKey(
fileId: string,
peerPubkey: string,
): Promise<string | null> {
const [row] = await db
.select({ sealedKey: meshFileKey.sealedKey })
.from(meshFileKey)
.where(
and(eq(meshFileKey.fileId, fileId), eq(meshFileKey.peerPubkey, peerPubkey)),
);
return row?.sealedKey ?? null;
}
/** Grant a peer access to an encrypted file (upsert their key blob). */
export async function grantFileKey(
fileId: string,
peerPubkey: string,
sealedKey: string,
grantedByPubkey: string,
): Promise<void> {
await db
.insert(meshFileKey)
.values({ fileId, peerPubkey, sealedKey, grantedByPubkey })
.onConflictDoUpdate({
target: [meshFileKey.fileId, meshFileKey.peerPubkey],
set: { sealedKey, grantedByPubkey, grantedAt: new Date() },
});
}
// --- Context sharing ---
/**
* Upsert a context snapshot for a peer. Each (meshId, presenceId) pair
* has at most one context row — repeated calls update it in place.
* Upsert a context snapshot for a peer. When `memberId` is provided the
* row is keyed on (meshId, memberId) — a stable identifier that survives
* reconnects. This prevents stale rows from accumulating every time a
* session reconnects with a fresh ephemeral presenceId.
*
* Falls back to (meshId, presenceId) lookup when memberId is absent
* (e.g. legacy callers or anonymous connections).
*/
export async function shareContext(
meshId: string,
@@ -906,24 +971,27 @@ export async function shareContext(
filesRead?: string[],
keyFindings?: string[],
tags?: string[],
memberId?: string,
): Promise<string> {
const now = new Date();
// Try to find existing context for this presence in this mesh.
// Build the WHERE clause: prefer stable memberId, fall back to presenceId.
const lookupWhere = memberId
? and(eq(meshContext.meshId, meshId), eq(meshContext.memberId, memberId))
: and(eq(meshContext.meshId, meshId), eq(meshContext.presenceId, presenceId));
const [existing] = await db
.select({ id: meshContext.id })
.from(meshContext)
.where(
and(
eq(meshContext.meshId, meshId),
eq(meshContext.presenceId, presenceId),
),
)
.where(lookupWhere)
.limit(1);
if (existing) {
await db
.update(meshContext)
.set({
// Keep presenceId current so it reflects the latest connection.
presenceId,
peerName: peerName ?? null,
summary,
filesRead: filesRead ?? [],
@@ -939,6 +1007,7 @@ export async function shareContext(
.insert(meshContext)
.values({
meshId,
memberId: memberId ?? null,
presenceId,
peerName: peerName ?? null,
summary,
@@ -1188,16 +1257,22 @@ export async function createStream(
name: string,
createdByName: string,
): Promise<string> {
const existing = await db
// Atomic upsert: INSERT ... ON CONFLICT DO NOTHING to avoid TOCTOU race
// when two callers concurrently attempt to create the same stream.
const [inserted] = await db
.insert(meshStream)
.values({ meshId, name, createdByName })
.onConflictDoNothing()
.returning({ id: meshStream.id });
if (inserted) return inserted.id;
// Row already existed — fetch the id.
const [existing] = await db
.select({ id: meshStream.id })
.from(meshStream)
.where(and(eq(meshStream.meshId, meshId), eq(meshStream.name, name)));
if (existing.length > 0) return existing[0]!.id;
const [row] = await db
.insert(meshStream)
.values({ meshId, name, createdByName })
.returning({ id: meshStream.id });
return row!.id;
return existing!.id;
}
/**
@@ -1302,11 +1377,28 @@ export async function drainForMember(
);
// Build group target matching: @all (broadcast alias) + @<groupname>
// for each group the peer belongs to.
// for each group the peer belongs to, expanded to all ancestor paths.
//
// Hierarchical routing (downward propagation):
// A peer in "flexicar/core" also matches messages sent to "@flexicar".
// A peer in "flexicar/core/backend" matches "@flexicar/core" and "@flexicar".
// This lets leads send to a parent group and reach all sub-teams.
//
// Resolution happens at drain time (pull model) — no duplicates stored,
// no schema changes, fully backward-compatible.
const groupTargets = ["@all"];
if (memberGroups) {
const seen = new Set<string>();
for (const g of memberGroups) {
groupTargets.push(`@${g}`);
const parts = g.split("/");
// Add the group itself + every ancestor prefix.
for (let depth = parts.length; depth > 0; depth--) {
const ancestor = parts.slice(0, depth).join("/");
if (!seen.has(ancestor)) {
seen.add(ancestor);
groupTargets.push(`@${ancestor}`);
}
}
}
}
const groupTargetList = sql.raw(
@@ -1337,7 +1429,7 @@ export async function drainForMember(
AND delivered_at IS NULL
AND priority::text IN (${priorityList})
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``} OR target_spec IN (${groupTargetList}))
${excludeSenderSessionPubkey ? sql`AND (sender_session_pubkey IS NULL OR sender_session_pubkey != ${excludeSenderSessionPubkey})` : sql``}
${excludeSenderSessionPubkey ? sql`AND NOT (target_spec IN ('*') AND sender_session_pubkey = ${excludeSenderSessionPubkey})` : sql``}
ORDER BY created_at ASC, id ASC
FOR UPDATE SKIP LOCKED
)

View File

@@ -23,7 +23,7 @@ const envSchema = z.object({
MINIO_ENDPOINT: z.string().default("minio:9000"),
MINIO_ACCESS_KEY: z.string().default("claudemesh"),
MINIO_SECRET_KEY: z.string().default("changeme"),
MINIO_USE_SSL: z.coerce.boolean().default(false),
MINIO_USE_SSL: z.enum(["true", "false", ""]).transform(v => v === "true").default("false"),
QDRANT_URL: z.string().default("http://qdrant:6333"),
NEO4J_URL: z.string().default("bolt://neo4j:7687"),
NEO4J_USER: z.string().default("neo4j"),

View File

@@ -31,10 +31,13 @@ import {
forgetMemory,
getContext,
getFile,
getFileKey,
getFileStatus,
getState,
grantFileKey,
handleHookSetStatus,
heartbeat,
insertFileKeys,
joinGroup,
joinMesh,
leaveGroup,
@@ -123,7 +126,10 @@ async function maybePushQueuedMessages(
excludeSenderSessionPubkey?: string,
): Promise<void> {
const conn = connections.get(presenceId);
if (!conn) return;
if (!conn) {
log.debug("maybePush: no connection for presence", { presence_id: presenceId });
return;
}
const status = await refreshStatusFromJsonl(
presenceId,
conn.cwd,
@@ -138,6 +144,13 @@ async function maybePushQueuedMessages(
excludeSenderSessionPubkey,
conn.groups.map((g) => g.name),
);
log.info("maybePush", {
presence_id: presenceId,
status,
session_pubkey: conn.sessionPubkey?.slice(0, 12),
exclude: excludeSenderSessionPubkey?.slice(0, 12),
drained: messages.length,
});
for (const m of messages) {
const push: WSPushMessage = {
type: "push",
@@ -368,6 +381,9 @@ function handleUploadPost(
const tagsRaw = req.headers["x-tags"] as string | undefined;
const persistentRaw = req.headers["x-persistent"] as string | undefined;
const targetSpec = req.headers["x-target-spec"] as string | undefined;
const encryptedRaw = req.headers["x-encrypted"] as string | undefined;
const ownerPubkey = req.headers["x-owner-pubkey"] as string | undefined;
const fileKeysRaw = req.headers["x-file-keys"] as string | undefined;
if (!meshId || !memberId || !fileName) {
writeJson(res, 400, {
@@ -435,19 +451,44 @@ function handleUploadPost(
: undefined,
);
// Insert DB row
// Insert DB row — normalise tags to a real JS Array (Drizzle PgArray
// mapper calls .map() on the value; non-Array iterables break it).
// Skip uploadedByMember FK — memberId from the client header is the
// mesh slug, not a mesh.member primary key.
const encrypted = encryptedRaw === "true";
let fileKeys: Array<{ peerPubkey: string; sealedKey: string }> = [];
if (encrypted && fileKeysRaw) {
try {
fileKeys = JSON.parse(fileKeysRaw);
} catch { /* ignore */ }
}
const dbFileId = await uploadFile({
meshId,
name: fileName,
sizeBytes: body.length,
mimeType: (req.headers["content-type"] as string) || undefined,
minioKey,
tags,
tags: Array.isArray(tags) ? tags : [],
persistent,
uploadedByMember: memberId,
uploadedByName: memberId || undefined,
uploadedByMember: undefined,
targetSpec: targetSpec || undefined,
encrypted: encrypted || false,
ownerPubkey: ownerPubkey || undefined,
});
if (encrypted && fileKeys.length > 0) {
await insertFileKeys(
dbFileId,
fileKeys.map((k) => ({
peerPubkey: k.peerPubkey,
sealedKey: k.sealedKey,
grantedByPubkey: ownerPubkey,
})),
);
}
writeJson(res, 200, { ok: true, fileId: dbFileId });
log.info("upload", {
route: "POST /upload",
@@ -506,8 +547,9 @@ function sendError(
code: string,
message: string,
id?: string,
reqId?: string,
): void {
const err: WSServerMessage = { type: "error", code, message, id };
const err: WSServerMessage = { type: "error", code, message, id, ...(reqId ? { _reqId: reqId } : {}) };
try {
ws.send(JSON.stringify(err));
} catch {
@@ -686,6 +728,7 @@ function handleConnection(ws: WebSocket): void {
ws.on("message", async (raw) => {
try {
const msg = JSON.parse(raw.toString()) as WSClientMessage;
const _reqId = (msg as any)._reqId as string | undefined;
if (msg.type === "hello") {
const result = await handleHello(ws, msg);
if (!result) return;
@@ -735,6 +778,7 @@ function handleConnection(ws: WebSocket): void {
sessionId: p.sessionId,
connectedAt: p.connectedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
};
conn.ws.send(JSON.stringify(resp));
log.info("ws list_peers", {
@@ -803,14 +847,8 @@ function handleConnection(ws: WebSocket): void {
updatedBy: stateRow.updatedBy,
});
}
// Send confirmation back to sender as state_result.
sendToPeer(presenceId, {
type: "state_result",
key: stateRow.key,
value: stateRow.value,
updatedBy: stateRow.updatedBy,
updatedAt: stateRow.updatedAt.toISOString(),
});
// Fire-and-forget: no state_result sent back to sender.
// The client (server.ts) returns success immediately without waiting.
log.info("ws set_state", {
presence_id: presenceId,
key: ss.key,
@@ -827,6 +865,7 @@ function handleConnection(ws: WebSocket): void {
value: stateEntry.value,
updatedBy: stateEntry.updatedBy,
updatedAt: stateEntry.updatedAt.toISOString(),
...(_reqId ? { _reqId } : {}),
});
} else {
sendToPeer(presenceId, {
@@ -835,6 +874,7 @@ function handleConnection(ws: WebSocket): void {
value: null,
updatedBy: "",
updatedAt: "",
...(_reqId ? { _reqId } : {}),
});
}
log.info("ws get_state", {
@@ -854,6 +894,7 @@ function handleConnection(ws: WebSocket): void {
updatedBy: e.updatedBy,
updatedAt: e.updatedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_state", {
presence_id: presenceId,
@@ -876,6 +917,7 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "memory_stored",
id: memoryId,
...(_reqId ? { _reqId } : {}),
});
log.info("ws remember", {
presence_id: presenceId,
@@ -895,6 +937,7 @@ function handleConnection(ws: WebSocket): void {
rememberedBy: m.rememberedBy,
rememberedAt: m.rememberedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws recall", {
presence_id: presenceId,
@@ -911,6 +954,7 @@ function handleConnection(ws: WebSocket): void {
id: fg.memoryId,
messageId: fg.memoryId,
queued: false,
...(_reqId ? { _reqId } : {}),
});
log.info("ws forget", {
presence_id: presenceId,
@@ -922,7 +966,7 @@ function handleConnection(ws: WebSocket): void {
const gf = msg as Extract<WSClientMessage, { type: "get_file" }>;
const file = await getFile(conn.meshId, gf.fileId);
if (!file) {
sendError(conn.ws, "not_found", "file not found");
sendError(conn.ws, "not_found", "file not found", undefined, _reqId);
break;
}
// Access control: if targetSpec is set, verify peer matches
@@ -932,7 +976,20 @@ function handleConnection(ws: WebSocket): void {
file.targetSpec === conn.sessionPubkey ||
file.targetSpec === "*";
if (!matches) {
sendError(conn.ws, "forbidden", "file not targeted at you");
sendError(conn.ws, "forbidden", "file not targeted at you", undefined, _reqId);
break;
}
}
// E2E: for encrypted files, fetch the sealed key for this peer.
// Owners are not blocked if their key is missing (edge case), but
// they still get it returned so the CLI can decrypt normally.
let sealedKey: string | null = null;
if (file.encrypted) {
const peerPubkey = conn.sessionPubkey ?? conn.memberPubkey;
const isOwner = !!(file.ownerPubkey && peerPubkey === file.ownerPubkey);
sealedKey = peerPubkey ? await getFileKey(gf.fileId, peerPubkey) : null;
if (!sealedKey && !isOwner) {
sendError(conn.ws, "forbidden", "no decryption key for this file", undefined, _reqId);
break;
}
}
@@ -957,6 +1014,9 @@ function handleConnection(ws: WebSocket): void {
fileId: gf.fileId,
url: presignedUrl,
name: file.name,
encrypted: file.encrypted,
sealedKey: sealedKey ?? undefined,
...(_reqId ? { _reqId } : {}),
});
log.info("ws get_file", {
presence_id: presenceId,
@@ -977,7 +1037,9 @@ function handleConnection(ws: WebSocket): void {
uploadedBy: f.uploadedBy,
uploadedAt: f.uploadedAt.toISOString(),
persistent: f.persistent,
encrypted: f.encrypted,
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_files", {
presence_id: presenceId,
@@ -996,6 +1058,7 @@ function handleConnection(ws: WebSocket): void {
peerName: a.peerName,
accessedAt: a.accessedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws file_status", {
presence_id: presenceId,
@@ -1003,6 +1066,23 @@ function handleConnection(ws: WebSocket): void {
});
break;
}
case "grant_file_access": {
const gfa = msg as { type: "grant_file_access"; fileId: string; peerPubkey: string; sealedKey: string };
const file = await getFile(conn.meshId, gfa.fileId);
if (!file) {
sendError(conn.ws, "not_found", "file not found", undefined, _reqId);
break;
}
const requestorPubkey = conn.sessionPubkey ?? conn.memberPubkey;
if (file.ownerPubkey && file.ownerPubkey !== requestorPubkey) {
sendError(conn.ws, "forbidden", "only the file owner can grant access", undefined, _reqId);
break;
}
await grantFileKey(gfa.fileId, gfa.peerPubkey, gfa.sealedKey, requestorPubkey ?? undefined);
sendToPeer(presenceId, { type: "grant_file_access_ok", fileId: gfa.fileId, peerPubkey: gfa.peerPubkey, ...(_reqId ? { _reqId } : {}) });
log.info("ws grant_file_access", { presence_id: presenceId, file_id: gfa.fileId, peer: gfa.peerPubkey });
break;
}
case "delete_file": {
const df = msg as Extract<WSClientMessage, { type: "delete_file" }>;
await deleteFile(conn.meshId, df.fileId);
@@ -1011,6 +1091,7 @@ function handleConnection(ws: WebSocket): void {
id: df.fileId,
messageId: df.fileId,
queued: false,
...(_reqId ? { _reqId } : {}),
});
log.info("ws delete_file", {
presence_id: presenceId,
@@ -1031,7 +1112,7 @@ function handleConnection(ws: WebSocket): void {
.from(messageQueue)
.where(eq(messageQueue.id, ms.messageId));
if (!mqRow || mqRow.meshId !== conn.meshId) {
sendError(conn.ws, "not_found", "message not found");
sendError(conn.ws, "not_found", "message not found", undefined, _reqId);
break;
}
// Build per-recipient status from connected peers.
@@ -1075,6 +1156,7 @@ function handleConnection(ws: WebSocket): void {
delivered: !!mqRow.deliveredAt,
deliveredAt: mqRow.deliveredAt?.toISOString() ?? null,
recipients,
...(_reqId ? { _reqId } : {}),
};
sendToPeer(presenceId, resp);
log.info("ws message_status", {
@@ -1097,6 +1179,7 @@ function handleConnection(ws: WebSocket): void {
sc.filesRead,
sc.keyFindings,
sc.tags,
conn.memberId,
);
sendToPeer(presenceId, {
type: "context_shared",
@@ -1132,6 +1215,7 @@ function handleConnection(ws: WebSocket): void {
tags: c.tags,
updatedAt: c.updatedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws get_context", {
presence_id: presenceId,
@@ -1150,6 +1234,7 @@ function handleConnection(ws: WebSocket): void {
tags: c.tags,
updatedAt: c.updatedAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_contexts", {
presence_id: presenceId,
@@ -1174,6 +1259,7 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "task_created",
id: taskId,
...(_reqId ? { _reqId } : {}),
});
log.info("ws create_task", {
presence_id: presenceId,
@@ -1194,7 +1280,7 @@ function handleConnection(ws: WebSocket): void {
memberInfo?.displayName,
);
if (!claimed) {
sendError(conn.ws, "task_not_claimable", "task is not open or does not exist");
sendError(conn.ws, "task_not_claimable", "task is not open or does not exist", undefined, _reqId);
break;
}
// Return updated task list so caller sees the change.
@@ -1212,6 +1298,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws claim_task", {
presence_id: presenceId,
@@ -1227,7 +1314,7 @@ function handleConnection(ws: WebSocket): void {
cpt.result,
);
if (!completed) {
sendError(conn.ws, "task_not_found", "task not found in this mesh");
sendError(conn.ws, "task_not_found", "task not found in this mesh", undefined, _reqId);
break;
}
// Return updated task list.
@@ -1245,6 +1332,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws complete_task", {
presence_id: presenceId,
@@ -1268,6 +1356,7 @@ function handleConnection(ws: WebSocket): void {
tags: t.tags,
createdAt: t.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_tasks", {
presence_id: presenceId,
@@ -1293,6 +1382,7 @@ function handleConnection(ws: WebSocket): void {
type: "stream_created",
id: streamId,
name: cs.name,
...(_reqId ? { _reqId } : {}),
});
log.info("ws create_stream", {
presence_id: presenceId,
@@ -1307,6 +1397,7 @@ function handleConnection(ws: WebSocket): void {
if (!streamSubscriptions.has(key))
streamSubscriptions.set(key, new Set());
streamSubscriptions.get(key)!.add(presenceId);
sendToPeer(presenceId, { type: "subscribed", stream: sub.stream, ...(_reqId ? { _reqId } : {}) });
log.info("ws subscribe", {
presence_id: presenceId,
stream: sub.stream,
@@ -1365,6 +1456,7 @@ function handleConnection(ws: WebSocket): void {
subscriberCount: streamSubscriptions.get(key)?.size ?? 0,
};
}),
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_streams", {
presence_id: presenceId,
@@ -1379,40 +1471,43 @@ function handleConnection(ws: WebSocket): void {
case "vector_store": {
const vs = msg as Extract<WSClientMessage, { type: "vector_store" }>;
const collName = meshCollectionName(conn.meshId, vs.collection);
await ensureCollection(collName);
const { generateId } = await import("@turbostarter/shared/utils");
const pointId = generateId();
// Store text + metadata as payload. Use a zero vector as placeholder
// — real embeddings should be computed client-side and sent directly
// to Qdrant in a future version.
const zeroVector = new Array(1536).fill(0) as number[];
await qdrant.upsert(collName, {
wait: true,
points: [
{
id: pointId,
vector: zeroVector,
payload: {
text: vs.text,
mesh_id: conn.meshId,
stored_by: conn.memberPubkey,
stored_at: new Date().toISOString(),
...(vs.metadata ?? {}),
try {
await ensureCollection(collName);
const { generateId } = await import("@turbostarter/shared/utils");
const pointId = generateId();
// Store text + metadata as payload. Use a zero vector as placeholder
// — real embeddings should be computed client-side and sent directly
// to Qdrant in a future version.
const zeroVector = new Array(1536).fill(0) as number[];
await qdrant.upsert(collName, {
wait: true,
points: [
{
id: pointId,
vector: zeroVector,
payload: {
text: vs.text,
mesh_id: conn.meshId,
stored_by: conn.memberPubkey,
stored_at: new Date().toISOString(),
...(vs.metadata ?? {}),
},
},
},
],
});
sendToPeer(presenceId, {
type: "ack" as const,
id: pointId,
messageId: pointId,
queued: false,
});
log.info("ws vector_store", {
presence_id: presenceId,
collection: vs.collection,
point_id: pointId,
});
],
});
sendToPeer(presenceId, {
type: "vector_stored",
id: pointId,
...(_reqId ? { _reqId } : {}),
});
log.info("ws vector_store", {
presence_id: presenceId,
collection: vs.collection,
point_id: pointId,
});
} catch (e) {
sendError(conn.ws, "vector_error", e instanceof Error ? e.message : String(e), undefined, _reqId);
}
break;
}
case "vector_search": {
@@ -1446,12 +1541,14 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "vector_results",
results: matches,
...(_reqId ? { _reqId } : {}),
});
} catch {
// Collection may not exist yet — return empty results.
sendToPeer(presenceId, {
type: "vector_results",
results: [],
...(_reqId ? { _reqId } : {}),
});
}
log.info("ws vector_search", {
@@ -1477,6 +1574,7 @@ function handleConnection(ws: WebSocket): void {
id: vd.id,
messageId: vd.id,
queued: false,
...(_reqId ? { _reqId } : {}),
});
log.info("ws vector_delete", {
presence_id: presenceId,
@@ -1496,11 +1594,13 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "collection_list",
collections: meshCollections,
...(_reqId ? { _reqId } : {}),
});
} catch {
sendToPeer(presenceId, {
type: "collection_list",
collections: [],
...(_reqId ? { _reqId } : {}),
});
}
log.info("ws list_collections", {
@@ -1535,9 +1635,10 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "graph_result",
records: gqRecords,
...(_reqId ? { _reqId } : {}),
});
} catch (gqErr) {
sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr));
sendError(conn.ws, "graph_error", gqErr instanceof Error ? gqErr.message : String(gqErr), undefined, _reqId);
} finally {
await gqSession.close();
}
@@ -1569,9 +1670,10 @@ function handleConnection(ws: WebSocket): void {
sendToPeer(presenceId, {
type: "graph_result",
records: geRecords,
...(_reqId ? { _reqId } : {}),
});
} catch (geErr) {
sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr));
sendError(conn.ws, "graph_error", geErr instanceof Error ? geErr.message : String(geErr), undefined, _reqId);
} finally {
await geSession.close();
}
@@ -1588,12 +1690,14 @@ function handleConnection(ws: WebSocket): void {
const mq = msg as Extract<WSClientMessage, { type: "mesh_query" }>;
try {
const result = await meshQuery(conn.meshId, mq.sql);
sendToPeer(presenceId, { type: "mesh_query_result", ...result });
sendToPeer(presenceId, { type: "mesh_query_result", ...result, ...(_reqId ? { _reqId } : {}) });
} catch (e) {
sendError(
conn.ws,
"query_error",
e instanceof Error ? e.message : String(e),
undefined,
_reqId,
);
}
log.info("ws mesh_query", {
@@ -1611,12 +1715,15 @@ function handleConnection(ws: WebSocket): void {
columns: [],
rows: [],
rowCount: result.rowCount,
...(_reqId ? { _reqId } : {}),
});
} catch (e) {
sendError(
conn.ws,
"execute_error",
e instanceof Error ? e.message : String(e),
undefined,
_reqId,
);
}
log.info("ws mesh_execute", {
@@ -1628,12 +1735,14 @@ function handleConnection(ws: WebSocket): void {
case "mesh_schema": {
try {
const tables = await meshSchema(conn.meshId);
sendToPeer(presenceId, { type: "mesh_schema_result", tables });
sendToPeer(presenceId, { type: "mesh_schema_result", tables, ...(_reqId ? { _reqId } : {}) });
} catch (e) {
sendError(
conn.ws,
"schema_error",
e instanceof Error ? e.message : String(e),
undefined,
_reqId,
);
}
log.info("ws mesh_schema", { presence_id: presenceId });
@@ -1656,8 +1765,10 @@ function handleConnection(ws: WebSocket): void {
]);
const allGroups = new Set<string>();
for (const p of peers) for (const g of p.groups) allGroups.add(`@${g.name}`);
const myPresence = peers.find(p => p.sessionId === [...connections.entries()].find(([pid]) => pid === presenceId)?.[1]?.sessionPubkey);
const peerConn = connections.get(presenceId);
// Find own display name: match sessionPubkey from the peer list
const selfPubkey = peerConn?.sessionPubkey ?? peerConn?.memberPubkey;
const selfPeer = peers.find(p => p.pubkey === selfPubkey);
sendToPeer(presenceId, {
type: "mesh_info_result",
mesh: conn.meshId,
@@ -1670,8 +1781,9 @@ function handleConnection(ws: WebSocket): void {
streams: streams.map(s => s.name),
tables: tables.map((t: any) => t.name),
collections: [],
yourName: peerConn?.groups?.[0]?.name ?? "unknown",
yourName: selfPeer?.displayName ?? "unknown",
yourGroups: peerConn?.groups ?? [],
...(_reqId ? { _reqId } : {}),
});
log.info("ws mesh_info", { presence_id: presenceId });
break;

View File

@@ -161,6 +161,7 @@ export interface WSAckMessage {
id: string; // echoes client-side correlation id
messageId: string;
queued: boolean;
_reqId?: string;
}
/** Broker → client: hello handshake acknowledgement. */
@@ -182,6 +183,7 @@ export interface WSPeersListMessage {
sessionId: string;
connectedAt: string;
}>;
_reqId?: string;
}
/** Broker → client: a state key was changed by another peer. */
@@ -199,6 +201,7 @@ export interface WSStateResultMessage {
value: unknown;
updatedAt: string;
updatedBy: string;
_reqId?: string;
}
/** Broker → client: response to list_state. */
@@ -210,12 +213,14 @@ export interface WSStateListMessage {
updatedBy: string;
updatedAt: string;
}>;
_reqId?: string;
}
/** Broker → client: acknowledgement for a remember. */
export interface WSMemoryStoredMessage {
type: "memory_stored";
id: string;
_reqId?: string;
}
/** Broker → client: response to recall. */
@@ -228,6 +233,7 @@ export interface WSMemoryResultsMessage {
rememberedBy: string;
rememberedAt: string;
}>;
_reqId?: string;
}
// --- Vector storage messages ---
@@ -295,6 +301,13 @@ export interface WSMeshSchemaMessage {
// --- Vector/Graph response messages ---
/** Broker → client: confirmation that a vector point was stored. */
export interface WSVectorStoredMessage {
type: "vector_stored";
id: string;
_reqId?: string;
}
/** Broker → client: vector search results. */
export interface WSVectorResultsMessage {
type: "vector_results";
@@ -304,18 +317,21 @@ export interface WSVectorResultsMessage {
score: number;
metadata?: Record<string, unknown>;
}>;
_reqId?: string;
}
/** Broker → client: list of vector collections. */
export interface WSCollectionListMessage {
type: "collection_list";
collections: string[];
_reqId?: string;
}
/** Broker → client: graph query results. */
export interface WSGraphResultMessage {
type: "graph_result";
records: Array<Record<string, unknown>>;
_reqId?: string;
}
/** Broker → client: mesh SQL query results. */
@@ -324,6 +340,7 @@ export interface WSMeshQueryResultMessage {
columns: string[];
rows: Array<Record<string, unknown>>;
rowCount: number;
_reqId?: string;
}
/** Broker → client: mesh schema introspection results. */
@@ -333,6 +350,7 @@ export interface WSMeshSchemaResultMessage {
name: string;
columns: Array<{ name: string; type: string; nullable: boolean }>;
}>;
_reqId?: string;
}
/** Client → broker: get full mesh overview. */
@@ -355,6 +373,7 @@ export interface WSMeshInfoResultMessage {
collections: string[];
yourName: string;
yourGroups: Array<{ name: string; role?: string }>;
_reqId?: string;
}
/** Client → broker: check delivery status of a message. */
@@ -375,6 +394,7 @@ export interface WSMessageStatusResultMessage {
pubkey: string;
status: "delivered" | "held" | "disconnected";
}>;
_reqId?: string;
}
// --- File sharing messages ---
@@ -404,12 +424,23 @@ export interface WSDeleteFileMessage {
fileId: string;
}
/** Client → broker: grant a peer access to an encrypted file. */
export interface WSGrantFileAccessMessage {
type: "grant_file_access";
fileId: string;
peerPubkey: string;
sealedKey: string;
}
/** Broker → client: presigned URL for downloading a file. */
export interface WSFileUrlMessage {
type: "file_url";
fileId: string;
url: string;
name: string;
encrypted?: boolean;
sealedKey?: string;
_reqId?: string;
}
/** Broker → client: list of files in the mesh. */
@@ -423,7 +454,17 @@ export interface WSFileListMessage {
uploadedBy: string;
uploadedAt: string;
persistent: boolean;
encrypted: boolean;
}>;
_reqId?: string;
}
/** Broker → client: acknowledgement for grant_file_access. */
export interface WSGrantFileAccessOkMessage {
type: "grant_file_access_ok";
fileId: string;
peerPubkey: string;
_reqId?: string;
}
/** Broker → client: access log for a file. */
@@ -434,6 +475,7 @@ export interface WSFileStatusResultMessage {
peerName: string;
accessedAt: string;
}>;
_reqId?: string;
}
// --- Context sharing messages ---
@@ -475,6 +517,7 @@ export interface WSContextResultsMessage {
tags: string[];
updatedAt: string;
}>;
_reqId?: string;
}
/** Broker → client: response to list_contexts. */
@@ -486,6 +529,7 @@ export interface WSContextListMessage {
tags: string[];
updatedAt: string;
}>;
_reqId?: string;
}
// --- Task messages ---
@@ -523,6 +567,7 @@ export interface WSListTasksMessage {
export interface WSTaskCreatedMessage {
type: "task_created";
id: string;
_reqId?: string;
}
/** Broker → client: response to list_tasks, claim_task, complete_task. */
@@ -539,6 +584,7 @@ export interface WSTaskListMessage {
tags: string[];
createdAt: string;
}>;
_reqId?: string;
}
// --- Stream messages ---
@@ -578,6 +624,7 @@ export interface WSStreamCreatedMessage {
type: "stream_created";
id: string;
name: string;
_reqId?: string;
}
/** Broker → client: real-time data pushed from a stream. */
@@ -588,6 +635,13 @@ export interface WSStreamDataMessage {
publishedBy: string;
}
/** Broker → client: confirmation that a stream subscription was registered. */
export interface WSSubscribedMessage {
type: "subscribed";
stream: string;
_reqId?: string;
}
/** Broker → client: response to list_streams. */
export interface WSStreamListMessage {
type: "stream_list";
@@ -598,6 +652,7 @@ export interface WSStreamListMessage {
createdAt: string;
subscriberCount: number;
}>;
_reqId?: string;
}
/** Broker → client: structured error. */
@@ -606,6 +661,7 @@ export interface WSErrorMessage {
code: string;
message: string;
id?: string;
_reqId?: string;
}
export type WSClientMessage =
@@ -627,6 +683,7 @@ export type WSClientMessage =
| WSListFilesMessage
| WSFileStatusMessage
| WSDeleteFileMessage
| WSGrantFileAccessMessage
| WSShareContextMessage
| WSGetContextMessage
| WSListContextsMessage
@@ -664,11 +721,13 @@ export type WSServerMessage =
| WSFileUrlMessage
| WSFileListMessage
| WSFileStatusResultMessage
| WSGrantFileAccessOkMessage
| WSContextSharedMessage
| WSContextResultsMessage
| WSContextListMessage
| WSTaskCreatedMessage
| WSTaskListMessage
| WSVectorStoredMessage
| WSVectorResultsMessage
| WSCollectionListMessage
| WSGraphResultMessage
@@ -676,6 +735,7 @@ export type WSServerMessage =
| WSMeshSchemaResultMessage
| WSStreamCreatedMessage
| WSStreamDataMessage
| WSSubscribedMessage
| WSStreamListMessage
| WSMeshInfoResultMessage
| WSErrorMessage;

View File

@@ -1,6 +1,6 @@
{
"name": "claudemesh-cli",
"version": "0.5.3",
"version": "0.6.6",
"description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.",
"keywords": [
"claude-code",
@@ -47,6 +47,7 @@
},
"dependencies": {
"@modelcontextprotocol/sdk": "1.27.1",
"citty": "0.2.2",
"libsodium-wrappers": "0.7.15",
"ws": "8.20.0",
"zod": "4.1.13"

View File

@@ -1,9 +1,12 @@
/**
* `claudemesh launch` — spawn `claude` with peer mesh identity.
*
* Flags are defined in index.ts (citty command) — that is the source of
* truth. This file receives already-parsed flags and rawArgs.
*
* Flow:
* 1. Parse --name, --join, --mesh, --quiet flags
* 2. If --join: run join flow first (accepts token or URL)
* 1. Receive parsed flags from citty + rawArgs for -- passthrough
* 2. If --join: run join flow first
* 3. Load config → pick mesh (auto if 1, interactive picker if >1)
* 4. Write per-session config to tmpdir (isolates mesh selection)
* 5. Spawn claude with CLAUDEMESH_CONFIG_DIR + CLAUDEMESH_DISPLAY_NAME
@@ -18,73 +21,17 @@ import { createInterface } from "node:readline";
import { loadConfig, getConfigPath } from "../state/config";
import type { Config, JoinedMesh, GroupEntry } from "../state/config";
// --- Arg parsing ---
interface LaunchArgs {
name: string | null;
role: string | null;
groups: string | null; // comma-separated, e.g. "frontend:lead,reviewers:member"
joinLink: string | null;
meshSlug: string | null;
messageMode: "push" | "inbox" | "off" | null;
quiet: boolean;
skipPermConfirm: boolean;
claudeArgs: string[];
}
function parseArgs(argv: string[]): LaunchArgs {
const result: LaunchArgs = {
name: null,
role: null,
groups: null,
joinLink: null,
meshSlug: null,
messageMode: null,
quiet: false,
skipPermConfirm: false,
claudeArgs: [],
};
let i = 0;
while (i < argv.length) {
const arg = argv[i]!;
if (arg === "--name" && i + 1 < argv.length) {
result.name = argv[++i]!;
} else if (arg.startsWith("--name=")) {
result.name = arg.slice("--name=".length);
} else if (arg === "--role" && i + 1 < argv.length) {
result.role = argv[++i]!;
} else if (arg.startsWith("--role=")) {
result.role = arg.slice("--role=".length);
} else if (arg === "--groups" && i + 1 < argv.length) {
result.groups = argv[++i]!;
} else if (arg.startsWith("--groups=")) {
result.groups = arg.slice("--groups=".length);
} else if (arg === "--join" && i + 1 < argv.length) {
result.joinLink = argv[++i]!;
} else if (arg.startsWith("--join=")) {
result.joinLink = arg.slice("--join=".length);
} else if (arg === "--mesh" && i + 1 < argv.length) {
result.meshSlug = argv[++i]!;
} else if (arg.startsWith("--mesh=")) {
result.meshSlug = arg.slice("--mesh=".length);
} else if (arg === "--inbox") {
result.messageMode = "inbox";
} else if (arg === "--no-messages") {
result.messageMode = "off";
} else if (arg === "--quiet") {
result.quiet = true;
} else if (arg === "-y" || arg === "--yes") {
result.skipPermConfirm = true;
} else if (arg === "--") {
result.claudeArgs.push(...argv.slice(i + 1));
break;
} else {
result.claudeArgs.push(arg);
}
i++;
}
return result;
// Flags as parsed by citty (index.ts is the source of truth for definitions).
export interface LaunchFlags {
name?: string;
role?: string;
groups?: string;
join?: string;
mesh?: string;
"message-mode"?: string;
"system-prompt"?: string;
yes?: boolean;
quiet?: boolean;
}
// --- Interactive mesh picker ---
@@ -206,8 +153,26 @@ function printBanner(name: string, meshSlug: string, role: string | null, groups
// --- Main ---
export async function runLaunch(extraArgs: string[]): Promise<void> {
const args = parseArgs(extraArgs);
export async function runLaunch(flags: LaunchFlags, rawArgs: string[]): Promise<void> {
// Extract args that follow "--" — passed straight through to claude.
const dashIdx = rawArgs.indexOf("--");
const claudePassthrough = dashIdx >= 0 ? rawArgs.slice(dashIdx + 1) : [];
// Normalise flags into the internal shape used below.
const args = {
name: flags.name ?? null,
role: flags.role ?? null,
groups: flags.groups ?? null,
joinLink: flags.join ?? null,
meshSlug: flags.mesh ?? null,
messageMode: (["push", "inbox", "off"].includes(flags["message-mode"] ?? "")
? flags["message-mode"] as "push" | "inbox" | "off"
: null),
systemPrompt: flags["system-prompt"] ?? null,
quiet: flags.quiet ?? false,
skipPermConfirm: flags.yes ?? false,
claudeArgs: claudePassthrough,
};
// 1. If --join, run join flow first.
if (args.joinLink) {
@@ -318,6 +283,7 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
version: 1,
meshes: [mesh],
displayName,
...(role ? { role } : {}),
...(parsedGroups.length > 0 ? { groups: parsedGroups } : {}),
messageMode,
};
@@ -351,6 +317,7 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
"--dangerously-load-development-channels",
"server:claudemesh",
"--dangerously-skip-permissions",
...(args.systemPrompt ? ["--system-prompt", args.systemPrompt] : []),
...filtered,
];
@@ -362,6 +329,7 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
...process.env,
CLAUDEMESH_CONFIG_DIR: tmpDir,
CLAUDEMESH_DISPLAY_NAME: displayName,
...(role ? { CLAUDEMESH_ROLE: role } : {}),
},
});

View File

@@ -0,0 +1,90 @@
/**
* File encryption for claudemesh E2E file sharing.
*
* Symmetric: crypto_secretbox_easy with random Kf (32-byte key).
* Key wrapping: crypto_box_seal to recipient's X25519 pub (converted from ed25519).
* Key opening: crypto_box_seal_open with own X25519 keypair.
*/
import { ensureSodium } from "./keypair";
export interface EncryptedFile {
ciphertext: Uint8Array; // secretbox ciphertext (includes MAC)
nonce: string; // base64 24-byte nonce
key: Uint8Array; // 32-byte symmetric Kf (keep in memory only)
}
/**
* Encrypt file bytes with a fresh random symmetric key.
* Returns ciphertext, nonce (base64), and the plaintext Kf.
*/
export async function encryptFile(plaintext: Uint8Array): Promise<EncryptedFile> {
const sodium = await ensureSodium();
const key = sodium.randombytes_buf(sodium.crypto_secretbox_KEYBYTES);
const nonce = sodium.randombytes_buf(sodium.crypto_secretbox_NONCEBYTES);
const ciphertext = sodium.crypto_secretbox_easy(plaintext, nonce, key);
return {
ciphertext,
nonce: sodium.to_base64(nonce, sodium.base64_variants.ORIGINAL),
key,
};
}
/**
* Decrypt file bytes with the symmetric key Kf.
* Returns null if decryption fails.
*/
export async function decryptFile(
ciphertext: Uint8Array,
nonceB64: string,
key: Uint8Array,
): Promise<Uint8Array | null> {
const sodium = await ensureSodium();
try {
const nonce = sodium.from_base64(nonceB64, sodium.base64_variants.ORIGINAL);
return sodium.crypto_secretbox_open_easy(ciphertext, nonce, key);
} catch {
return null;
}
}
/**
* Seal Kf for a recipient using crypto_box_seal (ephemeral sender key).
* recipientPubkeyHex: ed25519 pubkey of recipient (64 hex chars).
* Returns base64 sealed box.
*/
export async function sealKeyForPeer(
kf: Uint8Array,
recipientPubkeyHex: string,
): Promise<string> {
const sodium = await ensureSodium();
const recipientCurve = sodium.crypto_sign_ed25519_pk_to_curve25519(
sodium.from_hex(recipientPubkeyHex),
);
const sealed = sodium.crypto_box_seal(kf, recipientCurve);
return sodium.to_base64(sealed, sodium.base64_variants.ORIGINAL);
}
/**
* Open a sealed key blob using own ed25519 keypair (converted to X25519).
* Returns the 32-byte Kf or null if decryption fails.
*/
export async function openSealedKey(
sealedB64: string,
myPubkeyHex: string,
mySecretKeyHex: string,
): Promise<Uint8Array | null> {
const sodium = await ensureSodium();
try {
const myCurvePub = sodium.crypto_sign_ed25519_pk_to_curve25519(
sodium.from_hex(myPubkeyHex),
);
const myCurveSec = sodium.crypto_sign_ed25519_sk_to_curve25519(
sodium.from_hex(mySecretKeyHex),
);
const sealed = sodium.from_base64(sealedB64, sodium.base64_variants.ORIGINAL);
return sodium.crypto_box_seal_open(sealed, myCurvePub, myCurveSec);
} catch {
return null;
}
}

View File

@@ -1,13 +1,15 @@
/**
* claudemesh-cli entry point.
*
* Uses citty to define commands and flags. --help is generated from
* the command definitions — the flag list here IS the documentation.
*
* Dispatches between two modes:
* - `claudemesh mcp` → MCP server (stdio transport)
* - `claudemesh <subcommand>` → CLI subcommand
*
* Claude Code invokes the `mcp` mode via stdio. Humans use all others.
*/
import { defineCommand, runMain } from "citty";
import { startMcpServer } from "./mcp/server";
import { runInstall, runUninstall } from "./commands/install";
import { runJoin } from "./commands/join";
@@ -21,96 +23,152 @@ import { runDoctor } from "./commands/doctor";
import { runWelcome } from "./commands/welcome";
import { VERSION } from "./version";
const HELP = `claudemesh v${VERSION} — peer mesh for Claude Code sessions
Usage:
claudemesh <command> [args]
Commands:
install Register MCP + Stop/UserPromptSubmit status hooks
(add --no-hooks for bare MCP registration)
uninstall Remove MCP server + hooks
launch [opts] Launch Claude Code with real-time push messages
--name <name> Display name for this session
--mesh <slug> Select mesh (picker if >1, omitted)
--join <url> Join a mesh before launching
--quiet Skip the info banner
-- <args> Pass remaining args to claude
join <url> Join a mesh via https://claudemesh.com/join/... URL
list Show all joined meshes
leave <slug> Leave a joined mesh
status Health report: broker reachability per joined mesh
doctor Diagnostic checks (install, config, keypairs, PATH)
seed-test-mesh Dev-only: inject a mesh into config (skips invite flow)
mcp Start MCP server (stdio) — invoked by Claude Code
--help, -h Show this help
--version, -v Show the CLI version
Environment:
CLAUDEMESH_BROKER_URL Override broker URL (default: wss://ic.claudemesh.com/ws)
CLAUDEMESH_CONFIG_DIR Override config directory (default: ~/.claudemesh/)
CLAUDEMESH_DEBUG=1 Verbose logging
`;
const cmd = process.argv[2];
const args = process.argv.slice(3);
async function main(): Promise<void> {
switch (cmd) {
case "mcp":
await startMcpServer();
return;
case "install":
runInstall(args);
return;
case "uninstall":
runUninstall();
return;
case "hook":
await runHook(args);
return;
case "launch":
await runLaunch(args);
return;
case "join":
await runJoin(args);
return;
case "list":
runList();
return;
case "leave":
runLeave(args);
return;
case "status":
await runStatus();
return;
case "doctor":
await runDoctor();
return;
case "seed-test-mesh":
runSeedTestMesh(args);
return;
case "--version":
case "-v":
case "version":
console.log(VERSION);
return;
case "--help":
case "-h":
case "help":
console.log(HELP);
return;
case undefined:
runWelcome();
return;
default:
console.error(`Unknown command: ${cmd}`);
console.error("Run `claudemesh --help` for usage.");
process.exit(1);
}
}
main().catch((e) => {
console.error(`claudemesh: ${e instanceof Error ? e.message : String(e)}`);
process.exit(1);
const launch = defineCommand({
meta: {
name: "launch",
description: "Launch Claude Code connected to a mesh with real-time peer messaging",
},
args: {
name: {
type: "string",
description: "Display name for this session",
},
role: {
type: "string",
description: "Role tag (dev, lead, analyst — free-form)",
},
groups: {
type: "string",
description: 'Groups to join: "group:role,group2" — colon sets role. Hierarchy via slash: "eng/frontend:lead"',
},
mesh: {
type: "string",
description: "Select mesh by slug (interactive picker if omitted and >1 joined)",
},
join: {
type: "string",
description: "Join a mesh via invite URL before launching",
},
"message-mode": {
type: "string",
description: "push (default) | inbox | off — controls how peer messages are delivered",
},
"system-prompt": {
type: "string",
description: "Set Claude's system prompt for this session",
},
yes: {
type: "boolean",
alias: "y",
description: "Skip permission confirmation",
default: false,
},
quiet: {
type: "boolean",
description: "Skip banner and all interactive prompts",
default: false,
},
},
run({ args, rawArgs }) {
// Forward to the existing launch runner, preserving -- passthrough to claude.
return runLaunch(args, rawArgs);
},
});
const install = defineCommand({
meta: {
name: "install",
description: "Register MCP server + status hooks with Claude Code",
},
args: {
"no-hooks": {
type: "boolean",
description: "Register MCP server only, skip hooks",
default: false,
},
},
run({ rawArgs }) {
runInstall(rawArgs);
},
});
const join = defineCommand({
meta: {
name: "join",
description: "Join a mesh via invite URL",
},
args: {
url: {
type: "positional",
description: "Invite URL (https://claudemesh.com/join/...)",
required: true,
},
},
run({ args }) {
return runJoin([args.url]);
},
});
const leave = defineCommand({
meta: {
name: "leave",
description: "Leave a joined mesh",
},
args: {
slug: {
type: "positional",
description: "Mesh slug to leave",
required: true,
},
},
run({ args }) {
runLeave([args.slug]);
},
});
const main = defineCommand({
meta: {
name: "claudemesh",
version: VERSION,
description: "Peer mesh for Claude Code sessions",
},
subCommands: {
launch,
install,
uninstall: defineCommand({
meta: { name: "uninstall", description: "Remove MCP server and hooks" },
run() { runUninstall(); },
}),
join,
list: defineCommand({
meta: { name: "list", description: "Show joined meshes and identities" },
run() { runList(); },
}),
leave,
status: defineCommand({
meta: { name: "status", description: "Check broker reachability for each joined mesh" },
async run() { await runStatus(); },
}),
doctor: defineCommand({
meta: { name: "doctor", description: "Diagnose install, config, keypairs, and PATH" },
async run() { await runDoctor(); },
}),
mcp: defineCommand({
meta: { name: "mcp", description: "Start MCP server (stdio — invoked by Claude Code, not users)" },
async run() { await startMcpServer(); },
}),
"seed-test-mesh": defineCommand({
meta: { name: "seed-test-mesh", description: "Dev only: inject a mesh into config (skips invite flow)" },
run({ rawArgs }) { runSeedTestMesh(rawArgs); },
}),
hook: defineCommand({
meta: { name: "hook", description: "Internal hook handler (invoked by Claude Code hooks)" },
async run({ rawArgs }) { await runHook(rawArgs); },
}),
},
run() {
runWelcome();
},
});
runMain(main);

View File

@@ -130,6 +130,7 @@ export async function startMcpServer(): Promise<void> {
const config = loadConfig();
const myName = config.displayName ?? "unnamed";
const myRole = config.role ?? process.env.CLAUDEMESH_ROLE ?? null;
const myGroups = (config.groups ?? []).map(g => `@${g.name}${g.role ? ':' + g.role : ''}`).join(', ') || "none";
const messageMode = config.messageMode ?? "push";
@@ -141,7 +142,7 @@ export async function startMcpServer(): Promise<void> {
tools: {},
},
instructions: `## Identity
You are "${myName}" — a peer in the claudemesh network. Your groups: ${myGroups}. You are one of several Claude Code sessions connected to the same mesh. No orchestrator exists — peers are equals. Your identity comes from your name and group roles, not from a central authority.
You are "${myName}"${myRole ? ` (${myRole})` : ""} — a peer in the claudemesh network. Your groups: ${myGroups}. You are one of several Claude Code sessions connected to the same mesh. No orchestrator exists — peers are equals. Your identity comes from your name and group roles, not from a central authority.
## Responding to messages
When you receive a <channel source="claudemesh" ...> message, RESPOND IMMEDIATELY. Pause your current task, reply via send_message, then resume. Read from_name, mesh_slug, and priority from the channel attributes. Reply by setting \`to\` to the sender's from_name (display name). Stay in character per your system prompt. Do not ignore low-priority messages — acknowledge them briefly even if you defer action.
@@ -326,9 +327,15 @@ Your message mode is "${messageMode}".
case "message_status": {
const { id } = (args ?? {}) as { id?: string };
if (!id) return text("message_status: `id` required", true);
const client = allClients()[0];
if (!client) return text("message_status: not connected", true);
const result = await client.messageStatus(id);
const clients = allClients();
if (!clients.length) return text("message_status: not connected", true);
// Try each connected mesh client — we don't know which mesh the
// messageId belongs to, so query all and return the first hit.
let result = null;
for (const c of clients) {
result = await c.messageStatus(id);
if (result) break;
}
if (!result) return text(`Message ${id} not found or timed out.`);
const recipientLines = result.recipients.map(
(r: { name: string; pubkey: string; status: string }) =>
@@ -439,17 +446,83 @@ Your message mode is "${messageMode}".
// --- Files ---
case "share_file": {
const { path: filePath, name: fileName, tags } = (args ?? {}) as { path?: string; name?: string; tags?: string[] };
const { path: filePath, name: fileName, tags, to: fileTo } = (args ?? {}) as { path?: string; name?: string; tags?: string[]; to?: string };
if (!filePath) return text("share_file: `path` required", true);
const { existsSync } = await import("node:fs");
if (!existsSync(filePath)) return text(`share_file: file not found: ${filePath}`, true);
const client = allClients()[0];
if (!client) return text("share_file: not connected", true);
const fileId = await client.uploadFile(filePath, client.meshId, client.meshSlug, {
name: fileName, tags, persistent: true,
});
if (!fileId) return text("share_file: upload failed", true);
return text(`Shared: ${fileName ?? filePath} (${fileId})`);
// If 'to' specified, do E2E encryption
if (fileTo) {
const { encryptFile, sealKeyForPeer } = await import("../crypto/file-crypto");
const { readFileSync, writeFileSync, mkdtempSync, unlinkSync, rmdirSync } = await import("node:fs");
const { tmpdir } = await import("node:os");
const { join, basename } = await import("node:path");
// Resolve target peer pubkey
const peers = await client.listPeers();
const targetPeer = peers.find(p => p.pubkey === fileTo || p.displayName === fileTo);
if (!targetPeer) {
return text(`share_file: peer not found: ${fileTo}`, true);
}
// Read and encrypt file
const plaintext = readFileSync(filePath);
const { ciphertext, nonce, key } = await encryptFile(new Uint8Array(plaintext));
// Seal Kf for target peer
const sealedForTarget = await sealKeyForPeer(key, targetPeer.pubkey);
// Seal Kf for ourselves (owner)
const myPubkey = client.getSessionPubkey();
const sealedForSelf = myPubkey ? await sealKeyForPeer(key, myPubkey) : null;
const fileKeys = [
{ peerPubkey: targetPeer.pubkey, sealedKey: sealedForTarget },
...(sealedForSelf && myPubkey ? [{ peerPubkey: myPubkey, sealedKey: sealedForSelf }] : []),
];
// Build combined buffer: nonce (24 bytes) + ciphertext
const { ensureSodium } = await import("../crypto/keypair");
const sodium = await ensureSodium();
const nonceBytes = sodium.from_base64(nonce, sodium.base64_variants.ORIGINAL);
const combined = new Uint8Array(nonceBytes.length + ciphertext.length);
combined.set(nonceBytes, 0);
combined.set(ciphertext, nonceBytes.length);
const baseName = fileName ?? basename(filePath);
const tmpDir = mkdtempSync(join(tmpdir(), "cm-"));
const tmpPath = join(tmpDir, baseName);
writeFileSync(tmpPath, combined);
try {
const fileId = await client.uploadFile(tmpPath, client.meshId, client.meshSlug, {
name: baseName,
tags,
persistent: true,
encrypted: true,
ownerPubkey: myPubkey ?? undefined,
fileKeys,
});
return text(`Shared (E2E encrypted): ${baseName}${targetPeer.displayName} (${fileId})`);
} catch (e) {
return text(`share_file: upload failed — ${e instanceof Error ? e.message : String(e)}`, true);
} finally {
try { unlinkSync(tmpPath); } catch { /* ignore */ }
try { rmdirSync(tmpDir); } catch { /* ignore */ }
}
}
// Plain (unencrypted) upload — existing code
try {
const fileId = await client.uploadFile(filePath, client.meshId, client.meshSlug, {
name: fileName, tags, persistent: true,
});
return text(`Shared: ${fileName ?? filePath} (${fileId})`);
} catch (e) {
return text(`share_file: upload failed — ${e instanceof Error ? e.message : String(e)}`, true);
}
}
case "get_file": {
@@ -459,6 +532,43 @@ Your message mode is "${messageMode}".
if (!client) return text("get_file: not connected", true);
const result = await client.getFile(id);
if (!result) return text(`get_file: file ${id} not found`, true);
if (result.encrypted) {
if (!result.sealedKey) return text("get_file: encrypted file — no decryption key available for your session", true);
const { openSealedKey, decryptFile } = await import("../crypto/file-crypto");
const { ensureSodium } = await import("../crypto/keypair");
const myPubkey = client.getSessionPubkey();
const mySecret = client.getSessionSecretKey();
if (!myPubkey || !mySecret) {
return text("get_file: no session keypair — cannot decrypt", true);
}
const kf = await openSealedKey(result.sealedKey, myPubkey, mySecret);
if (!kf) return text("get_file: failed to open sealed key", true);
// Download file bytes from presigned URL
const resp = await fetch(result.url, { signal: AbortSignal.timeout(30_000) });
if (!resp.ok) return text(`get_file: download failed (${resp.status})`, true);
const buf = new Uint8Array(await resp.arrayBuffer());
// Wire format: first 24 bytes = nonce, rest = ciphertext
const sodium = await ensureSodium();
const NONCE_BYTES = sodium.crypto_secretbox_NONCEBYTES; // 24
const nonce = sodium.to_base64(buf.slice(0, NONCE_BYTES), sodium.base64_variants.ORIGINAL);
const ciphertext = buf.slice(NONCE_BYTES);
const plaintext = await decryptFile(ciphertext, nonce, kf);
if (!plaintext) return text("get_file: decryption failed", true);
const { writeFileSync, mkdirSync } = await import("node:fs");
const { dirname } = await import("node:path");
mkdirSync(dirname(save_to), { recursive: true });
writeFileSync(save_to, plaintext);
return text(`Downloaded and decrypted: ${result.name}${save_to}`);
}
// Unencrypted — existing download logic
const res = await fetch(result.url, { signal: AbortSignal.timeout(30_000) });
if (!res.ok) return text(`get_file: download failed (${res.status})`, true);
const { writeFileSync, mkdirSync } = await import("node:fs");
@@ -707,6 +817,86 @@ Your message mode is "${messageMode}".
return text(lines.join("\n"));
}
case "ping_mesh": {
const { priorities: pingPriorities } = (args ?? {}) as { priorities?: string[] };
const toTest = (pingPriorities ?? ["now", "next"]) as Priority[];
const client = allClients()[0];
if (!client) return text("ping_mesh: not connected", true);
const results: string[] = [];
// Diagnostics: connection state
results.push(`WS status: ${client.status}`);
results.push(`Mesh: ${client.meshSlug}`);
// Check own peer status (explains priority gating)
const peers = await client.listPeers();
const selfPeer = peers.find(p => p.displayName === myName);
results.push(`Your status: ${selfPeer?.status ?? "not found in peer list"}`);
results.push(`Peers online: ${peers.length}`);
results.push(`Push buffer: ${client.pushHistory.length} buffered`);
// Test send→ack latency per priority (doesn't need round-trip)
for (const prio of toTest) {
const sendTime = Date.now();
// Send to a peer if one exists, otherwise broadcast
const target = peers.find(p => p.displayName !== myName);
const sendResult = await client.send(
target?.pubkey ?? "*",
`__ping__ ${prio} from ${myName} at ${new Date().toISOString()}`,
prio,
);
const ackTime = Date.now();
if (!sendResult.ok) {
results.push(`[${prio}] SEND FAILED: ${sendResult.error}`);
} else {
results.push(`[${prio}] send→ack: ${ackTime - sendTime}ms (msgId: ${sendResult.messageId?.slice(0, 12)})`);
if (prio !== "now" && selfPeer?.status === "working") {
results.push(` ⚠ peer status is "working" — broker holds "${prio}" until idle`);
}
}
}
// Check if notification pipeline works
results.push("");
results.push("Pipeline check:");
results.push(` onPush handlers: active`);
results.push(` messageMode: ${messageMode}`);
results.push(` server.notification: ${messageMode === "off" ? "disabled (mode=off)" : "enabled"}`);
return text(results.join("\n"));
}
case "grant_file_access": {
const { fileId, to: grantTo } = (args ?? {}) as { fileId?: string; to?: string };
if (!fileId || !grantTo) return text("grant_file_access: `fileId` and `to` required", true);
const client = allClients()[0];
if (!client) return text("grant_file_access: not connected", true);
const peers = await client.listPeers();
const targetPeer = peers.find(p => p.pubkey === grantTo || p.displayName === grantTo);
if (!targetPeer) return text(`grant_file_access: peer not found: ${grantTo}`, true);
const result = await client.getFile(fileId);
if (!result) return text("grant_file_access: file not found", true);
if (!result.encrypted) return text("grant_file_access: file is not encrypted", true);
if (!result.sealedKey) return text("grant_file_access: no key available (are you the owner?)", true);
const { openSealedKey, sealKeyForPeer } = await import("../crypto/file-crypto");
const myPubkey = client.getSessionPubkey();
const mySecret = client.getSessionSecretKey();
if (!myPubkey || !mySecret) return text("grant_file_access: no session keypair", true);
const kf = await openSealedKey(result.sealedKey, myPubkey, mySecret);
if (!kf) return text("grant_file_access: cannot decrypt your own key", true);
const sealedForPeer = await sealKeyForPeer(kf, targetPeer.pubkey);
const ok = await client.grantFileAccess(fileId, targetPeer.pubkey, sealedForPeer);
if (!ok) return text("grant_file_access: broker did not confirm", true);
return text(`Access granted: ${targetPeer.displayName} can now download file ${fileId}`);
}
default:
return text(`Unknown tool: ${name}`, true);
}
@@ -722,62 +912,56 @@ Your message mode is "${messageMode}".
// any mesh's broker connection becomes a <channel source="claudemesh">
// system reminder injected into Claude Code's context.
for (const client of allClients()) {
// Poll-based push: drain pushBuffer every 1s and emit channel notifications.
// This is the proven approach from claude-intercom. The WS onPush handler
// fires instantly but server.notification() may not flush stdio reliably
// from an async WS callback. Polling on a timer ensures consistent delivery.
if (messageMode !== "off") {
const pushPollTimer = setInterval(async () => {
const buffered = client.drainPushBuffer();
if (buffered.length > 0) {
process.stderr.write(`[claudemesh] poll: ${buffered.length} message(s) to push\n`);
}
for (const msg of buffered) {
const fromPubkey = msg.senderPubkey || "";
const fromName = fromPubkey
? await resolvePeerName(client, fromPubkey)
: "unknown";
// Event-driven push: WS onPush fires immediately when a message arrives.
// Claude Code's setNotificationHandler → enqueue → React useEffect pipeline
// processes notifications instantly (no polling needed on Claude's side).
// The old poll-based approach was an overcorrection — Claude Code source
// confirms event-driven notification processing.
client.onPush(async (msg) => {
if (messageMode === "off") return;
if (messageMode === "inbox") {
try {
await server.notification({
method: "notifications/claude/channel",
params: {
content: `[inbox] New message from ${fromName}. Use check_messages to read.`,
meta: { kind: "inbox_notification", from_name: fromName },
},
});
} catch { /* best effort */ }
continue;
}
const fromPubkey = msg.senderPubkey || "";
const fromName = fromPubkey
? await resolvePeerName(client, fromPubkey)
: "unknown";
// push mode — full content
const content = msg.plaintext ?? decryptFailedWarning(fromPubkey);
try {
await server.notification({
method: "notifications/claude/channel",
params: {
content,
meta: {
from_id: fromPubkey,
from_name: fromName,
mesh_slug: client.meshSlug,
mesh_id: client.meshId,
priority: msg.priority,
sent_at: msg.createdAt,
delivered_at: msg.receivedAt,
kind: msg.kind,
},
},
});
process.stderr.write(`[claudemesh] pushed: from=${fromName} content=${content.slice(0, 60)}\n`);
} catch (pushErr) {
process.stderr.write(`[claudemesh] push FAILED: ${pushErr}\n`);
}
}
}, 1_000);
pushPollTimer.unref();
}
if (messageMode === "inbox") {
try {
await server.notification({
method: "notifications/claude/channel",
params: {
content: `[inbox] New message from ${fromName}. Use check_messages to read.`,
meta: { kind: "inbox_notification", from_name: fromName },
},
});
} catch { /* best effort */ }
return;
}
// push mode — full content
const content = msg.plaintext ?? decryptFailedWarning(fromPubkey);
try {
await server.notification({
method: "notifications/claude/channel",
params: {
content,
meta: {
from_id: fromPubkey,
from_name: fromName,
mesh_slug: client.meshSlug,
mesh_id: client.meshId,
priority: msg.priority,
sent_at: msg.createdAt,
delivered_at: msg.receivedAt,
kind: msg.kind,
},
},
});
process.stderr.write(`[claudemesh] pushed: from=${fromName} content=${content.slice(0, 60)}\n`);
} catch (pushErr) {
process.stderr.write(`[claudemesh] push FAILED: ${pushErr}\n`);
}
});
client.onStreamData(async (evt) => {
try {
@@ -812,7 +996,42 @@ Your message mode is "${messageMode}".
});
}
// Welcome notification: give Claude immediate context on connect.
// Triggers Claude to call mesh_info/list_peers without user input.
setTimeout(async () => {
const client = allClients()[0];
if (!client || client.status !== "open") return;
try {
const peers = await client.listPeers();
const peerNames = peers
.filter(p => p.displayName !== myName)
.map(p => p.displayName)
.join(", ") || "none";
await server.notification({
method: "notifications/claude/channel",
params: {
content: `[system] Connected as ${myName} to mesh ${client.meshSlug}. ${peers.length} peer(s) online: ${peerNames}. Call mesh_info for full details or set_summary to announce yourself.`,
meta: { kind: "welcome", mesh_slug: client.meshSlug },
},
});
} catch { /* best effort */ }
}, 3_000); // 3s delay: let WS connect + hello_ack complete first
// Event loop keepalive: Node.js stdout to a pipe is buffered. Without
// periodic event loop activity, stdout.write() from WS callbacks may not
// flush until the next I/O event. This 1s interval keeps the event loop
// ticking so channel notifications flush promptly — same pattern that made
// claude-intercom's push delivery reliable (its 1s HTTP poll had this
// effect as a side effect). The interval does nothing except prevent the
// event loop from settling.
const keepalive = setInterval(() => {
// Intentionally empty — the interval itself keeps the event loop active.
// Do NOT call .unref() — that would defeat the purpose.
}, 1_000);
void keepalive; // suppress unused warning
const shutdown = (): void => {
clearInterval(keepalive);
stopAll();
process.exit(0);
};

View File

@@ -203,7 +203,7 @@ export const TOOLS: Tool[] = [
{
name: "share_file",
description:
"Share a persistent file with the mesh. All current and future peers can access it.",
"Share a persistent file with the mesh. All current and future peers can access it. If `to` is specified, the file is E2E encrypted and only accessible to that peer (and you).",
inputSchema: {
type: "object",
properties: {
@@ -217,6 +217,10 @@ export const TOOLS: Tool[] = [
items: { type: "string" },
description: "Tags for categorization",
},
to: {
type: "string",
description: "Peer display name or pubkey hex — if set, file is E2E encrypted for this peer only",
},
},
required: ["path"],
},
@@ -269,6 +273,18 @@ export const TOOLS: Tool[] = [
required: ["id"],
},
},
{
name: "grant_file_access",
description: "Grant a peer access to an E2E encrypted file you shared. You must be the owner.",
inputSchema: {
type: "object",
properties: {
fileId: { type: "string", description: "File ID" },
to: { type: "string", description: "Peer display name or pubkey hex to grant access to" },
},
required: ["fileId", "to"],
},
},
// --- Vector tools ---
{
@@ -555,4 +571,21 @@ export const TOOLS: Tool[] = [
"Get a complete overview of the mesh: peers, groups, state, memory, files, tasks, streams, tables. Call on session start for full situational awareness.",
inputSchema: { type: "object", properties: {} },
},
// --- Diagnostics ---
{
name: "ping_mesh",
description:
"Send test messages through the full pipeline and measure round-trip timing per priority. Diagnoses push delivery issues.",
inputSchema: {
type: "object",
properties: {
priorities: {
type: "array",
items: { type: "string", enum: ["now", "next", "low"] },
description: "Priorities to test (default: [\"now\", \"next\"])",
},
},
},
},
];

View File

@@ -37,6 +37,7 @@ export interface Config {
version: 1;
meshes: JoinedMesh[];
displayName?: string; // per-session override, written by `claudemesh launch --name`
role?: string; // per-session role tag (display + hello)
groups?: GroupEntry[];
messageMode?: "push" | "inbox" | "off";
}
@@ -54,7 +55,7 @@ export function loadConfig(): Config {
if (!parsed || !Array.isArray(parsed.meshes)) {
return { version: 1, meshes: [] };
}
return { version: 1, meshes: parsed.meshes, displayName: parsed.displayName, groups: parsed.groups, messageMode: parsed.messageMode };
return { version: 1, meshes: parsed.meshes, displayName: parsed.displayName, role: parsed.role, groups: parsed.groups, messageMode: parsed.messageMode };
} catch (e) {
throw new Error(
`Failed to load ${CONFIG_PATH}: ${e instanceof Error ? e.message : String(e)}`,

View File

@@ -75,14 +75,15 @@ export class BrokerClient {
private outbound: Array<() => void> = []; // closures that send once ws is open
private pushHandlers = new Set<PushHandler>();
private pushBuffer: InboundPush[] = [];
private listPeersResolvers: Array<(peers: PeerInfo[]) => void> = [];
private stateResolvers: Array<(result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void> = [];
private stateListResolvers: Array<(entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void> = [];
private memoryStoreResolvers: Array<(id: string | null) => void> = [];
private memoryRecallResolvers: Array<(memories: Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) => void> = [];
private listPeersResolvers = new Map<string, { resolve: (peers: PeerInfo[]) => void; timer: NodeJS.Timeout }>();
private stateResolvers = new Map<string, { resolve: (result: { key: string; value: unknown; updatedBy: string; updatedAt: string } | null) => void; timer: NodeJS.Timeout }>();
private stateListResolvers = new Map<string, { resolve: (entries: Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private memoryStoreResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private memoryRecallResolvers = new Map<string, { resolve: (memories: Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) => void; timer: NodeJS.Timeout }>();
private stateChangeHandlers = new Set<(change: { key: string; value: unknown; updatedBy: string }) => void>();
private sessionPubkey: string | null = null;
private sessionSecretKey: string | null = null;
private grantFileAccessResolvers = new Map<string, { resolve: (ok: boolean) => void; timer: NodeJS.Timeout }>();
private closed = false;
private reconnectAttempt = 0;
private helloTimer: NodeJS.Timeout | null = null;
@@ -110,6 +111,15 @@ export class BrokerClient {
return this.pushBuffer;
}
/** Session public key hex (null before first connection). */
getSessionPubkey(): string | null { return this.sessionPubkey; }
/** Session secret key hex (null before first connection). */
getSessionSecretKey(): string | null { return this.sessionSecretKey; }
private makeReqId(): string {
return Math.random().toString(36).slice(2) + Date.now().toString(36);
}
/** Open WS, send hello, resolve when hello_ack received. */
async connect(): Promise<void> {
if (this.closed) throw new Error("client is closed");
@@ -299,16 +309,11 @@ export class BrokerClient {
async listPeers(): Promise<PeerInfo[]> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.listPeersResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_peers" }));
// Timeout after 5s — return empty list rather than hang.
setTimeout(() => {
const idx = this.listPeersResolvers.indexOf(resolve);
if (idx !== -1) {
this.listPeersResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.listPeersResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.listPeersResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_peers", _reqId: reqId }));
});
}
@@ -342,15 +347,11 @@ export class BrokerClient {
async getState(key: string): Promise<{ key: string; value: unknown; updatedBy: string; updatedAt: string } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.stateResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "get_state", key }));
setTimeout(() => {
const idx = this.stateResolvers.indexOf(resolve);
if (idx !== -1) {
this.stateResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
const reqId = this.makeReqId();
this.stateResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.stateResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "get_state", key, _reqId: reqId }));
});
}
@@ -358,15 +359,11 @@ export class BrokerClient {
async listState(): Promise<Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.stateListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_state" }));
setTimeout(() => {
const idx = this.stateListResolvers.indexOf(resolve);
if (idx !== -1) {
this.stateListResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.stateListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.stateListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_state", _reqId: reqId }));
});
}
@@ -376,15 +373,11 @@ export class BrokerClient {
async remember(content: string, tags?: string[]): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.memoryStoreResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "remember", content, tags }));
setTimeout(() => {
const idx = this.memoryStoreResolvers.indexOf(resolve);
if (idx !== -1) {
this.memoryStoreResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
const reqId = this.makeReqId();
this.memoryStoreResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.memoryStoreResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "remember", content, tags, _reqId: reqId }));
});
}
@@ -392,15 +385,11 @@ export class BrokerClient {
async recall(query: string): Promise<Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.memoryRecallResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "recall", query }));
setTimeout(() => {
const idx = this.memoryRecallResolvers.indexOf(resolve);
if (idx !== -1) {
this.memoryRecallResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.memoryRecallResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.memoryRecallResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "recall", query, _reqId: reqId }));
});
}
@@ -411,51 +400,46 @@ export class BrokerClient {
}
/** Check delivery status of a sent message. */
private messageStatusResolvers: Array<(result: { messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null) => void> = [];
private fileUrlResolvers: Array<(result: { url: string; name: string } | null) => void> = [];
private fileListResolvers: Array<(files: Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) => void> = [];
private fileStatusResolvers: Array<(accesses: Array<{ peerName: string; accessedAt: string }>) => void> = [];
private vectorStoredResolvers: Array<(id: string | null) => void> = [];
private vectorResultsResolvers: Array<(results: Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) => void> = [];
private collectionListResolvers: Array<(collections: string[]) => void> = [];
private graphResultResolvers: Array<(rows: Array<Record<string, unknown>>) => void> = [];
private contextListResolvers: Array<(contexts: Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) => void> = [];
private contextResultsResolvers: Array<(contexts: Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) => void> = [];
private taskCreatedResolvers: Array<(id: string | null) => void> = [];
private taskListResolvers: Array<(tasks: Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) => void> = [];
private meshQueryResolvers: Array<(result: { columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null) => void> = [];
private meshSchemaResolvers: Array<(tables: Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) => void> = [];
private streamCreatedResolvers: Array<(id: string | null) => void> = [];
private streamListResolvers: Array<(streams: Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) => void> = [];
private messageStatusResolvers = new Map<string, { resolve: (result: { messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null) => void; timer: NodeJS.Timeout }>();
private fileUrlResolvers = new Map<string, { resolve: (result: { url: string; name: string; encrypted?: boolean; sealedKey?: string } | null) => void; timer: NodeJS.Timeout }>();
private fileListResolvers = new Map<string, { resolve: (files: Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) => void; timer: NodeJS.Timeout }>();
private fileStatusResolvers = new Map<string, { resolve: (accesses: Array<{ peerName: string; accessedAt: string }>) => void; timer: NodeJS.Timeout }>();
private vectorStoredResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private vectorResultsResolvers = new Map<string, { resolve: (results: Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) => void; timer: NodeJS.Timeout }>();
private collectionListResolvers = new Map<string, { resolve: (collections: string[]) => void; timer: NodeJS.Timeout }>();
private graphResultResolvers = new Map<string, { resolve: (rows: Array<Record<string, unknown>>) => void; timer: NodeJS.Timeout }>();
private contextListResolvers = new Map<string, { resolve: (contexts: Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private contextResultsResolvers = new Map<string, { resolve: (contexts: Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) => void; timer: NodeJS.Timeout }>();
private taskCreatedResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private taskListResolvers = new Map<string, { resolve: (tasks: Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) => void; timer: NodeJS.Timeout }>();
private meshQueryResolvers = new Map<string, { resolve: (result: { columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null) => void; timer: NodeJS.Timeout }>();
private meshSchemaResolvers = new Map<string, { resolve: (tables: Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) => void; timer: NodeJS.Timeout }>();
private streamCreatedResolvers = new Map<string, { resolve: (id: string | null) => void; timer: NodeJS.Timeout }>();
private streamListResolvers = new Map<string, { resolve: (streams: Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) => void; timer: NodeJS.Timeout }>();
private streamDataHandlers = new Set<(data: { stream: string; data: unknown; publishedBy: string }) => void>();
async messageStatus(messageId: string): Promise<{ messageId: string; targetSpec: string; delivered: boolean; deliveredAt: string | null; recipients: Array<{ name: string; pubkey: string; status: string }> } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.messageStatusResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "message_status", messageId }));
setTimeout(() => {
const idx = this.messageStatusResolvers.indexOf(resolve);
if (idx !== -1) { this.messageStatusResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.messageStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.messageStatusResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "message_status", messageId, _reqId: reqId }));
});
}
// --- Files ---
/** Get a download URL for a shared file. */
async getFile(fileId: string): Promise<{ url: string; name: string } | null> {
async getFile(fileId: string): Promise<{ url: string; name: string; encrypted?: boolean; sealedKey?: string } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.fileUrlResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "get_file", fileId }));
setTimeout(() => {
const idx = this.fileUrlResolvers.indexOf(resolve);
if (idx !== -1) {
this.fileUrlResolvers.splice(idx, 1);
resolve(null);
}
}, 5_000);
const reqId = this.makeReqId();
this.fileUrlResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.fileUrlResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "get_file", fileId, _reqId: reqId }));
});
}
@@ -463,15 +447,11 @@ export class BrokerClient {
async listFiles(query?: string, from?: string): Promise<Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.fileListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_files", query, from }));
setTimeout(() => {
const idx = this.fileListResolvers.indexOf(resolve);
if (idx !== -1) {
this.fileListResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.fileListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.fileListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_files", query, from, _reqId: reqId }));
});
}
@@ -479,15 +459,11 @@ export class BrokerClient {
async fileStatus(fileId: string): Promise<Array<{ peerName: string; accessedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.fileStatusResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "file_status", fileId }));
setTimeout(() => {
const idx = this.fileStatusResolvers.indexOf(resolve);
if (idx !== -1) {
this.fileStatusResolvers.splice(idx, 1);
resolve([]);
}
}, 5_000);
const reqId = this.makeReqId();
this.fileStatusResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.fileStatusResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "file_status", fileId, _reqId: reqId }));
});
}
@@ -497,10 +473,11 @@ export class BrokerClient {
this.ws.send(JSON.stringify({ type: "delete_file", fileId }));
}
/** Upload a file to the broker via HTTP POST. Returns file ID or null. */
/** Upload a file to the broker via HTTP POST. Returns file ID. */
async uploadFile(filePath: string, meshId: string, memberId: string, opts: {
name?: string; tags?: string[]; persistent?: boolean; targetSpec?: string;
}): Promise<string | null> {
encrypted?: boolean; ownerPubkey?: string; fileKeys?: Array<{ peerPubkey: string; sealedKey: string }>;
}): Promise<string> {
const { readFileSync } = await import("node:fs");
const { basename } = await import("node:path");
const data = readFileSync(filePath);
@@ -522,12 +499,30 @@ export class BrokerClient {
"X-Tags": JSON.stringify(opts.tags ?? []),
"X-Persistent": String(opts.persistent ?? true),
"X-Target-Spec": opts.targetSpec ?? "",
...(opts.encrypted ? { "X-Encrypted": "true" } : {}),
...(opts.ownerPubkey ? { "X-Owner-Pubkey": opts.ownerPubkey } : {}),
...(opts.fileKeys?.length ? { "X-File-Keys": JSON.stringify(opts.fileKeys) } : {}),
},
body: data,
signal: AbortSignal.timeout(30_000),
});
const body = await res.json() as { ok?: boolean; fileId?: string };
return body.fileId ?? null;
const body = await res.json() as { ok?: boolean; fileId?: string; error?: string };
if (!res.ok || !body.fileId) {
throw new Error(body.error ?? `HTTP ${res.status}`);
}
return body.fileId;
}
/** Grant a peer access to an encrypted file (owner only). */
async grantFileAccess(fileId: string, peerPubkey: string, sealedKey: string): Promise<boolean> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return false;
return new Promise((resolve) => {
const reqId = this.makeReqId();
this.grantFileAccessResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.grantFileAccessResolvers.delete(reqId)) resolve(false);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "grant_file_access", fileId, peerPubkey, sealedKey, _reqId: reqId }));
});
}
// --- Vectors ---
@@ -536,12 +531,11 @@ export class BrokerClient {
async vectorStore(collection: string, text: string, metadata?: Record<string, unknown>): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.vectorStoredResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "vector_store", collection, text, metadata }));
setTimeout(() => {
const idx = this.vectorStoredResolvers.indexOf(resolve);
if (idx !== -1) { this.vectorStoredResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.vectorStoredResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.vectorStoredResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "vector_store", collection, text, metadata, _reqId: reqId }));
});
}
@@ -549,12 +543,11 @@ export class BrokerClient {
async vectorSearch(collection: string, query: string, limit?: number): Promise<Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.vectorResultsResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "vector_search", collection, query, limit }));
setTimeout(() => {
const idx = this.vectorResultsResolvers.indexOf(resolve);
if (idx !== -1) { this.vectorResultsResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.vectorResultsResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.vectorResultsResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "vector_search", collection, query, limit, _reqId: reqId }));
});
}
@@ -568,12 +561,11 @@ export class BrokerClient {
async listCollections(): Promise<string[]> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.collectionListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_collections" }));
setTimeout(() => {
const idx = this.collectionListResolvers.indexOf(resolve);
if (idx !== -1) { this.collectionListResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.collectionListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.collectionListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_collections", _reqId: reqId }));
});
}
@@ -583,12 +575,11 @@ export class BrokerClient {
async graphQuery(cypher: string): Promise<Array<Record<string, unknown>>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.graphResultResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "graph_query", cypher }));
setTimeout(() => {
const idx = this.graphResultResolvers.indexOf(resolve);
if (idx !== -1) { this.graphResultResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.graphResultResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.graphResultResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "graph_query", cypher, _reqId: reqId }));
});
}
@@ -596,12 +587,11 @@ export class BrokerClient {
async graphExecute(cypher: string): Promise<Array<Record<string, unknown>>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.graphResultResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "graph_execute", cypher }));
setTimeout(() => {
const idx = this.graphResultResolvers.indexOf(resolve);
if (idx !== -1) { this.graphResultResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.graphResultResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.graphResultResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "graph_execute", cypher, _reqId: reqId }));
});
}
@@ -617,12 +607,11 @@ export class BrokerClient {
async getContext(query: string): Promise<Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.contextResultsResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "get_context", query }));
setTimeout(() => {
const idx = this.contextResultsResolvers.indexOf(resolve);
if (idx !== -1) { this.contextResultsResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.contextResultsResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.contextResultsResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "get_context", query, _reqId: reqId }));
});
}
@@ -630,12 +619,11 @@ export class BrokerClient {
async listContexts(): Promise<Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.contextListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_contexts" }));
setTimeout(() => {
const idx = this.contextListResolvers.indexOf(resolve);
if (idx !== -1) { this.contextListResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.contextListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.contextListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_contexts", _reqId: reqId }));
});
}
@@ -645,37 +633,35 @@ export class BrokerClient {
async createTask(title: string, assignee?: string, priority?: string, tags?: string[]): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.taskCreatedResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "create_task", title, assignee, priority, tags }));
setTimeout(() => {
const idx = this.taskCreatedResolvers.indexOf(resolve);
if (idx !== -1) { this.taskCreatedResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.taskCreatedResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.taskCreatedResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "create_task", title, assignee, priority, tags, _reqId: reqId }));
});
}
/** Claim an unclaimed task. */
async claimTask(id: string): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "claim_task", id }));
this.ws.send(JSON.stringify({ type: "claim_task", taskId: id }));
}
/** Mark a task done with optional result. */
async completeTask(id: string, result?: string): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "complete_task", id, result }));
this.ws.send(JSON.stringify({ type: "complete_task", taskId: id, result }));
}
/** List tasks filtered by status/assignee. */
async listTasks(status?: string, assignee?: string): Promise<Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.taskListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_tasks", status, assignee }));
setTimeout(() => {
const idx = this.taskListResolvers.indexOf(resolve);
if (idx !== -1) { this.taskListResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.taskListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.taskListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_tasks", status, assignee, _reqId: reqId }));
});
}
@@ -685,12 +671,11 @@ export class BrokerClient {
async meshQuery(sql: string): Promise<{ columns: string[]; rows: Array<Record<string, unknown>>; rowCount: number } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.meshQueryResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "mesh_query", sql }));
setTimeout(() => {
const idx = this.meshQueryResolvers.indexOf(resolve);
if (idx !== -1) { this.meshQueryResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.meshQueryResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.meshQueryResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "mesh_query", sql, _reqId: reqId }));
});
}
@@ -704,12 +689,11 @@ export class BrokerClient {
async meshSchema(): Promise<Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.meshSchemaResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "mesh_schema" }));
setTimeout(() => {
const idx = this.meshSchemaResolvers.indexOf(resolve);
if (idx !== -1) { this.meshSchemaResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.meshSchemaResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.meshSchemaResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "mesh_schema", _reqId: reqId }));
});
}
@@ -719,12 +703,11 @@ export class BrokerClient {
async createStream(name: string): Promise<string | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.streamCreatedResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "create_stream", name }));
setTimeout(() => {
const idx = this.streamCreatedResolvers.indexOf(resolve);
if (idx !== -1) { this.streamCreatedResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.streamCreatedResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.streamCreatedResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "create_stream", name, _reqId: reqId }));
});
}
@@ -750,12 +733,11 @@ export class BrokerClient {
async listStreams(): Promise<Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
this.streamListResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "list_streams" }));
setTimeout(() => {
const idx = this.streamListResolvers.indexOf(resolve);
if (idx !== -1) { this.streamListResolvers.splice(idx, 1); resolve([]); }
}, 5_000);
const reqId = this.makeReqId();
this.streamListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.streamListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_streams", _reqId: reqId }));
});
}
@@ -772,17 +754,16 @@ export class BrokerClient {
}
// --- Mesh info ---
private meshInfoResolvers: Array<(result: Record<string, unknown> | null) => void> = [];
private meshInfoResolvers = new Map<string, { resolve: (result: Record<string, unknown> | null) => void; timer: NodeJS.Timeout }>();
async meshInfo(): Promise<Record<string, unknown> | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
this.meshInfoResolvers.push(resolve);
this.ws!.send(JSON.stringify({ type: "mesh_info" }));
setTimeout(() => {
const idx = this.meshInfoResolvers.indexOf(resolve);
if (idx !== -1) { this.meshInfoResolvers.splice(idx, 1); resolve(null); }
}, 5_000);
const reqId = this.makeReqId();
this.meshInfoResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.meshInfoResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "mesh_info", _reqId: reqId }));
});
}
@@ -802,7 +783,33 @@ export class BrokerClient {
// --- Internals ---
private resolveFromMap<T>(
map: Map<string, { resolve: (v: T) => void; timer: NodeJS.Timeout }>,
reqId: string | undefined,
value: T,
): boolean {
let entry = reqId ? map.get(reqId) : undefined;
if (!entry) {
// Fallback: oldest pending (FIFO, for brokers that don't echo _reqId)
const first = map.entries().next().value as [string, { resolve: (v: T) => void; timer: NodeJS.Timeout }] | undefined;
if (first) {
entry = first[1];
map.delete(first[0]);
}
} else {
map.delete(reqId!);
}
if (entry) {
clearTimeout(entry.timer);
entry.resolve(value);
return true;
}
return false;
}
private handleServerMessage(msg: Record<string, unknown>): void {
const msgReqId = msg._reqId as string | undefined;
if (msg.type === "ack") {
const pending = this.pendingSends.get(String(msg.id ?? ""));
if (pending) {
@@ -816,8 +823,7 @@ export class BrokerClient {
}
if (msg.type === "peers_list") {
const peers = (msg.peers as PeerInfo[]) ?? [];
const resolver = this.listPeersResolvers.shift();
if (resolver) resolver(peers);
this.resolveFromMap(this.listPeersResolvers, msgReqId, peers);
return;
}
if (msg.type === "push") {
@@ -890,25 +896,26 @@ export class BrokerClient {
return;
}
if (msg.type === "state_result") {
const resolver = this.stateResolvers.shift();
if (resolver) {
if (msg.key) {
resolver({
key: String(msg.key),
value: msg.value,
updatedBy: String(msg.updatedBy ?? ""),
updatedAt: String(msg.updatedAt ?? ""),
});
} else {
resolver(null);
}
// DEPENDENCY: The broker must NOT send state_result for set_state
// operations (only for get_state). If the broker sends state_result for
// both, it would be consumed here by the next pending get_state resolver,
// returning the wrong value (cross-contamination). The broker's set_state
// handler was fixed to omit state_result; only get_state sends it.
if (msg.key) {
this.resolveFromMap(this.stateResolvers, msgReqId, {
key: String(msg.key),
value: msg.value,
updatedBy: String(msg.updatedBy ?? ""),
updatedAt: String(msg.updatedAt ?? ""),
});
} else {
this.resolveFromMap(this.stateResolvers, msgReqId, null);
}
return;
}
if (msg.type === "state_list") {
const entries = (msg.entries as Array<{ key: string; value: unknown; updatedBy: string; updatedAt: string }>) ?? [];
const resolver = this.stateListResolvers.shift();
if (resolver) resolver(entries);
this.resolveFromMap(this.stateListResolvers, msgReqId, entries);
return;
}
if (msg.type === "state_change") {
@@ -923,120 +930,108 @@ export class BrokerClient {
return;
}
if (msg.type === "memory_stored") {
const resolver = this.memoryStoreResolvers.shift();
if (resolver) resolver(msg.id ? String(msg.id) : null);
this.resolveFromMap(this.memoryStoreResolvers, msgReqId, msg.id ? String(msg.id) : null);
return;
}
if (msg.type === "memory_results") {
const memories = (msg.memories as Array<{ id: string; content: string; tags: string[]; rememberedBy: string; rememberedAt: string }>) ?? [];
const resolver = this.memoryRecallResolvers.shift();
if (resolver) resolver(memories);
this.resolveFromMap(this.memoryRecallResolvers, msgReqId, memories);
return;
}
if (msg.type === "message_status_result") {
const resolver = this.messageStatusResolvers.shift();
if (resolver) resolver(msg as any);
this.resolveFromMap(this.messageStatusResolvers, msgReqId, msg as any);
return;
}
if (msg.type === "file_url") {
const resolver = this.fileUrlResolvers.shift();
if (resolver) {
if (msg.url) {
resolver({ url: String(msg.url), name: String(msg.name ?? "") });
} else {
resolver(null);
}
if (msg.url) {
this.resolveFromMap(this.fileUrlResolvers, msgReqId, {
url: String(msg.url),
name: String(msg.name ?? ""),
encrypted: msg.encrypted ? true : undefined,
sealedKey: msg.sealedKey ? String(msg.sealedKey) : undefined,
});
} else {
this.resolveFromMap(this.fileUrlResolvers, msgReqId, null);
}
return;
}
if (msg.type === "file_list") {
const files = (msg.files as Array<{ id: string; name: string; size: number; tags: string[]; uploadedBy: string; uploadedAt: string; persistent: boolean }>) ?? [];
const resolver = this.fileListResolvers.shift();
if (resolver) resolver(files);
this.resolveFromMap(this.fileListResolvers, msgReqId, files);
return;
}
if (msg.type === "file_status_result") {
const accesses = (msg.accesses as Array<{ peerName: string; accessedAt: string }>) ?? [];
const resolver = this.fileStatusResolvers.shift();
if (resolver) resolver(accesses);
this.resolveFromMap(this.fileStatusResolvers, msgReqId, accesses);
return;
}
if (msg.type === "grant_file_access_ok") {
this.resolveFromMap(this.grantFileAccessResolvers, msgReqId, true);
return;
}
if (msg.type === "vector_stored") {
const resolver = this.vectorStoredResolvers.shift();
if (resolver) resolver(msg.id ? String(msg.id) : null);
this.resolveFromMap(this.vectorStoredResolvers, msgReqId, msg.id ? String(msg.id) : null);
return;
}
if (msg.type === "vector_results") {
const results = (msg.results as Array<{ id: string; text: string; score: number; metadata?: Record<string, unknown> }>) ?? [];
const resolver = this.vectorResultsResolvers.shift();
if (resolver) resolver(results);
this.resolveFromMap(this.vectorResultsResolvers, msgReqId, results);
return;
}
if (msg.type === "collection_list") {
const collections = (msg.collections as string[]) ?? [];
const resolver = this.collectionListResolvers.shift();
if (resolver) resolver(collections);
this.resolveFromMap(this.collectionListResolvers, msgReqId, collections);
return;
}
if (msg.type === "graph_result") {
const rows = (msg.rows as Array<Record<string, unknown>>) ?? [];
const resolver = this.graphResultResolvers.shift();
if (resolver) resolver(rows);
// Broker sends { type: "graph_result", records: [...] }
const rows = (msg.records as Array<Record<string, unknown>>) ?? [];
this.resolveFromMap(this.graphResultResolvers, msgReqId, rows);
return;
}
if (msg.type === "context_list") {
const contexts = (msg.contexts as Array<{ peerName: string; summary: string; tags: string[]; updatedAt: string }>) ?? [];
const resolver = this.contextListResolvers.shift();
if (resolver) resolver(contexts);
this.resolveFromMap(this.contextListResolvers, msgReqId, contexts);
return;
}
if (msg.type === "context_results") {
const contexts = (msg.contexts as Array<{ peerName: string; summary: string; filesRead: string[]; keyFindings: string[]; tags: string[]; updatedAt: string }>) ?? [];
const resolver = this.contextResultsResolvers.shift();
if (resolver) resolver(contexts);
this.resolveFromMap(this.contextResultsResolvers, msgReqId, contexts);
return;
}
if (msg.type === "task_created") {
const resolver = this.taskCreatedResolvers.shift();
if (resolver) resolver(msg.id ? String(msg.id) : null);
this.resolveFromMap(this.taskCreatedResolvers, msgReqId, msg.id ? String(msg.id) : null);
return;
}
if (msg.type === "task_list") {
const tasks = (msg.tasks as Array<{ id: string; title: string; assignee: string; status: string; priority: string; createdBy: string }>) ?? [];
const resolver = this.taskListResolvers.shift();
if (resolver) resolver(tasks);
this.resolveFromMap(this.taskListResolvers, msgReqId, tasks);
return;
}
if (msg.type === "mesh_query_result") {
const resolver = this.meshQueryResolvers.shift();
if (resolver) {
if (msg.columns) {
resolver({
columns: (msg.columns as string[]) ?? [],
rows: (msg.rows as Array<Record<string, unknown>>) ?? [],
rowCount: (msg.rowCount as number) ?? 0,
});
} else {
resolver(null);
}
if (msg.columns) {
this.resolveFromMap(this.meshQueryResolvers, msgReqId, {
columns: (msg.columns as string[]) ?? [],
rows: (msg.rows as Array<Record<string, unknown>>) ?? [],
rowCount: (msg.rowCount as number) ?? 0,
});
} else {
this.resolveFromMap(this.meshQueryResolvers, msgReqId, null);
}
return;
}
if (msg.type === "mesh_schema_result") {
const tables = (msg.tables as Array<{ name: string; columns: Array<{ name: string; type: string; nullable: boolean }> }>) ?? [];
const resolver = this.meshSchemaResolvers.shift();
if (resolver) resolver(tables);
this.resolveFromMap(this.meshSchemaResolvers, msgReqId, tables);
return;
}
if (msg.type === "stream_created") {
const resolver = this.streamCreatedResolvers.shift();
if (resolver) resolver(msg.id ? String(msg.id) : null);
this.resolveFromMap(this.streamCreatedResolvers, msgReqId, msg.id ? String(msg.id) : null);
return;
}
if (msg.type === "stream_list") {
const streams = (msg.streams as Array<{ id: string; name: string; createdBy: string; subscriberCount: number }>) ?? [];
const resolver = this.streamListResolvers.shift();
if (resolver) resolver(streams);
this.resolveFromMap(this.streamListResolvers, msgReqId, streams);
return;
}
if (msg.type === "stream_data") {
@@ -1051,13 +1046,13 @@ export class BrokerClient {
return;
}
if (msg.type === "mesh_info_result") {
const resolver = this.meshInfoResolvers.shift();
if (resolver) resolver(msg as Record<string, unknown>);
this.resolveFromMap(this.meshInfoResolvers, msgReqId, msg as Record<string, unknown>);
return;
}
if (msg.type === "error") {
this.debug(`broker error: ${msg.code} ${msg.message}`);
const id = msg.id ? String(msg.id) : null;
let handledByPendingSend = false;
if (id) {
const pending = this.pendingSends.get(id);
if (pending) {
@@ -1066,6 +1061,46 @@ export class BrokerClient {
error: `${msg.code}: ${msg.message}`,
});
this.pendingSends.delete(id);
handledByPendingSend = true;
}
}
if (!handledByPendingSend) {
// Best-effort: unblock the first waiting resolver so callers don't
// hang for 5s. We don't know which tool triggered the error, so we
// pop the first non-empty resolver map in priority order.
const allMaps: Array<[Map<string, { resolve: (v: any) => void; timer: NodeJS.Timeout }>, unknown]> = [
[this.stateResolvers, null],
[this.stateListResolvers, []],
[this.memoryStoreResolvers, null],
[this.memoryRecallResolvers, []],
[this.fileUrlResolvers, null],
[this.fileListResolvers, []],
[this.fileStatusResolvers, []],
[this.graphResultResolvers, []],
[this.vectorStoredResolvers, null],
[this.vectorResultsResolvers, []],
[this.taskListResolvers, []],
[this.meshQueryResolvers, null],
[this.contextResultsResolvers, []],
[this.contextListResolvers, []],
[this.streamListResolvers, []],
[this.messageStatusResolvers, null],
[this.grantFileAccessResolvers, false],
[this.collectionListResolvers, []],
[this.meshSchemaResolvers, []],
[this.taskCreatedResolvers, null],
[this.streamCreatedResolvers, null],
[this.listPeersResolvers, []],
[this.meshInfoResolvers, null],
];
for (const [map, defaultVal] of allMaps) {
const first = (map as Map<string, any>).entries().next().value as [string, { resolve: (v: unknown) => void; timer: NodeJS.Timeout }] | undefined;
if (first) {
(map as Map<string, any>).delete(first[0]);
clearTimeout(first[1].timer);
first[1].resolve(defaultVal);
break; // only pop one
}
}
}
}

View File

@@ -12,6 +12,7 @@ import { env } from "../env";
const clients = new Map<string, BrokerClient>();
let configDisplayName: string | undefined;
let configGroups: Config["groups"] = [];
/** Ensure a BrokerClient exists + is connecting/open for this mesh. */
export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> {
@@ -21,6 +22,10 @@ export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> {
clients.set(mesh.meshId, client);
try {
await client.connect();
// Auto-join groups declared at launch time (--groups flag or config).
for (const g of configGroups ?? []) {
try { await client.joinGroup(g.name, g.role); } catch { /* best effort */ }
}
} catch {
// Connect failed → client is in "reconnecting" state, leave it
// wired so tool calls can surface the status.
@@ -31,6 +36,7 @@ export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> {
/** Start clients for every joined mesh. Called once on MCP server start. */
export async function startClients(config: Config): Promise<void> {
configDisplayName = config.displayName;
configGroups = config.groups ?? [];
await Promise.allSettled(config.meshes.map(ensureClient));
}

View File

@@ -4,7 +4,7 @@
"private": true,
"type": "module",
"scripts": {
"build": "next build",
"build": "next build --no-turbopack",
"clean": "git clean -xdf .cache .next .turbo node_modules",
"dev": "next dev",
"format": "prettier --check . --ignore-path ../../.gitignore",

9036
bun.lock Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,13 @@
ALTER TABLE "mesh"."file" ADD COLUMN "encrypted" boolean DEFAULT false NOT NULL;--> statement-breakpoint
ALTER TABLE "mesh"."file" ADD COLUMN "owner_pubkey" text;--> statement-breakpoint
CREATE TABLE "mesh"."file_key" (
"id" text PRIMARY KEY NOT NULL,
"file_id" text NOT NULL,
"peer_pubkey" text NOT NULL,
"sealed_key" text NOT NULL,
"granted_at" timestamp DEFAULT now() NOT NULL,
"granted_by_pubkey" text
);
--> statement-breakpoint
ALTER TABLE "mesh"."file_key" ADD CONSTRAINT "file_key_file_id_fkey" FOREIGN KEY ("file_id") REFERENCES "mesh"."file"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE UNIQUE INDEX "file_key_file_peer_idx" ON "mesh"."file_key" ("file_id","peer_pubkey");

View File

@@ -0,0 +1,3 @@
ALTER TABLE "mesh"."context" ADD COLUMN "member_id" text;--> statement-breakpoint
ALTER TABLE "mesh"."context" ADD CONSTRAINT "context_member_id_member_id_fk" FOREIGN KEY ("member_id") REFERENCES "mesh"."member"("id") ON DELETE cascade ON UPDATE cascade;--> statement-breakpoint
CREATE UNIQUE INDEX "context_mesh_member_idx" ON "mesh"."context" ("mesh_id","member_id");

View File

@@ -305,6 +305,8 @@ export const meshFile = meshSchema.table("file", {
minioKey: text().notNull(),
tags: text().array().default([]),
persistent: boolean().notNull().default(true),
encrypted: boolean().notNull().default(false),
ownerPubkey: text(),
uploadedByName: text(),
uploadedByMember: text().references(() => meshMember.id),
targetSpec: text(), // null = entire mesh
@@ -328,24 +330,60 @@ export const meshFileAccess = meshSchema.table("file_access", {
});
/**
* Per-peer context snapshot. Each peer (presence) has at most one context
* Per-peer encrypted symmetric keys for E2E encrypted files.
* The file body is encrypted with a random key (Kf); Kf is sealed
* (crypto_box_seal) to each authorized peer's X25519 pubkey and stored here.
*/
export const meshFileKey = meshSchema.table("file_key", {
id: text().primaryKey().notNull().$defaultFn(generateId),
fileId: text()
.references(() => meshFile.id, { onDelete: "cascade" })
.notNull(),
peerPubkey: text().notNull(),
sealedKey: text().notNull(),
grantedAt: timestamp().defaultNow().notNull(),
grantedByPubkey: text(),
});
export const meshFileKeyRelations = relations(meshFileKey, ({ one }) => ({
file: one(meshFile, {
fields: [meshFileKey.fileId],
references: [meshFile.id],
}),
}));
/**
* Per-peer context snapshot. Each peer (member) has at most one context
* entry per mesh, upserted on each share_context call. Allows peers to
* discover what others are working on, which files they've read, and
* key findings — without sending a direct message.
*
* `memberId` is the stable upsert key (survives reconnects). `presenceId`
* is kept for backwards-compat but is nullable — new rows should always
* populate `memberId`. The unique index on (meshId, memberId) prevents
* stale rows from accumulating when a session reconnects with a new
* ephemeral presenceId.
*/
export const meshContext = meshSchema.table("context", {
id: text().primaryKey().notNull().$defaultFn(generateId),
meshId: text()
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
.notNull(),
presenceId: text().references(() => presence.id, { onDelete: "cascade" }),
peerName: text(),
summary: text().notNull(),
filesRead: text().array().default([]),
keyFindings: text().array().default([]),
tags: text().array().default([]),
updatedAt: timestamp().defaultNow().notNull(),
});
export const meshContext = meshSchema.table(
"context",
{
id: text().primaryKey().notNull().$defaultFn(generateId),
meshId: text()
.references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" })
.notNull(),
memberId: text().references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" }),
presenceId: text().references(() => presence.id, { onDelete: "cascade" }),
peerName: text(),
summary: text().notNull(),
filesRead: text().array().default([]),
keyFindings: text().array().default([]),
tags: text().array().default([]),
updatedAt: timestamp().defaultNow().notNull(),
},
(table) => [
uniqueIndex("context_mesh_member_idx").on(table.meshId, table.memberId),
],
);
/**
* Mesh-scoped task board. Peers can create tasks, claim them, and mark
@@ -531,6 +569,10 @@ export type SelectMeshFile = typeof meshFile.$inferSelect;
export type InsertMeshFile = typeof meshFile.$inferInsert;
export type SelectMeshFileAccess = typeof meshFileAccess.$inferSelect;
export type InsertMeshFileAccess = typeof meshFileAccess.$inferInsert;
export const selectMeshFileKeySchema = createSelectSchema(meshFileKey);
export const insertMeshFileKeySchema = createInsertSchema(meshFileKey);
export type SelectMeshFileKey = typeof meshFileKey.$inferSelect;
export type InsertMeshFileKey = typeof meshFileKey.$inferInsert;
export const selectMeshContextSchema = createSelectSchema(meshContext);
export const insertMeshContextSchema = createInsertSchema(meshContext);
export const selectMeshTaskSchema = createSelectSchema(meshTask);