diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 2704e3e..f308320 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -103,7 +103,7 @@ const connectionsPerMesh = new Map(); // Stream subscriptions: "meshId:streamName" → Set of presenceIds const streamSubscriptions = new Map>(); -// Scheduled messages: meshId → Map +/// Scheduled messages: meshId → Map interface ScheduledEntry { id: string; meshId: string; @@ -112,6 +112,7 @@ interface ScheduledEntry { message: string; deliverAt: number; createdAt: number; + subtype?: "reminder"; timer: ReturnType; } const scheduledMessages = new Map(); // keyed by scheduledId @@ -652,6 +653,7 @@ async function handleHello( async function handleSend( conn: PeerConn, msg: Extract, + subtype?: "reminder", ): Promise { const messageId = await queueMessage({ meshId: conn.meshId, @@ -696,6 +698,7 @@ async function handleSend( nonce: msg.nonce, ciphertext: msg.ciphertext, createdAt: new Date().toISOString(), + ...(subtype ? { subtype } : {}), }; for (const [pid, peer] of connections) { @@ -1824,7 +1827,7 @@ function handleConnection(ws: WebSocket): void { nonce: "", ciphertext: Buffer.from(sm.message, "utf-8").toString("base64"), }; - handleSend(conn2, presenceId, fakeMsg).catch((e) => + handleSend(conn2, fakeMsg, sm.subtype).catch((e) => log.warn("scheduled delivery error", { scheduled_id: scheduledId, error: String(e) }), ); log.info("ws schedule deliver", { scheduled_id: scheduledId, to: sm.to }); @@ -1838,6 +1841,7 @@ function handleConnection(ws: WebSocket): void { message: sm.message, deliverAt: sm.deliverAt, createdAt: now, + ...(sm.subtype ? { subtype: sm.subtype } : {}), timer: setTimeout(deliver, delay), }; scheduledMessages.set(scheduledId, entry); diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 5944024..2c08e2e 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -86,6 +86,8 @@ export interface WSPushMessage { nonce: string; ciphertext: string; createdAt: string; + /** Optional semantic tag — "reminder" when delivered by the scheduler. */ + subtype?: "reminder"; } /** Client → broker: manual status override (dnd, forced idle). */ @@ -673,6 +675,8 @@ export interface WSScheduleMessage { message: string; /** Unix timestamp (ms) when to deliver. */ deliverAt: number; + /** Optional semantic tag — "reminder" surfaces differently to the receiver. */ + subtype?: "reminder"; _reqId?: string; } diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index c63c1b2..c4fadba 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -123,7 +123,8 @@ function decryptFailedWarning(senderPubkey: string): string { function formatPush(p: InboundPush, meshSlug: string): string { const body = p.plaintext ?? decryptFailedWarning(p.senderPubkey); - return `[${meshSlug}] from ${p.senderPubkey.slice(0, 12)}… (${p.priority}, ${p.createdAt}):\n${body}`; + const tag = p.subtype === "reminder" ? " [REMINDER]" : ""; + return `[${meshSlug}]${tag} from ${p.senderPubkey.slice(0, 12)}… (${p.priority}, ${p.createdAt}):\n${body}`; } export async function startMcpServer(): Promise { @@ -147,6 +148,8 @@ You are "${myName}"${myRole ? ` (${myRole})` : ""} — a peer in the claudemesh ## Responding to messages When you receive a message, RESPOND IMMEDIATELY. Pause your current task, reply via send_message, then resume. Read from_name, mesh_slug, and priority from the channel attributes. Reply by setting \`to\` to the sender's from_name (display name). Stay in character per your system prompt. Do not ignore low-priority messages — acknowledge them briefly even if you defer action. +If the channel meta contains \`subtype: reminder\`, this is a scheduled reminder you set for yourself — act on it immediately (no reply needed). + ## Tools | Tool | Description | |------|-------------| @@ -188,6 +191,9 @@ When you receive a message, RESPOND IMMEDIATEL | claim_task(id) | Claim an unclaimed task. | | complete_task(id, result?) | Mark task done with optional result. | | list_tasks(status?, assignee?) | List tasks filtered by status/assignee. | +| schedule_reminder(message, in_seconds?, deliver_at?, to?) | Schedule a reminder to yourself (no \`to\`) or a delayed message to a peer/group. Delivered as a push with \`subtype: reminder\` in the channel meta. | +| list_scheduled() | List pending scheduled reminders and messages. | +| cancel_scheduled(id) | Cancel a pending scheduled item. | If multiple meshes are joined, prefix \`to\` with \`:\` to disambiguate (e.g. \`dev-team:Alice\`). @@ -445,17 +451,14 @@ Your message mode is "${messageMode}". } // --- Scheduled messages --- - case "schedule_reminder": - case "send_later": { + case "schedule_reminder": { const sArgs = (args ?? {}) as { message?: string; to?: string; deliver_at?: number; in_seconds?: number; }; - if (!sArgs.message) return text(`${name}: \`message\` required`, true); - const to = name === "schedule_reminder" ? "self" : (sArgs.to ?? ""); - if (name === "send_later" && !to) return text("send_later: `to` required", true); + if (!sArgs.message) return text("schedule_reminder: `message` required", true); let deliverAt: number; if (sArgs.deliver_at) { @@ -463,32 +466,37 @@ Your message mode is "${messageMode}". } else if (sArgs.in_seconds) { deliverAt = Date.now() + Number(sArgs.in_seconds) * 1_000; } else { - return text(`${name}: provide \`deliver_at\` (ms timestamp) or \`in_seconds\``, true); + return text("schedule_reminder: provide `deliver_at` (ms timestamp) or `in_seconds`", true); } - // For send_later, resolve display name → pubkey if needed - let targetSpec = to; - if (name === "send_later" && !to.startsWith("@") && to !== "*" && !/^[0-9a-f]{64}$/i.test(to) && to !== "self") { - const peers = await client.listPeers(); - const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase()); - if (!match) { - const names = peers.map((p) => p.displayName).join(", "); - return text(`send_later: peer "${to}" not found. Online: ${names || "(none)"}`, true); - } - targetSpec = match.pubkey; - } - if (name === "schedule_reminder") { - // Self-reminder: use own session pubkey + const isSelf = !sArgs.to; + let targetSpec: string; + if (isSelf) { + // Self-reminder: target own session pubkey targetSpec = client.getSessionPubkey() ?? "*"; + } else { + const to = sArgs.to!; + // Resolve display name → pubkey if not a raw spec + if (!to.startsWith("@") && to !== "*" && !/^[0-9a-f]{64}$/i.test(to)) { + const peers = await client.listPeers(); + const match = peers.find((p) => p.displayName.toLowerCase() === to.toLowerCase()); + if (!match) { + const names = peers.map((p) => p.displayName).join(", "); + return text(`schedule_reminder: peer "${to}" not found. Online: ${names || "(none)"}`, true); + } + targetSpec = match.pubkey; + } else { + targetSpec = to; + } } - const result = await client.scheduleMessage(targetSpec, sArgs.message, deliverAt); - if (!result) return text(`${name}: broker did not acknowledge — check connection`, true); + const result = await client.scheduleMessage(targetSpec, sArgs.message, deliverAt, true); + if (!result) return text("schedule_reminder: broker did not acknowledge — check connection", true); const when = new Date(result.deliverAt).toISOString(); return text( - name === "schedule_reminder" - ? `Reminder scheduled (${result.scheduledId.slice(0, 8)}): "${sArgs.message.slice(0, 60)}" at ${when}` - : `Message to "${to}" scheduled (${result.scheduledId.slice(0, 8)}) for ${when}`, + isSelf + ? `Self-reminder scheduled (${result.scheduledId.slice(0, 8)}): "${sArgs.message.slice(0, 60)}" at ${when}` + : `Reminder to "${sArgs.to}" scheduled (${result.scheduledId.slice(0, 8)}) for ${when}`, ); } case "list_scheduled": { @@ -1016,6 +1024,7 @@ Your message mode is "${messageMode}". sent_at: msg.createdAt, delivered_at: msg.receivedAt, kind: msg.kind, + ...(msg.subtype ? { subtype: msg.subtype } : {}), }, }, }); diff --git a/apps/cli/src/mcp/tools.ts b/apps/cli/src/mcp/tools.ts index 720b071..56f0f47 100644 --- a/apps/cli/src/mcp/tools.ts +++ b/apps/cli/src/mcp/tools.ts @@ -568,32 +568,21 @@ export const TOOLS: Tool[] = [ { name: "schedule_reminder", description: - "Schedule a reminder message delivered back to yourself at a future time. The broker fires it as a push when the time arrives. Use to prompt yourself to check on something later.", + "Schedule a message for future delivery. Without `to`, it fires back to yourself (a self-reminder). With `to`, it delivers to a peer, @group, or * broadcast. The broker holds it and delivers when the time arrives. Receivers see `subtype: reminder` in the push envelope.", inputSchema: { type: "object", properties: { - message: { type: "string", description: "Reminder text" }, + message: { type: "string", description: "Message or reminder text" }, deliver_at: { type: "number", description: "Unix timestamp (ms) when to deliver" }, in_seconds: { type: "number", description: "Alternative to deliver_at: fire after N seconds" }, + to: { + type: "string", + description: "Recipient: display name, pubkey hex, @group, or * (omit for self-reminder)", + }, }, required: ["message"], }, }, - { - name: "send_later", - description: - "Send a message to a peer, @group, or broadcast (*) at a future time. The broker holds it and delivers when the time arrives.", - inputSchema: { - type: "object", - properties: { - to: { type: "string", description: "Recipient: display name, pubkey hex, @group, or *" }, - message: { type: "string", description: "Message text" }, - deliver_at: { type: "number", description: "Unix timestamp (ms) when to deliver" }, - in_seconds: { type: "number", description: "Alternative to deliver_at: fire after N seconds" }, - }, - required: ["to", "message"], - }, - }, { name: "list_scheduled", description: "List all your pending scheduled messages: id, recipient, preview, and delivery time.", diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 2a57d58..18504e5 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -51,6 +51,8 @@ export interface InboundPush { /** Hint for UI: "direct" (crypto_box), "channel"/"broadcast" * (plaintext for now). */ kind: "direct" | "broadcast" | "channel" | "unknown"; + /** Optional semantic tag — "reminder" when fired by the scheduler. */ + subtype?: "reminder"; } type PushHandler = (msg: InboundPush) => void; @@ -406,6 +408,7 @@ export class BrokerClient { to: string, message: string, deliverAt: number, + isReminder = false, ): Promise<{ scheduledId: string; deliverAt: number } | null> { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; return new Promise((resolve) => { @@ -413,7 +416,14 @@ export class BrokerClient { this.scheduledAckResolvers.set(reqId, { resolve, timer: setTimeout(() => { if (this.scheduledAckResolvers.delete(reqId)) resolve(null); }, 8_000) }); - this.ws!.send(JSON.stringify({ type: "schedule", to, message, deliverAt, _reqId: reqId })); + this.ws!.send(JSON.stringify({ + type: "schedule", + to, + message, + deliverAt, + ...(isReminder ? { subtype: "reminder" } : {}), + _reqId: reqId, + })); }); } @@ -927,6 +937,7 @@ export class BrokerClient { receivedAt: new Date().toISOString(), plaintext, kind, + ...(msg.subtype ? { subtype: msg.subtype as "reminder" } : {}), }; this.pushBuffer.push(push); if (this.pushBuffer.length > 500) this.pushBuffer.shift();