Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
663f800b4b | ||
|
|
2557235c68 |
@@ -265,6 +265,23 @@ export async function refreshQueueDepth(): Promise<void> {
|
|||||||
metrics.queueDepth.set(Number(row?.n ?? 0));
|
metrics.queueDepth.set(Number(row?.n ?? 0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sweep stale presences: mark as disconnected if last_ping_at is older
|
||||||
|
* than 90s (3 missed pings at the 30s interval = dead session).
|
||||||
|
*/
|
||||||
|
export async function sweepStalePresences(): Promise<void> {
|
||||||
|
const cutoff = new Date(Date.now() - 90_000); // 3 missed pings
|
||||||
|
await db
|
||||||
|
.update(presence)
|
||||||
|
.set({ disconnectedAt: new Date() })
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
isNull(presence.disconnectedAt),
|
||||||
|
lt(presence.lastPingAt, cutoff),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
/** Sweep expired pending_status entries. */
|
/** Sweep expired pending_status entries. */
|
||||||
export async function sweepPendingStatuses(): Promise<void> {
|
export async function sweepPendingStatuses(): Promise<void> {
|
||||||
const cutoff = new Date(Date.now() - PENDING_TTL_MS);
|
const cutoff = new Date(Date.now() - PENDING_TTL_MS);
|
||||||
@@ -475,6 +492,7 @@ export async function drainForMember(
|
|||||||
memberPubkey: string,
|
memberPubkey: string,
|
||||||
status: PeerStatus,
|
status: PeerStatus,
|
||||||
sessionPubkey?: string,
|
sessionPubkey?: string,
|
||||||
|
excludeSenderSessionPubkey?: string,
|
||||||
): Promise<
|
): Promise<
|
||||||
Array<{
|
Array<{
|
||||||
id: string;
|
id: string;
|
||||||
@@ -516,6 +534,7 @@ export async function drainForMember(
|
|||||||
AND delivered_at IS NULL
|
AND delivered_at IS NULL
|
||||||
AND priority::text IN (${priorityList})
|
AND priority::text IN (${priorityList})
|
||||||
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``})
|
AND (target_spec = ${memberPubkey} OR target_spec = '*'${sessionPubkey ? sql` OR target_spec = ${sessionPubkey}` : sql``})
|
||||||
|
${excludeSenderSessionPubkey ? sql`AND (sender_session_pubkey IS NULL OR sender_session_pubkey != ${excludeSenderSessionPubkey})` : sql``}
|
||||||
ORDER BY created_at ASC, id ASC
|
ORDER BY created_at ASC, id ASC
|
||||||
FOR UPDATE SKIP LOCKED
|
FOR UPDATE SKIP LOCKED
|
||||||
)
|
)
|
||||||
@@ -553,6 +572,7 @@ export async function drainForMember(
|
|||||||
|
|
||||||
let ttlTimer: ReturnType<typeof setInterval> | null = null;
|
let ttlTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
let pendingTimer: ReturnType<typeof setInterval> | null = null;
|
let pendingTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
|
let staleTimer: ReturnType<typeof setInterval> | null = null;
|
||||||
|
|
||||||
/** Start background sweepers. Idempotent. */
|
/** Start background sweepers. Idempotent. */
|
||||||
export function startSweepers(): void {
|
export function startSweepers(): void {
|
||||||
@@ -565,14 +585,21 @@ export function startSweepers(): void {
|
|||||||
console.error("[broker] pending sweep:", e),
|
console.error("[broker] pending sweep:", e),
|
||||||
);
|
);
|
||||||
}, PENDING_SWEEP_INTERVAL_MS);
|
}, PENDING_SWEEP_INTERVAL_MS);
|
||||||
|
staleTimer = setInterval(() => {
|
||||||
|
sweepStalePresences().catch((e) =>
|
||||||
|
console.error("[broker] stale presence sweep:", e),
|
||||||
|
);
|
||||||
|
}, 30_000);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Stop background sweepers and mark all active presences disconnected. */
|
/** Stop background sweepers and mark all active presences disconnected. */
|
||||||
export async function stopSweepers(): Promise<void> {
|
export async function stopSweepers(): Promise<void> {
|
||||||
if (ttlTimer) clearInterval(ttlTimer);
|
if (ttlTimer) clearInterval(ttlTimer);
|
||||||
if (pendingTimer) clearInterval(pendingTimer);
|
if (pendingTimer) clearInterval(pendingTimer);
|
||||||
|
if (staleTimer) clearInterval(staleTimer);
|
||||||
ttlTimer = null;
|
ttlTimer = null;
|
||||||
pendingTimer = null;
|
pendingTimer = null;
|
||||||
|
staleTimer = null;
|
||||||
await db
|
await db
|
||||||
.update(presence)
|
.update(presence)
|
||||||
.set({ disconnectedAt: new Date() })
|
.set({ disconnectedAt: new Date() })
|
||||||
|
|||||||
@@ -81,7 +81,10 @@ function sendToPeer(presenceId: string, msg: WSServerMessage): void {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function maybePushQueuedMessages(presenceId: string): Promise<void> {
|
async function maybePushQueuedMessages(
|
||||||
|
presenceId: string,
|
||||||
|
excludeSenderSessionPubkey?: string,
|
||||||
|
): Promise<void> {
|
||||||
const conn = connections.get(presenceId);
|
const conn = connections.get(presenceId);
|
||||||
if (!conn) return;
|
if (!conn) return;
|
||||||
const status = await refreshStatusFromJsonl(
|
const status = await refreshStatusFromJsonl(
|
||||||
@@ -95,6 +98,7 @@ async function maybePushQueuedMessages(presenceId: string): Promise<void> {
|
|||||||
conn.memberPubkey,
|
conn.memberPubkey,
|
||||||
status,
|
status,
|
||||||
conn.sessionPubkey ?? undefined,
|
conn.sessionPubkey ?? undefined,
|
||||||
|
excludeSenderSessionPubkey,
|
||||||
);
|
);
|
||||||
for (const m of messages) {
|
for (const m of messages) {
|
||||||
const push: WSPushMessage = {
|
const push: WSPushMessage = {
|
||||||
@@ -452,14 +456,21 @@ async function handleSend(
|
|||||||
};
|
};
|
||||||
conn.ws.send(JSON.stringify(ack));
|
conn.ws.send(JSON.stringify(ack));
|
||||||
|
|
||||||
// Fan-out over connected peers in the same mesh.
|
// Find sender's presenceId to exclude from fan-out.
|
||||||
|
let senderPresenceId: string | undefined;
|
||||||
for (const [pid, peer] of connections) {
|
for (const [pid, peer] of connections) {
|
||||||
|
if (peer.ws === conn.ws) { senderPresenceId = pid; break; }
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fan-out over connected peers in the same mesh — skip sender.
|
||||||
|
for (const [pid, peer] of connections) {
|
||||||
|
if (pid === senderPresenceId) continue;
|
||||||
if (peer.meshId !== conn.meshId) continue;
|
if (peer.meshId !== conn.meshId) continue;
|
||||||
if (msg.targetSpec !== "*"
|
if (msg.targetSpec !== "*"
|
||||||
&& peer.memberPubkey !== msg.targetSpec
|
&& peer.memberPubkey !== msg.targetSpec
|
||||||
&& peer.sessionPubkey !== msg.targetSpec)
|
&& peer.sessionPubkey !== msg.targetSpec)
|
||||||
continue;
|
continue;
|
||||||
void maybePushQueuedMessages(pid);
|
void maybePushQueuedMessages(pid, conn.sessionPubkey ?? undefined);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "claudemesh-cli",
|
"name": "claudemesh-cli",
|
||||||
"version": "0.1.14",
|
"version": "0.1.16",
|
||||||
"description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.",
|
"description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"claude-code",
|
"claude-code",
|
||||||
|
|||||||
@@ -11,7 +11,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import { spawn } from "node:child_process";
|
import { spawn } from "node:child_process";
|
||||||
import { mkdtempSync, writeFileSync, rmSync } from "node:fs";
|
import { mkdtempSync, writeFileSync, rmSync, readdirSync, statSync } from "node:fs";
|
||||||
import { tmpdir, hostname } from "node:os";
|
import { tmpdir, hostname } from "node:os";
|
||||||
import { join } from "node:path";
|
import { join } from "node:path";
|
||||||
import { createInterface } from "node:readline";
|
import { createInterface } from "node:readline";
|
||||||
@@ -215,6 +215,17 @@ export async function runLaunch(extraArgs: string[]): Promise<void> {
|
|||||||
// We just set the display name via env var.
|
// We just set the display name via env var.
|
||||||
const displayName = args.name ?? `${hostname()}-${process.pid}`;
|
const displayName = args.name ?? `${hostname()}-${process.pid}`;
|
||||||
|
|
||||||
|
// Clean up orphaned tmpdirs from crashed sessions (older than 1 hour)
|
||||||
|
const tmpBase = tmpdir();
|
||||||
|
try {
|
||||||
|
for (const entry of readdirSync(tmpBase)) {
|
||||||
|
if (!entry.startsWith("claudemesh-")) continue;
|
||||||
|
const full = join(tmpBase, entry);
|
||||||
|
const age = Date.now() - statSync(full).mtimeMs;
|
||||||
|
if (age > 3600_000) rmSync(full, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
} catch { /* best effort */ }
|
||||||
|
|
||||||
// 4. Write session config to tmpdir (isolates mesh selection).
|
// 4. Write session config to tmpdir (isolates mesh selection).
|
||||||
const tmpDir = mkdtempSync(join(tmpdir(), "claudemesh-"));
|
const tmpDir = mkdtempSync(join(tmpdir(), "claudemesh-"));
|
||||||
const sessionConfig: Config = {
|
const sessionConfig: Config = {
|
||||||
|
|||||||
@@ -98,6 +98,24 @@ async function resolveClient(to: string): Promise<{
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Peer name cache to avoid calling listPeers on every incoming push
|
||||||
|
const peerNameCache = new Map<string, string>();
|
||||||
|
let peerNameCacheAge = 0;
|
||||||
|
const CACHE_TTL_MS = 30_000;
|
||||||
|
|
||||||
|
async function resolvePeerName(client: BrokerClient, pubkey: string): Promise<string> {
|
||||||
|
const now = Date.now();
|
||||||
|
if (now - peerNameCacheAge > CACHE_TTL_MS) {
|
||||||
|
peerNameCache.clear();
|
||||||
|
try {
|
||||||
|
const peers = await client.listPeers();
|
||||||
|
for (const p of peers) peerNameCache.set(p.pubkey, p.displayName);
|
||||||
|
} catch { /* best effort */ }
|
||||||
|
peerNameCacheAge = now;
|
||||||
|
}
|
||||||
|
return peerNameCache.get(pubkey) ?? `peer-${pubkey.slice(0, 8)}`;
|
||||||
|
}
|
||||||
|
|
||||||
function decryptFailedWarning(senderPubkey: string): string {
|
function decryptFailedWarning(senderPubkey: string): string {
|
||||||
const who = senderPubkey ? senderPubkey.slice(0, 12) + "…" : "unknown sender";
|
const who = senderPubkey ? senderPubkey.slice(0, 12) + "…" : "unknown sender";
|
||||||
return `⚠ message from ${who} failed to decrypt (tampered or wrong keypair)`;
|
return `⚠ message from ${who} failed to decrypt (tampered or wrong keypair)`;
|
||||||
@@ -251,17 +269,10 @@ If you have multiple joined meshes, prefix the \`to\` argument of send_message w
|
|||||||
for (const client of allClients()) {
|
for (const client of allClients()) {
|
||||||
client.onPush(async (msg) => {
|
client.onPush(async (msg) => {
|
||||||
const fromPubkey = msg.senderPubkey || "";
|
const fromPubkey = msg.senderPubkey || "";
|
||||||
// Resolve sender's display name from the peer list.
|
// Resolve sender's display name from the cached peer list.
|
||||||
let fromName = fromPubkey
|
const fromName = fromPubkey
|
||||||
? `peer-${fromPubkey.slice(0, 8)}`
|
? await resolvePeerName(client, fromPubkey)
|
||||||
: "unknown";
|
: "unknown";
|
||||||
try {
|
|
||||||
const peers = await client.listPeers();
|
|
||||||
const match = peers.find((p) => p.pubkey === fromPubkey);
|
|
||||||
if (match) fromName = match.displayName;
|
|
||||||
} catch {
|
|
||||||
/* best effort — fall back to truncated pubkey */
|
|
||||||
}
|
|
||||||
const content = msg.plaintext ?? decryptFailedWarning(fromPubkey);
|
const content = msg.plaintext ?? decryptFailedWarning(fromPubkey);
|
||||||
try {
|
try {
|
||||||
await server.notification({
|
await server.notification({
|
||||||
|
|||||||
@@ -115,10 +115,12 @@ export class BrokerClient {
|
|||||||
const onOpen = async (): Promise<void> => {
|
const onOpen = async (): Promise<void> => {
|
||||||
this.debug("ws open → generating session keypair + signing hello");
|
this.debug("ws open → generating session keypair + signing hello");
|
||||||
try {
|
try {
|
||||||
// Generate per-session ephemeral keypair for message routing.
|
// Only generate session keypair on first connect, not reconnects
|
||||||
|
if (!this.sessionPubkey) {
|
||||||
const sessionKP = await generateKeypair();
|
const sessionKP = await generateKeypair();
|
||||||
this.sessionPubkey = sessionKP.publicKey;
|
this.sessionPubkey = sessionKP.publicKey;
|
||||||
this.sessionSecretKey = sessionKP.secretKey;
|
this.sessionSecretKey = sessionKP.secretKey;
|
||||||
|
}
|
||||||
|
|
||||||
const { timestamp, signature } = await signHello(
|
const { timestamp, signature } = await signHello(
|
||||||
this.mesh.meshId,
|
this.mesh.meshId,
|
||||||
@@ -376,6 +378,19 @@ export class BrokerClient {
|
|||||||
plaintext = null;
|
plaintext = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Fallback: if direct decrypt failed, try plaintext base64 decode.
|
||||||
|
// This handles broadcasts and key mismatches gracefully.
|
||||||
|
if (plaintext === null && ciphertext) {
|
||||||
|
try {
|
||||||
|
const decoded = Buffer.from(ciphertext, "base64").toString("utf-8");
|
||||||
|
// Sanity check: valid UTF-8 text (not binary garbage)
|
||||||
|
if (/^[\x20-\x7E\s\u00A0-\uFFFF]*$/.test(decoded) && decoded.length > 0) {
|
||||||
|
plaintext = decoded;
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
plaintext = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
const push: InboundPush = {
|
const push: InboundPush = {
|
||||||
messageId: String(msg.messageId ?? ""),
|
messageId: String(msg.messageId ?? ""),
|
||||||
meshId: String(msg.meshId ?? ""),
|
meshId: String(msg.meshId ?? ""),
|
||||||
|
|||||||
Reference in New Issue
Block a user