feat: scheduled messages — schedule_reminder, send_later, list_scheduled, cancel_scheduled
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

- Broker: schedule/list_scheduled/cancel_scheduled WS message types + in-memory delivery
- Client: scheduleMessage(), listScheduled(), cancelScheduled() with resolver Map pattern
- MCP: schedule_reminder, send_later, list_scheduled, cancel_scheduled tools
- CLI: claudemesh remind <msg> --in 2h | --at 15:00 | list | cancel <id>
- Types: WSScheduleMessage, WSScheduledAckMessage, WSScheduledListMessage, WSCancelScheduledAckMessage

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-07 14:53:42 +01:00
parent 59848f0d3e
commit e76ade64d2
18 changed files with 1152 additions and 13 deletions

View File

@@ -102,6 +102,19 @@ const connectionsPerMesh = new Map<string, number>();
// Stream subscriptions: "meshId:streamName" → Set of presenceIds
const streamSubscriptions = new Map<string, Set<string>>();
// Scheduled messages: meshId → Map<scheduledId, entry>
interface ScheduledEntry {
id: string;
meshId: string;
presenceId: string;
to: string;
message: string;
deliverAt: number;
createdAt: number;
timer: ReturnType<typeof setTimeout>;
}
const scheduledMessages = new Map<string, ScheduledEntry>(); // keyed by scheduledId
const hookRateLimit = new TokenBucket(
env.HOOK_RATE_LIMIT_PER_MIN,
env.HOOK_RATE_LIMIT_PER_MIN,
@@ -1788,6 +1801,93 @@ function handleConnection(ws: WebSocket): void {
log.info("ws mesh_info", { presence_id: presenceId });
break;
}
// --- Scheduled messages ---
case "schedule": {
const sm = msg as Extract<WSClientMessage, { type: "schedule" }>;
const scheduledId = crypto.randomUUID();
const now = Date.now();
const delay = Math.max(0, sm.deliverAt - now);
const deliver = (): void => {
scheduledMessages.delete(scheduledId);
// Deliver via the normal send path by constructing a WSSendMessage
// and routing it through handleSend so encryption + push logic applies.
const conn2 = connections.get(presenceId);
if (!conn2) return; // session gone — drop
const fakeMsg: Extract<WSClientMessage, { type: "send" }> = {
type: "send",
id: crypto.randomUUID(),
targetSpec: sm.to,
priority: "now",
nonce: "",
ciphertext: Buffer.from(sm.message, "utf-8").toString("base64"),
};
handleSend(conn2, presenceId, fakeMsg).catch((e) =>
log.warn("scheduled delivery error", { scheduled_id: scheduledId, error: String(e) }),
);
log.info("ws schedule deliver", { scheduled_id: scheduledId, to: sm.to });
};
const entry: ScheduledEntry = {
id: scheduledId,
meshId: conn.meshId,
presenceId,
to: sm.to,
message: sm.message,
deliverAt: sm.deliverAt,
createdAt: now,
timer: setTimeout(deliver, delay),
};
scheduledMessages.set(scheduledId, entry);
sendToPeer(presenceId, {
type: "scheduled_ack",
scheduledId,
deliverAt: sm.deliverAt,
...(_reqId ? { _reqId } : {}),
});
log.info("ws schedule", {
presence_id: presenceId,
scheduled_id: scheduledId,
delay_ms: delay,
to: sm.to,
});
break;
}
case "list_scheduled": {
const mine = [...scheduledMessages.values()]
.filter((e) => e.meshId === conn.meshId && e.presenceId === presenceId)
.map((e) => ({ id: e.id, to: e.to, message: e.message, deliverAt: e.deliverAt, createdAt: e.createdAt }));
sendToPeer(presenceId, {
type: "scheduled_list",
messages: mine,
...(_reqId ? { _reqId } : {}),
});
log.info("ws list_scheduled", { presence_id: presenceId, count: mine.length });
break;
}
case "cancel_scheduled": {
const cs = msg as Extract<WSClientMessage, { type: "cancel_scheduled" }>;
const entry = scheduledMessages.get(cs.scheduledId);
let ok = false;
if (entry && entry.meshId === conn.meshId && entry.presenceId === presenceId) {
clearTimeout(entry.timer);
scheduledMessages.delete(cs.scheduledId);
ok = true;
}
sendToPeer(presenceId, {
type: "cancel_scheduled_ack",
scheduledId: cs.scheduledId,
ok,
...(_reqId ? { _reqId } : {}),
});
log.info("ws cancel_scheduled", { presence_id: presenceId, scheduled_id: cs.scheduledId, ok });
break;
}
}
} catch (e) {
metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });

View File

@@ -664,6 +664,60 @@ export interface WSErrorMessage {
_reqId?: string;
}
// --- Scheduled messages ---
/** Client → broker: schedule a message for future delivery. */
export interface WSScheduleMessage {
type: "schedule";
to: string;
message: string;
/** Unix timestamp (ms) when to deliver. */
deliverAt: number;
_reqId?: string;
}
/** Client → broker: list pending scheduled messages for this member. */
export interface WSListScheduledMessage {
type: "list_scheduled";
_reqId?: string;
}
/** Client → broker: cancel a scheduled message by id. */
export interface WSCancelScheduledMessage {
type: "cancel_scheduled";
scheduledId: string;
_reqId?: string;
}
/** Broker → client: acknowledgement for schedule, carries the assigned id. */
export interface WSScheduledAckMessage {
type: "scheduled_ack";
scheduledId: string;
deliverAt: number;
_reqId?: string;
}
/** Broker → client: list of pending scheduled messages. */
export interface WSScheduledListMessage {
type: "scheduled_list";
messages: Array<{
id: string;
to: string;
message: string;
deliverAt: number;
createdAt: number;
}>;
_reqId?: string;
}
/** Broker → client: cancel confirmation. */
export interface WSCancelScheduledAckMessage {
type: "cancel_scheduled_ack";
scheduledId: string;
ok: boolean;
_reqId?: string;
}
export type WSClientMessage =
| WSHelloMessage
| WSSendMessage
@@ -705,7 +759,10 @@ export type WSClientMessage =
| WSSubscribeMessage
| WSUnsubscribeMessage
| WSListStreamsMessage
| WSMeshInfoMessage;
| WSMeshInfoMessage
| WSScheduleMessage
| WSListScheduledMessage
| WSCancelScheduledMessage;
export type WSServerMessage =
| WSHelloAckMessage
@@ -738,4 +795,7 @@ export type WSServerMessage =
| WSSubscribedMessage
| WSStreamListMessage
| WSMeshInfoResultMessage
| WSScheduledAckMessage
| WSScheduledListMessage
| WSCancelScheduledAckMessage
| WSErrorMessage;