Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2557235c68 | ||
|
|
a987e9e27b | ||
|
|
ff86db615f | ||
|
|
4aa61b40e2 | ||
|
|
4afe365c00 | ||
|
|
92bb276a3e | ||
|
|
af8f8ed1f9 |
@@ -265,6 +265,23 @@ export async function refreshQueueDepth(): Promise<void> {
|
||||
metrics.queueDepth.set(Number(row?.n ?? 0));
|
||||
}
|
||||
|
||||
/**
|
||||
* Sweep stale presences: mark as disconnected if last_ping_at is older
|
||||
* than 90s (3 missed pings at the 30s interval = dead session).
|
||||
*/
|
||||
export async function sweepStalePresences(): Promise<void> {
|
||||
const cutoff = new Date(Date.now() - 90_000); // 3 missed pings
|
||||
await db
|
||||
.update(presence)
|
||||
.set({ disconnectedAt: new Date() })
|
||||
.where(
|
||||
and(
|
||||
isNull(presence.disconnectedAt),
|
||||
lt(presence.lastPingAt, cutoff),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
/** Sweep expired pending_status entries. */
|
||||
export async function sweepPendingStatuses(): Promise<void> {
|
||||
const cutoff = new Date(Date.now() - PENDING_TTL_MS);
|
||||
@@ -307,6 +324,7 @@ export async function refreshStatusFromJsonl(
|
||||
export interface ConnectParams {
|
||||
memberId: string;
|
||||
sessionId: string;
|
||||
sessionPubkey?: string;
|
||||
displayName?: string;
|
||||
pid: number;
|
||||
cwd: string;
|
||||
@@ -322,6 +340,7 @@ export async function connectPresence(
|
||||
.values({
|
||||
memberId: params.memberId,
|
||||
sessionId: params.sessionId,
|
||||
sessionPubkey: params.sessionPubkey ?? null,
|
||||
displayName: params.displayName ?? null,
|
||||
pid: params.pid,
|
||||
cwd: params.cwd,
|
||||
@@ -371,7 +390,8 @@ export async function listPeersInMesh(
|
||||
> {
|
||||
const rows = await db
|
||||
.select({
|
||||
pubkey: memberTable.peerPubkey,
|
||||
memberPubkey: memberTable.peerPubkey,
|
||||
sessionPubkey: presence.sessionPubkey,
|
||||
memberDisplayName: memberTable.displayName,
|
||||
presenceDisplayName: presence.displayName,
|
||||
status: presence.status,
|
||||
@@ -388,9 +408,9 @@ export async function listPeersInMesh(
|
||||
),
|
||||
)
|
||||
.orderBy(asc(presence.connectedAt));
|
||||
// Prefer per-session displayName over member-level displayName.
|
||||
// Prefer session pubkey for routing, session displayName for display.
|
||||
return rows.map((r) => ({
|
||||
pubkey: r.pubkey,
|
||||
pubkey: r.sessionPubkey || r.memberPubkey,
|
||||
displayName: r.presenceDisplayName || r.memberDisplayName,
|
||||
status: r.status,
|
||||
summary: r.summary,
|
||||
@@ -415,6 +435,7 @@ export async function setSummary(
|
||||
export interface QueueParams {
|
||||
meshId: string;
|
||||
senderMemberId: string;
|
||||
senderSessionPubkey?: string;
|
||||
targetSpec: string;
|
||||
priority: Priority;
|
||||
nonce: string;
|
||||
@@ -429,6 +450,7 @@ export async function queueMessage(params: QueueParams): Promise<string> {
|
||||
.values({
|
||||
meshId: params.meshId,
|
||||
senderMemberId: params.senderMemberId,
|
||||
senderSessionPubkey: params.senderSessionPubkey ?? null,
|
||||
targetSpec: params.targetSpec,
|
||||
priority: params.priority,
|
||||
nonce: params.nonce,
|
||||
@@ -469,6 +491,8 @@ export async function drainForMember(
|
||||
_memberId: string,
|
||||
memberPubkey: string,
|
||||
status: PeerStatus,
|
||||
sessionPubkey?: string,
|
||||
excludeSenderMemberId?: string,
|
||||
): Promise<
|
||||
Array<{
|
||||
id: string;
|
||||
@@ -509,14 +533,15 @@ export async function drainForMember(
|
||||
WHERE mesh_id = ${meshId}
|
||||
AND delivered_at IS NULL
|
||||
AND priority::text IN (${priorityList})
|
||||
AND (target_spec = ${memberPubkey} OR target_spec = '*')
|
||||
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``})
|
||||
${excludeSenderMemberId ? sql`AND sender_member_id != ${excludeSenderMemberId}` : sql``}
|
||||
ORDER BY created_at ASC, id ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
AND m.id = mq.sender_member_id
|
||||
RETURNING mq.id, mq.priority, mq.nonce, mq.ciphertext,
|
||||
mq.created_at, mq.sender_member_id,
|
||||
m.peer_pubkey AS sender_pubkey
|
||||
COALESCE(mq.sender_session_pubkey, m.peer_pubkey) AS sender_pubkey
|
||||
)
|
||||
SELECT * FROM claimed ORDER BY created_at ASC, id ASC
|
||||
`);
|
||||
@@ -547,6 +572,7 @@ export async function drainForMember(
|
||||
|
||||
let ttlTimer: ReturnType<typeof setInterval> | null = null;
|
||||
let pendingTimer: ReturnType<typeof setInterval> | null = null;
|
||||
let staleTimer: ReturnType<typeof setInterval> | null = null;
|
||||
|
||||
/** Start background sweepers. Idempotent. */
|
||||
export function startSweepers(): void {
|
||||
@@ -559,14 +585,21 @@ export function startSweepers(): void {
|
||||
console.error("[broker] pending sweep:", e),
|
||||
);
|
||||
}, PENDING_SWEEP_INTERVAL_MS);
|
||||
staleTimer = setInterval(() => {
|
||||
sweepStalePresences().catch((e) =>
|
||||
console.error("[broker] stale presence sweep:", e),
|
||||
);
|
||||
}, 30_000);
|
||||
}
|
||||
|
||||
/** Stop background sweepers and mark all active presences disconnected. */
|
||||
export async function stopSweepers(): Promise<void> {
|
||||
if (ttlTimer) clearInterval(ttlTimer);
|
||||
if (pendingTimer) clearInterval(pendingTimer);
|
||||
if (staleTimer) clearInterval(staleTimer);
|
||||
ttlTimer = null;
|
||||
pendingTimer = null;
|
||||
staleTimer = null;
|
||||
await db
|
||||
.update(presence)
|
||||
.set({ disconnectedAt: new Date() })
|
||||
|
||||
@@ -56,6 +56,7 @@ interface PeerConn {
|
||||
meshId: string;
|
||||
memberId: string;
|
||||
memberPubkey: string;
|
||||
sessionPubkey: string | null;
|
||||
cwd: string;
|
||||
}
|
||||
|
||||
@@ -80,7 +81,10 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void {
|
||||
}
|
||||
}
|
||||
|
||||
async function maybePushQueuedMessages(presenceId: string): Promise<void> {
|
||||
async function maybePushQueuedMessages(
|
||||
presenceId: string,
|
||||
excludeSenderMemberId?: string,
|
||||
): Promise<void> {
|
||||
const conn = connections.get(presenceId);
|
||||
if (!conn) return;
|
||||
const status = await refreshStatusFromJsonl(
|
||||
@@ -93,6 +97,8 @@ async function maybePushQueuedMessages(presenceId: string): Promise<void> {
|
||||
conn.memberId,
|
||||
conn.memberPubkey,
|
||||
status,
|
||||
conn.sessionPubkey ?? undefined,
|
||||
excludeSenderMemberId,
|
||||
);
|
||||
for (const m of messages) {
|
||||
const push: WSPushMessage = {
|
||||
@@ -400,6 +406,7 @@ async function handleHello(
|
||||
const presenceId = await connectPresence({
|
||||
memberId: member.id,
|
||||
sessionId: hello.sessionId,
|
||||
sessionPubkey: hello.sessionPubkey,
|
||||
displayName: hello.displayName,
|
||||
pid: hello.pid,
|
||||
cwd: hello.cwd,
|
||||
@@ -409,6 +416,7 @@ async function handleHello(
|
||||
meshId: hello.meshId,
|
||||
memberId: member.id,
|
||||
memberPubkey: hello.pubkey,
|
||||
sessionPubkey: hello.sessionPubkey ?? null,
|
||||
cwd: hello.cwd,
|
||||
});
|
||||
incMeshCount(hello.meshId);
|
||||
@@ -434,6 +442,7 @@ async function handleSend(
|
||||
const messageId = await queueMessage({
|
||||
meshId: conn.meshId,
|
||||
senderMemberId: conn.memberId,
|
||||
senderSessionPubkey: conn.sessionPubkey ?? undefined,
|
||||
targetSpec: msg.targetSpec,
|
||||
priority: msg.priority,
|
||||
nonce: msg.nonce,
|
||||
@@ -447,12 +456,21 @@ async function handleSend(
|
||||
};
|
||||
conn.ws.send(JSON.stringify(ack));
|
||||
|
||||
// Fan-out over connected peers in the same mesh.
|
||||
// Find sender's presenceId to exclude from fan-out.
|
||||
let senderPresenceId: string | undefined;
|
||||
for (const [pid, peer] of connections) {
|
||||
if (peer.ws === conn.ws) { senderPresenceId = pid; break; }
|
||||
}
|
||||
|
||||
// Fan-out over connected peers in the same mesh — skip sender.
|
||||
for (const [pid, peer] of connections) {
|
||||
if (pid === senderPresenceId) continue;
|
||||
if (peer.meshId !== conn.meshId) continue;
|
||||
if (msg.targetSpec !== "*" && peer.memberPubkey !== msg.targetSpec)
|
||||
if (msg.targetSpec !== "*"
|
||||
&& peer.memberPubkey !== msg.targetSpec
|
||||
&& peer.sessionPubkey !== msg.targetSpec)
|
||||
continue;
|
||||
void maybePushQueuedMessages(pid);
|
||||
void maybePushQueuedMessages(pid, conn.memberId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -52,6 +52,7 @@ export interface WSHelloMessage {
|
||||
meshId: string;
|
||||
memberId: string;
|
||||
pubkey: string; // must match mesh.member.peerPubkey
|
||||
sessionPubkey?: string; // ephemeral per-launch pubkey for message routing
|
||||
displayName?: string; // optional override for this session
|
||||
sessionId: string;
|
||||
pid: number;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "claudemesh-cli",
|
||||
"version": "0.1.9",
|
||||
"version": "0.1.15",
|
||||
"description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.",
|
||||
"keywords": [
|
||||
"claude-code",
|
||||
|
||||
@@ -14,7 +14,10 @@ import { parseInviteLink } from "../invite/parse";
|
||||
import { enrollWithBroker } from "../invite/enroll";
|
||||
import { generateKeypair } from "../crypto/keypair";
|
||||
import { loadConfig, saveConfig, getConfigPath } from "../state/config";
|
||||
import { hostname } from "node:os";
|
||||
import { writeFileSync, mkdirSync } from "node:fs";
|
||||
import { join, dirname } from "node:path";
|
||||
import { homedir, hostname } from "node:os";
|
||||
import { env } from "../env";
|
||||
|
||||
export async function runJoin(args: string[]): Promise<void> {
|
||||
const link = args[0];
|
||||
@@ -78,6 +81,16 @@ export async function runJoin(args: string[]): Promise<void> {
|
||||
});
|
||||
saveConfig(config);
|
||||
|
||||
// 4b. Store invite token for per-session re-enrollment (launch --name).
|
||||
const configDir = env.CLAUDEMESH_CONFIG_DIR ?? join(homedir(), ".claudemesh");
|
||||
const inviteFile = join(configDir, `invite-${payload.mesh_slug}.txt`);
|
||||
try {
|
||||
mkdirSync(dirname(inviteFile), { recursive: true });
|
||||
writeFileSync(inviteFile, link, "utf-8");
|
||||
} catch {
|
||||
// Non-fatal — launch will fall back to shared identity.
|
||||
}
|
||||
|
||||
// 5. Report.
|
||||
console.log("");
|
||||
console.log(
|
||||
|
||||
@@ -11,15 +11,12 @@
|
||||
*/
|
||||
|
||||
import { spawn } from "node:child_process";
|
||||
import { mkdtempSync, writeFileSync, rmSync } from "node:fs";
|
||||
import { mkdtempSync, writeFileSync, rmSync, readdirSync, statSync } from "node:fs";
|
||||
import { tmpdir, hostname } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { createInterface } from "node:readline";
|
||||
import { loadConfig, getConfigPath } from "../state/config";
|
||||
import type { Config, JoinedMesh } from "../state/config";
|
||||
import { generateKeypair } from "../crypto/keypair";
|
||||
import { enrollWithBroker } from "../invite/enroll";
|
||||
import { parseInviteLink } from "../invite/parse";
|
||||
|
||||
// --- Arg parsing ---
|
||||
|
||||
@@ -28,6 +25,7 @@ interface LaunchArgs {
|
||||
joinLink: string | null;
|
||||
meshSlug: string | null;
|
||||
quiet: boolean;
|
||||
skipPermConfirm: boolean;
|
||||
claudeArgs: string[];
|
||||
}
|
||||
|
||||
@@ -37,6 +35,7 @@ function parseArgs(argv: string[]): LaunchArgs {
|
||||
joinLink: null,
|
||||
meshSlug: null,
|
||||
quiet: false,
|
||||
skipPermConfirm: false,
|
||||
claudeArgs: [],
|
||||
};
|
||||
|
||||
@@ -57,6 +56,8 @@ function parseArgs(argv: string[]): LaunchArgs {
|
||||
result.meshSlug = arg.slice("--mesh=".length);
|
||||
} else if (arg === "--quiet") {
|
||||
result.quiet = true;
|
||||
} else if (arg === "-y" || arg === "--yes") {
|
||||
result.skipPermConfirm = true;
|
||||
} else if (arg === "--") {
|
||||
result.claudeArgs.push(...argv.slice(i + 1));
|
||||
break;
|
||||
@@ -94,6 +95,41 @@ async function pickMesh(meshes: JoinedMesh[]): Promise<JoinedMesh> {
|
||||
});
|
||||
}
|
||||
|
||||
// --- Permission confirmation ---
|
||||
|
||||
async function confirmPermissions(): Promise<void> {
|
||||
const useColor =
|
||||
!process.env.NO_COLOR && process.env.TERM !== "dumb" && process.stdout.isTTY;
|
||||
const bold = (s: string): string => (useColor ? `\x1b[1m${s}\x1b[22m` : s);
|
||||
const dim = (s: string): string => (useColor ? `\x1b[2m${s}\x1b[22m` : s);
|
||||
const yellow = (s: string): string => (useColor ? `\x1b[33m${s}\x1b[39m` : s);
|
||||
|
||||
console.log(yellow(bold(" Autonomous mode")));
|
||||
console.log("");
|
||||
console.log(" Claude will send and receive peer messages without asking");
|
||||
console.log(" you first. Peers exchange text only — no file access,");
|
||||
console.log(" no tool calls, no code execution.");
|
||||
console.log("");
|
||||
console.log(dim(" Same as: claude --dangerously-skip-permissions"));
|
||||
console.log(dim(" Skip this prompt: claudemesh launch -y"));
|
||||
console.log("");
|
||||
|
||||
const rl = createInterface({ input: process.stdin, output: process.stdout });
|
||||
return new Promise((resolve, reject) => {
|
||||
rl.question(` ${bold("Continue?")} [Y/n] `, (answer) => {
|
||||
rl.close();
|
||||
const a = answer.trim().toLowerCase();
|
||||
if (a === "" || a === "y" || a === "yes") {
|
||||
resolve();
|
||||
} else {
|
||||
console.log("\n Aborted. Run without autonomous mode:");
|
||||
console.log(" claude --dangerously-load-development-channels server:claudemesh\n");
|
||||
process.exit(0);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// --- Banner ---
|
||||
|
||||
function printBanner(name: string, meshSlug: string): void {
|
||||
@@ -174,16 +210,28 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
|
||||
mesh = await pickMesh(config.meshes);
|
||||
}
|
||||
|
||||
// 3. Set display name. Uses existing member identity — the broker
|
||||
// creates a separate presence row per session (sessionId + pid)
|
||||
// and stores the per-session displayName override.
|
||||
// 3. Session identity. The WS client auto-generates a per-session
|
||||
// ephemeral keypair on connect (sent in hello as sessionPubkey).
|
||||
// We just set the display name via env var.
|
||||
const displayName = args.name ?? `${hostname()}-${process.pid}`;
|
||||
|
||||
// 4. Write session config to tmpdir (same mesh, same keypair).
|
||||
// Clean up orphaned tmpdirs from crashed sessions (older than 1 hour)
|
||||
const tmpBase = tmpdir();
|
||||
try {
|
||||
for (const entry of readdirSync(tmpBase)) {
|
||||
if (!entry.startsWith("claudemesh-")) continue;
|
||||
const full = join(tmpBase, entry);
|
||||
const age = Date.now() - statSync(full).mtimeMs;
|
||||
if (age > 3600_000) rmSync(full, { recursive: true, force: true });
|
||||
}
|
||||
} catch { /* best effort */ }
|
||||
|
||||
// 4. Write session config to tmpdir (isolates mesh selection).
|
||||
const tmpDir = mkdtempSync(join(tmpdir(), "claudemesh-"));
|
||||
const sessionConfig: Config = {
|
||||
version: 1,
|
||||
meshes: [mesh],
|
||||
displayName,
|
||||
};
|
||||
writeFileSync(
|
||||
join(tmpDir, "config.json"),
|
||||
@@ -191,16 +239,22 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
|
||||
"utf-8",
|
||||
);
|
||||
|
||||
// 5. Banner.
|
||||
if (!args.quiet) printBanner(displayName, mesh.slug);
|
||||
// 5. Banner + permission confirmation.
|
||||
if (!args.quiet) {
|
||||
printBanner(displayName, mesh.slug);
|
||||
// Auto-permissions confirmation — needed for autonomous peer messaging.
|
||||
if (!args.skipPermConfirm) {
|
||||
await confirmPermissions();
|
||||
}
|
||||
}
|
||||
|
||||
// 6. Spawn claude with ephemeral config + dev channel + display name.
|
||||
// Strip any user-supplied --dangerously-load-development-channels
|
||||
// to avoid duplicates — we always inject our own.
|
||||
// 6. Spawn claude with ephemeral config + dev channel + auto-permissions.
|
||||
// Strip any user-supplied --dangerously flags to avoid duplicates.
|
||||
const filtered: string[] = [];
|
||||
for (let i = 0; i < args.claudeArgs.length; i++) {
|
||||
if (args.claudeArgs[i] === "--dangerously-load-development-channels") {
|
||||
i++; // skip the next arg (the channel value) too
|
||||
if (args.claudeArgs[i] === "--dangerously-load-development-channels"
|
||||
|| args.claudeArgs[i] === "--dangerously-skip-permissions") {
|
||||
if (args.claudeArgs[i] === "--dangerously-load-development-channels") i++;
|
||||
continue;
|
||||
}
|
||||
filtered.push(args.claudeArgs[i]!);
|
||||
@@ -208,6 +262,7 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
|
||||
const claudeArgs = [
|
||||
"--dangerously-load-development-channels",
|
||||
"server:claudemesh",
|
||||
"--dangerously-skip-permissions",
|
||||
...filtered,
|
||||
];
|
||||
|
||||
|
||||
@@ -98,6 +98,24 @@ async function resolveClient(to: string): Promise<{
|
||||
};
|
||||
}
|
||||
|
||||
// Peer name cache to avoid calling listPeers on every incoming push
|
||||
const peerNameCache = new Map<string, string>();
|
||||
let peerNameCacheAge = 0;
|
||||
const CACHE_TTL_MS = 30_000;
|
||||
|
||||
async function resolvePeerName(client: BrokerClient, pubkey: string): Promise<string> {
|
||||
const now = Date.now();
|
||||
if (now - peerNameCacheAge > CACHE_TTL_MS) {
|
||||
peerNameCache.clear();
|
||||
try {
|
||||
const peers = await client.listPeers();
|
||||
for (const p of peers) peerNameCache.set(p.pubkey, p.displayName);
|
||||
} catch { /* best effort */ }
|
||||
peerNameCacheAge = now;
|
||||
}
|
||||
return peerNameCache.get(pubkey) ?? `peer-${pubkey.slice(0, 8)}`;
|
||||
}
|
||||
|
||||
function decryptFailedWarning(senderPubkey: string): string {
|
||||
const who = senderPubkey ? senderPubkey.slice(0, 12) + "…" : "unknown sender";
|
||||
return `⚠ message from ${who} failed to decrypt (tampered or wrong keypair)`;
|
||||
@@ -122,7 +140,7 @@ export async function startMcpServer(): Promise<void> {
|
||||
|
||||
IMPORTANT: When you receive a <channel source="claudemesh" ...> message, RESPOND IMMEDIATELY. Do not wait until your current task is finished. Pause what you are doing, reply to the message using send_message, then resume your work. Treat incoming peer messages like a coworker tapping you on the shoulder — answer right away, even if you're in the middle of something.
|
||||
|
||||
Read the from_id, from_name, mesh_slug, and priority attributes to understand context. Reply by calling send_message with the same target (for direct messages the from_id is the sender's pubkey).
|
||||
Read the from_id, from_name, mesh_slug, and priority attributes to understand context. Reply by calling send_message with to set to the from_name (display name) of the sender.
|
||||
|
||||
Available tools:
|
||||
- list_peers: see joined meshes + their connection status
|
||||
@@ -251,8 +269,9 @@ If you have multiple joined meshes, prefix the \`to\` argument of send_message w
|
||||
for (const client of allClients()) {
|
||||
client.onPush(async (msg) => {
|
||||
const fromPubkey = msg.senderPubkey || "";
|
||||
// Resolve sender's display name from the cached peer list.
|
||||
const fromName = fromPubkey
|
||||
? `peer-${fromPubkey.slice(0, 8)}`
|
||||
? await resolvePeerName(client, fromPubkey)
|
||||
: "unknown";
|
||||
const content = msg.plaintext ?? decryptFailedWarning(fromPubkey);
|
||||
try {
|
||||
|
||||
@@ -31,6 +31,7 @@ export interface JoinedMesh {
|
||||
export interface Config {
|
||||
version: 1;
|
||||
meshes: JoinedMesh[];
|
||||
displayName?: string; // per-session override, written by `claudemesh launch --name`
|
||||
}
|
||||
|
||||
const CONFIG_DIR = env.CLAUDEMESH_CONFIG_DIR ?? join(homedir(), ".claudemesh");
|
||||
@@ -46,7 +47,7 @@ export function loadConfig(): Config {
|
||||
if (!parsed || !Array.isArray(parsed.meshes)) {
|
||||
return { version: 1, meshes: [] };
|
||||
}
|
||||
return { version: 1, meshes: parsed.meshes };
|
||||
return { version: 1, meshes: parsed.meshes, displayName: parsed.displayName };
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`Failed to load ${CONFIG_PATH}: ${e instanceof Error ? e.message : String(e)}`,
|
||||
|
||||
@@ -21,6 +21,7 @@ import {
|
||||
isDirectTarget,
|
||||
} from "../crypto/envelope";
|
||||
import { signHello } from "../crypto/hello-sig";
|
||||
import { generateKeypair } from "../crypto/keypair";
|
||||
|
||||
export type Priority = "now" | "next" | "low";
|
||||
export type ConnStatus = "connecting" | "open" | "closed" | "reconnecting";
|
||||
@@ -74,6 +75,8 @@ export class BrokerClient {
|
||||
private pushHandlers = new Set<PushHandler>();
|
||||
private pushBuffer: InboundPush[] = [];
|
||||
private listPeersResolvers: Array<(peers: PeerInfo[]) => void> = [];
|
||||
private sessionPubkey: string | null = null;
|
||||
private sessionSecretKey: string | null = null;
|
||||
private closed = false;
|
||||
private reconnectAttempt = 0;
|
||||
private helloTimer: NodeJS.Timeout | null = null;
|
||||
@@ -83,6 +86,7 @@ export class BrokerClient {
|
||||
private mesh: JoinedMesh,
|
||||
private opts: {
|
||||
onStatusChange?: (status: ConnStatus) => void;
|
||||
displayName?: string;
|
||||
debug?: boolean;
|
||||
} = {},
|
||||
) {}
|
||||
@@ -109,8 +113,15 @@ export class BrokerClient {
|
||||
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
const onOpen = async (): Promise<void> => {
|
||||
this.debug("ws open → signing + sending hello");
|
||||
this.debug("ws open → generating session keypair + signing hello");
|
||||
try {
|
||||
// Only generate session keypair on first connect, not reconnects
|
||||
if (!this.sessionPubkey) {
|
||||
const sessionKP = await generateKeypair();
|
||||
this.sessionPubkey = sessionKP.publicKey;
|
||||
this.sessionSecretKey = sessionKP.secretKey;
|
||||
}
|
||||
|
||||
const { timestamp, signature } = await signHello(
|
||||
this.mesh.meshId,
|
||||
this.mesh.memberId,
|
||||
@@ -123,7 +134,8 @@ export class BrokerClient {
|
||||
meshId: this.mesh.meshId,
|
||||
memberId: this.mesh.memberId,
|
||||
pubkey: this.mesh.pubkey,
|
||||
displayName: process.env.CLAUDEMESH_DISPLAY_NAME || undefined,
|
||||
sessionPubkey: this.sessionPubkey,
|
||||
displayName: process.env.CLAUDEMESH_DISPLAY_NAME || this.opts.displayName || undefined,
|
||||
sessionId: `${process.pid}-${Date.now()}`,
|
||||
pid: process.pid,
|
||||
cwd: process.cwd(),
|
||||
@@ -203,7 +215,7 @@ export class BrokerClient {
|
||||
const env = await encryptDirect(
|
||||
message,
|
||||
targetSpec,
|
||||
this.mesh.secretKey,
|
||||
this.sessionSecretKey ?? this.mesh.secretKey,
|
||||
);
|
||||
nonce = env.nonce;
|
||||
ciphertext = env.ciphertext;
|
||||
@@ -349,7 +361,7 @@ export class BrokerClient {
|
||||
plaintext = await decryptDirect(
|
||||
{ nonce, ciphertext },
|
||||
senderPubkey,
|
||||
this.mesh.secretKey,
|
||||
this.sessionSecretKey ?? this.mesh.secretKey,
|
||||
);
|
||||
}
|
||||
// Legacy/broadcast path: no senderPubkey means the message
|
||||
@@ -366,6 +378,19 @@ export class BrokerClient {
|
||||
plaintext = null;
|
||||
}
|
||||
}
|
||||
// Fallback: if direct decrypt failed, try plaintext base64 decode.
|
||||
// This handles broadcasts and key mismatches gracefully.
|
||||
if (plaintext === null && ciphertext) {
|
||||
try {
|
||||
const decoded = Buffer.from(ciphertext, "base64").toString("utf-8");
|
||||
// Sanity check: valid UTF-8 text (not binary garbage)
|
||||
if (/^[\x20-\x7E\s\u00A0-\uFFFF]*$/.test(decoded) && decoded.length > 0) {
|
||||
plaintext = decoded;
|
||||
}
|
||||
} catch {
|
||||
plaintext = null;
|
||||
}
|
||||
}
|
||||
const push: InboundPush = {
|
||||
messageId: String(msg.messageId ?? ""),
|
||||
meshId: String(msg.meshId ?? ""),
|
||||
|
||||
@@ -11,12 +11,13 @@ import type { Config, JoinedMesh } from "../state/config";
|
||||
import { env } from "../env";
|
||||
|
||||
const clients = new Map<string, BrokerClient>();
|
||||
let configDisplayName: string | undefined;
|
||||
|
||||
/** Ensure a BrokerClient exists + is connecting/open for this mesh. */
|
||||
export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> {
|
||||
const existing = clients.get(mesh.meshId);
|
||||
if (existing) return existing;
|
||||
const client = new BrokerClient(mesh, { debug: env.CLAUDEMESH_DEBUG });
|
||||
const client = new BrokerClient(mesh, { debug: env.CLAUDEMESH_DEBUG, displayName: configDisplayName });
|
||||
clients.set(mesh.meshId, client);
|
||||
try {
|
||||
await client.connect();
|
||||
@@ -29,6 +30,7 @@ export async function ensureClient(mesh: JoinedMesh): Promise<BrokerClient> {
|
||||
|
||||
/** Start clients for every joined mesh. Called once on MCP server start. */
|
||||
export async function startClients(config: Config): Promise<void> {
|
||||
configDisplayName = config.displayName;
|
||||
await Promise.allSettled(config.meshes.map(ensureClient));
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE "mesh"."presence" ADD COLUMN "session_pubkey" text;
|
||||
@@ -0,0 +1 @@
|
||||
ALTER TABLE "mesh"."message_queue" ADD COLUMN "sender_session_pubkey" text;
|
||||
@@ -192,6 +192,7 @@ export const presence = meshSchema.table("presence", {
|
||||
.references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" })
|
||||
.notNull(),
|
||||
sessionId: text().notNull(),
|
||||
sessionPubkey: text(),
|
||||
displayName: text(),
|
||||
pid: integer().notNull(),
|
||||
cwd: text().notNull(),
|
||||
@@ -221,6 +222,7 @@ export const messageQueue = meshSchema.table("message_queue", {
|
||||
senderMemberId: text()
|
||||
.references(() => meshMember.id, { onDelete: "cascade", onUpdate: "cascade" })
|
||||
.notNull(),
|
||||
senderSessionPubkey: text(),
|
||||
targetSpec: text().notNull(),
|
||||
priority: messagePriorityEnum().notNull().default("next"),
|
||||
nonce: text().notNull(),
|
||||
|
||||
Reference in New Issue
Block a user