fix: queue TTL + per-member send rate limit + size cap + no-recipient reject + ack.error
Broker (all need redeploy):
- sweepOrphanMessages: DELETE undelivered message_queue rows older than 7 days; hourly sweep. Stops unbounded growth when a sender mistypes a name (queued forever, never claimed).
- Per-member send rate limit: TokenBucket(60/min, burst 10) keyed on memberId so reconnecting can't bypass it. Surfaces as queued=false, error='rate_limit: ...'.
- Pre-flight size cap: reject at handleSend if nonce+ciphertext+targetSpec exceeds env.MAX_MESSAGE_BYTES, with a clear error instead of a silent WSS frame-level kill.
- No-recipient reject: for direct sends, check that a matching peer is connected BEFORE queueing. Kills the self-send silent drop (sending to your own pubkey when you only have one session connected) and typo-to-offline-peer silent drops.
- WSAckMessage.error field added for structured failure reasons.

CLI:
- ws-client ack handler reads msg.queued and msg.error; surfaces rate_limit / too_large / no_recipient to callers instead of returning ok:true with a dummy messageId.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -293,6 +293,29 @@ export async function sweepStalePresences(): Promise<void> {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Delete message_queue rows that were never delivered and are more than
 * seven days old.
 *
 * A message whose targetSpec never matches a claiming peer (a typo, or a
 * peer that disconnected before claiming) keeps delivered_at = NULL
 * indefinitely, so without this sweep the table grows without bound. The
 * 7-day window matches invite expiry, so anything still queued at that age
 * is already stale.
 *
 * @returns The number of rows deleted, so the caller can log and meter it.
 */
export async function sweepOrphanMessages(): Promise<number> {
  const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000;
  const cutoff = new Date(Date.now() - SEVEN_DAYS_MS);

  // RETURNING id makes the statement yield one row per deleted message,
  // which is what the count below is derived from.
  const result = await db.execute(sql`
    DELETE FROM mesh.message_queue
    WHERE delivered_at IS NULL
      AND created_at < ${cutoff}
    RETURNING id
  `);

  // The result may expose the rows under `.rows` or be the row array itself;
  // accept either shape and fall back to 0 for anything else.
  const wrapped = result as unknown as { rows?: unknown[] };
  const deleted = wrapped.rows ?? result;
  if (!Array.isArray(deleted)) return 0;
  return deleted.length;
}
|
||||||
|
|
||||||
/** Sweep expired pending_status entries. */
|
/** Sweep expired pending_status entries. */
|
||||||
export async function sweepPendingStatuses(): Promise<void> {
|
export async function sweepPendingStatuses(): Promise<void> {
|
||||||
const cutoff = new Date(Date.now() - PENDING_TTL_MS);
|
const cutoff = new Date(Date.now() - PENDING_TTL_MS);
|
||||||
@@ -1667,6 +1690,12 @@ export function startSweepers(): void {
|
|||||||
console.error("[broker] stale presence sweep:", e),
|
console.error("[broker] stale presence sweep:", e),
|
||||||
);
|
);
|
||||||
}, 30_000);
|
}, 30_000);
|
||||||
|
// Orphan-message sweep every hour; cheap, rows are all >7d at deletion time.
|
||||||
|
setInterval(() => {
|
||||||
|
sweepOrphanMessages()
|
||||||
|
.then((n) => { if (n > 0) console.log(`[broker] orphan msgs swept: ${n}`); })
|
||||||
|
.catch((e) => console.error("[broker] orphan msg sweep:", e));
|
||||||
|
}, 60 * 60_000).unref();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Stop background sweepers and mark all active presences disconnected. */
|
/** Stop background sweepers and mark all active presences disconnected. */
|
||||||
|
|||||||
@@ -483,6 +483,14 @@ const hookRateLimit = new TokenBucket(
|
|||||||
env.HOOK_RATE_LIMIT_PER_MIN,
|
env.HOOK_RATE_LIMIT_PER_MIN,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/**
 * Send rate limiter, keyed per member.
 *
 * Guards the mesh against a runaway peer flooding messages. The intended
 * budget is 60 messages per minute with a burst of 10: roomy for
 * conversational traffic, yet strict enough that a send-loop bug shows up
 * within seconds. The values are hard-coded for now; making them
 * env-configurable is left to a later pass.
 *
 * NOTE(review): TokenBucket's constructor is not visible here — confirm that
 * the argument order (60, 10) really maps to rate-per-minute and burst.
 */
const sendRateLimit = new TokenBucket(60, 10);
|
||||||
|
|
||||||
function sendToPeer(presenceId: string, msg: WSServerMessage): void {
|
function sendToPeer(presenceId: string, msg: WSServerMessage): void {
|
||||||
const conn = connections.get(presenceId);
|
const conn = connections.get(presenceId);
|
||||||
if (!conn) return;
|
if (!conn) return;
|
||||||
@@ -1792,6 +1800,75 @@ async function handleSend(
|
|||||||
msg: Extract<WSClientMessage, { type: "send" }>,
|
msg: Extract<WSClientMessage, { type: "send" }>,
|
||||||
subtype?: "reminder",
|
subtype?: "reminder",
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
|
// Per-member rate limit (60/min, burst 10). Runaway peer → graceful ack
|
||||||
|
// failure instead of queue explosion. Uses member (not session) key so a
|
||||||
|
// peer can't dodge by reconnecting.
|
||||||
|
if (!sendRateLimit.take(conn.memberId)) {
|
||||||
|
metrics.messagesRejectedTotal.inc({ reason: "rate_limit" });
|
||||||
|
const errAck: WSServerMessage = {
|
||||||
|
type: "ack",
|
||||||
|
id: msg.id ?? "",
|
||||||
|
messageId: "",
|
||||||
|
queued: false,
|
||||||
|
error: "rate_limit: max 60 msg/min (burst 10) — slow down",
|
||||||
|
};
|
||||||
|
conn.ws.send(JSON.stringify(errAck));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Size cap — ws.maxPayload catches giants at the frame level, but we also
|
||||||
|
// reject verbose nonce+ciphertext combinations above env.MAX_MESSAGE_BYTES
|
||||||
|
// so clients get a clear error instead of a silent socket kill.
|
||||||
|
const approxBytes =
|
||||||
|
(msg.ciphertext?.length ?? 0) + (msg.nonce?.length ?? 0) + (msg.targetSpec?.length ?? 0);
|
||||||
|
if (approxBytes > env.MAX_MESSAGE_BYTES) {
|
||||||
|
metrics.messagesRejectedTotal.inc({ reason: "too_large" });
|
||||||
|
const errAck: WSServerMessage = {
|
||||||
|
type: "ack",
|
||||||
|
id: msg.id ?? "",
|
||||||
|
messageId: "",
|
||||||
|
queued: false,
|
||||||
|
error: `payload too large: ${approxBytes} bytes > MAX_MESSAGE_BYTES=${env.MAX_MESSAGE_BYTES}`,
|
||||||
|
};
|
||||||
|
conn.ws.send(JSON.stringify(errAck));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pre-flight: for direct sends (not @group, not *), verify at least one
|
||||||
|
// matching connected peer exists BEFORE queueing. Prevents silent drops
|
||||||
|
// when a user sends to a typo, their own pubkey with no other session,
|
||||||
|
// or a peer who has disconnected. The CLI's resolveClient already guards
|
||||||
|
// name-based targets; this catches raw-pubkey and CLI-bypassing clients.
|
||||||
|
const isGroupTargetEarly = msg.targetSpec.startsWith("@");
|
||||||
|
const isBroadcastEarly =
|
||||||
|
msg.targetSpec === "*" ||
|
||||||
|
(isGroupTargetEarly && msg.targetSpec === "@all");
|
||||||
|
const isDirectEarly = !isGroupTargetEarly && !isBroadcastEarly && msg.targetSpec !== "*";
|
||||||
|
if (isDirectEarly) {
|
||||||
|
let hasRecipient = false;
|
||||||
|
for (const [pid, peer] of connections) {
|
||||||
|
if (peer.meshId !== conn.meshId) continue;
|
||||||
|
if (peer.ws === conn.ws) continue;
|
||||||
|
if (peer.memberPubkey === msg.targetSpec || peer.sessionPubkey === msg.targetSpec) {
|
||||||
|
hasRecipient = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
void pid;
|
||||||
|
}
|
||||||
|
if (!hasRecipient) {
|
||||||
|
metrics.messagesRejectedTotal.inc({ reason: "no_recipient" });
|
||||||
|
const errAck: WSServerMessage = {
|
||||||
|
type: "ack",
|
||||||
|
id: msg.id ?? "",
|
||||||
|
messageId: "",
|
||||||
|
queued: false,
|
||||||
|
error: `no connected peer for target (not online, or targetSpec is your own key without another session)`,
|
||||||
|
};
|
||||||
|
conn.ws.send(JSON.stringify(errAck));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const messageId = await queueMessage({
|
const messageId = await queueMessage({
|
||||||
meshId: conn.meshId,
|
meshId: conn.meshId,
|
||||||
senderMemberId: conn.memberId,
|
senderMemberId: conn.memberId,
|
||||||
|
|||||||
@@ -206,6 +206,8 @@ export interface WSAckMessage {
|
|||||||
id: string; // echoes client-side correlation id
|
id: string; // echoes client-side correlation id
|
||||||
messageId: string;
|
messageId: string;
|
||||||
queued: boolean;
|
queued: boolean;
|
||||||
|
/** Populated when queued=false to explain why (rate_limit, too_large, etc.). */
|
||||||
|
error?: string;
|
||||||
_reqId?: string;
|
_reqId?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "claudemesh-cli",
|
"name": "claudemesh-cli",
|
||||||
"version": "1.0.0-alpha.34",
|
"version": "1.0.0-alpha.35",
|
||||||
"description": "Peer mesh for Claude Code sessions — CLI + MCP server.",
|
"description": "Peer mesh for Claude Code sessions — CLI + MCP server.",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"claude-code",
|
"claude-code",
|
||||||
|
|||||||
@@ -1618,10 +1618,13 @@ export class BrokerClient {
|
|||||||
if (msg.type === "ack") {
|
if (msg.type === "ack") {
|
||||||
const pending = this.pendingSends.get(String(msg.id ?? ""));
|
const pending = this.pendingSends.get(String(msg.id ?? ""));
|
||||||
if (pending) {
|
if (pending) {
|
||||||
pending.resolve({
|
const queued = msg.queued !== false;
|
||||||
ok: true,
|
const errStr = typeof msg.error === "string" ? msg.error : undefined;
|
||||||
messageId: String(msg.messageId ?? ""),
|
pending.resolve(
|
||||||
});
|
queued
|
||||||
|
? { ok: true, messageId: String(msg.messageId ?? "") }
|
||||||
|
: { ok: false, error: errStr ?? "broker rejected send" },
|
||||||
|
);
|
||||||
this.pendingSends.delete(pending.id);
|
this.pendingSends.delete(pending.id);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
|||||||
Reference in New Issue
Block a user