From 1a7a059e754ca35dde7f91c579df1816532cac08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Wed, 15 Apr 2026 14:44:09 +0100 Subject: [PATCH] fix: queue TTL + per-member send rate limit + size cap + no-recipient reject + ack.error Broker (all need redeploy): - sweepOrphanMessages: DELETE undelivered message_queue rows older than 7 days; hourly sweep. Stops unbounded growth when a sender typos a name (queued forever, never claimed). - Per-member send rate limit: TokenBucket(60/min, burst 10) keyed on memberId so reconnecting can't bypass. Surfaces as queued=false, error='rate_limit: ...'. - Pre-flight size cap: reject at handleSend if nonce+ciphertext+ targetSpec exceeds env.MAX_MESSAGE_BYTES with a clear error instead of silent WSS frame-level kill. - No-recipient reject: for direct sends, check any matching peer is connected BEFORE queueing. Kills the self-send silent drop (sending to your own pubkey when you only have one session connected) and typo-to-offline-peer silent drops. - WSAckMessage.error field added for structured failure reasons. CLI: - ws-client ack handler reads msg.queued and msg.error; surfaces rate_limit / too_large / no_recipient to callers instead of returning ok:true with a dummy messageId. Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/broker/src/broker.ts | 29 +++++++++ apps/broker/src/index.ts | 77 +++++++++++++++++++++++ apps/broker/src/types.ts | 2 + apps/cli/package.json | 2 +- apps/cli/src/services/broker/ws-client.ts | 11 ++-- 5 files changed, 116 insertions(+), 5 deletions(-) diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 31f66a7..7e2e7d9 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -293,6 +293,29 @@ export async function sweepStalePresences(): Promise { ); } +/** + * Sweep undelivered message_queue rows older than 7 days. + * + * Messages sent to non-matching targetSpecs (e.g. typos, peer disconnected + * before claim) would otherwise sit in delivered_at=NULL forever — unbounded + * growth. 7d matches invite expiry, so any legitimately held message is + * already stale by then. + * + * Returns the number of rows deleted so the caller can log + meter. + */ +export async function sweepOrphanMessages(): Promise { + const cutoff = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); + const result = await db.execute(sql` + DELETE FROM mesh.message_queue + WHERE delivered_at IS NULL + AND created_at < ${cutoff} + RETURNING id + `); + const rows = (result as unknown as { rows?: unknown[]; length?: number }).rows ?? result; + const count = Array.isArray(rows) ? rows.length : 0; + return count; +} + /** Sweep expired pending_status entries. */ export async function sweepPendingStatuses(): Promise { const cutoff = new Date(Date.now() - PENDING_TTL_MS); @@ -1667,6 +1690,12 @@ export function startSweepers(): void { console.error("[broker] stale presence sweep:", e), ); }, 30_000); + // Orphan-message sweep every hour; cheap, rows are all >7d at deletion time. + setInterval(() => { + sweepOrphanMessages() + .then((n) => { if (n > 0) console.log(`[broker] orphan msgs swept: ${n}`); }) + .catch((e) => console.error("[broker] orphan msg sweep:", e)); + }, 60 * 60_000).unref(); } /** Stop background sweepers and mark all active presences disconnected. */ diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index d8ef8f3..94d06c0 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -483,6 +483,14 @@ const hookRateLimit = new TokenBucket( env.HOOK_RATE_LIMIT_PER_MIN, ); +/** + * Per-member send rate limit. Protects the mesh from a runaway peer + * dumping messages. Bucket is 60 msgs/min with a burst of 10 — generous + * for conversational use, tight enough that a loop bug surfaces in seconds. + * Configurable via env in a later pass. + */ +const sendRateLimit = new TokenBucket(60, 10); + function sendToPeer(presenceId: string, msg: WSServerMessage): void { const conn = connections.get(presenceId); if (!conn) return; @@ -1792,6 +1800,75 @@ async function handleSend( msg: Extract, subtype?: "reminder", ): Promise { + // Per-member rate limit (60/min, burst 10). Runaway peer → graceful ack + // failure instead of queue explosion. Uses member (not session) key so a + // peer can't dodge by reconnecting. + if (!sendRateLimit.take(conn.memberId)) { + metrics.messagesRejectedTotal.inc({ reason: "rate_limit" }); + const errAck: WSServerMessage = { + type: "ack", + id: msg.id ?? "", + messageId: "", + queued: false, + error: "rate_limit: max 60 msg/min (burst 10) — slow down", + }; + conn.ws.send(JSON.stringify(errAck)); + return; + } + + // Size cap — ws.maxPayload catches giants at the frame level, but we also + // reject verbose nonce+ciphertext combinations above env.MAX_MESSAGE_BYTES + // so clients get a clear error instead of a silent socket kill. + const approxBytes = + (msg.ciphertext?.length ?? 0) + (msg.nonce?.length ?? 0) + (msg.targetSpec?.length ?? 0); + if (approxBytes > env.MAX_MESSAGE_BYTES) { + metrics.messagesRejectedTotal.inc({ reason: "too_large" }); + const errAck: WSServerMessage = { + type: "ack", + id: msg.id ?? "", + messageId: "", + queued: false, + error: `payload too large: ${approxBytes} bytes > MAX_MESSAGE_BYTES=${env.MAX_MESSAGE_BYTES}`, + }; + conn.ws.send(JSON.stringify(errAck)); + return; + } + + // Pre-flight: for direct sends (not @group, not *), verify at least one + // matching connected peer exists BEFORE queueing. Prevents silent drops + // when a user sends to a typo, their own pubkey with no other session, + // or a peer who has disconnected. The CLI's resolveClient already guards + // name-based targets; this catches raw-pubkey and CLI-bypassing clients. + const isGroupTargetEarly = msg.targetSpec.startsWith("@"); + const isBroadcastEarly = + msg.targetSpec === "*" || + (isGroupTargetEarly && msg.targetSpec === "@all"); + const isDirectEarly = !isGroupTargetEarly && !isBroadcastEarly && msg.targetSpec !== "*"; + if (isDirectEarly) { + let hasRecipient = false; + for (const [pid, peer] of connections) { + if (peer.meshId !== conn.meshId) continue; + if (peer.ws === conn.ws) continue; + if (peer.memberPubkey === msg.targetSpec || peer.sessionPubkey === msg.targetSpec) { + hasRecipient = true; + break; + } + void pid; + } + if (!hasRecipient) { + metrics.messagesRejectedTotal.inc({ reason: "no_recipient" }); + const errAck: WSServerMessage = { + type: "ack", + id: msg.id ?? "", + messageId: "", + queued: false, + error: `no connected peer for target (not online, or targetSpec is your own key without another session)`, + }; + conn.ws.send(JSON.stringify(errAck)); + return; + } + } + const messageId = await queueMessage({ meshId: conn.meshId, senderMemberId: conn.memberId, diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 8523edc..14aa96d 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -206,6 +206,8 @@ export interface WSAckMessage { id: string; // echoes client-side correlation id messageId: string; queued: boolean; + /** Populated when queued=false to explain why (rate_limit, too_large, etc.). */ + error?: string; _reqId?: string; } diff --git a/apps/cli/package.json b/apps/cli/package.json index d2bb72e..243cdbf 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -1,6 +1,6 @@ { "name": "claudemesh-cli", - "version": "1.0.0-alpha.34", + "version": "1.0.0-alpha.35", "description": "Peer mesh for Claude Code sessions — CLI + MCP server.", "keywords": [ "claude-code", diff --git a/apps/cli/src/services/broker/ws-client.ts b/apps/cli/src/services/broker/ws-client.ts index c6dadde..c8018de 100644 --- a/apps/cli/src/services/broker/ws-client.ts +++ b/apps/cli/src/services/broker/ws-client.ts @@ -1618,10 +1618,13 @@ export class BrokerClient { if (msg.type === "ack") { const pending = this.pendingSends.get(String(msg.id ?? "")); if (pending) { - pending.resolve({ - ok: true, - messageId: String(msg.messageId ?? ""), - }); + const queued = msg.queued !== false; + const errStr = typeof msg.error === "string" ? msg.error : undefined; + pending.resolve( + queued + ? { ok: true, messageId: String(msg.messageId ?? "") } + : { ok: false, error: errStr ?? "broker rejected send" }, + ); this.pendingSends.delete(pending.id); } return;