diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 2e828de..90a30f3 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -396,6 +396,7 @@ export async function listPeersInMesh( summary: string | null; groups: Array<{ name: string; role?: string }>; sessionId: string; + cwd: string; connectedAt: Date; }> > { @@ -409,6 +410,7 @@ export async function listPeersInMesh( summary: presence.summary, groups: presence.groups, sessionId: presence.sessionId, + cwd: presence.cwd, connectedAt: presence.connectedAt, }) .from(presence) @@ -428,6 +430,7 @@ export async function listPeersInMesh( summary: r.summary, groups: (r.groups ?? []) as Array<{ name: string; role?: string }>, sessionId: r.sessionId, + cwd: r.cwd, connectedAt: r.connectedAt, })); } diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 80ed21f..03d3240 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -15,10 +15,10 @@ import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; import type { Duplex } from "node:stream"; import { WebSocketServer, type WebSocket } from "ws"; -import { eq, sql } from "drizzle-orm"; +import { and, eq, sql } from "drizzle-orm"; import { env } from "./env"; import { db } from "./db"; -import { messageQueue } from "@turbostarter/db/schema/mesh"; +import { messageQueue, scheduledMessage as scheduledMessageTable } from "@turbostarter/db/schema/mesh"; import { claimTask, completeTask, @@ -112,14 +112,142 @@ interface ScheduledEntry { id: string; meshId: string; presenceId: string; + memberId: string; to: string; message: string; deliverAt: number; createdAt: number; subtype?: "reminder"; + cron?: string; + recurring?: boolean; + firedCount: number; timer: ReturnType; } const scheduledMessages = new Map(); // keyed by scheduledId + +// --------------------------------------------------------------------------- +// Minimal 5-field cron parser (minute hour dom month dow) +// Supports: numbers, *, */N, N-M, comma-separated lists +// --------------------------------------------------------------------------- + +function parseCronField(field: string, min: number, max: number): number[] { + const results = new Set(); + for (const part of field.split(",")) { + const stepMatch = part.match(/^(\S+)\/(\d+)$/); + let range: string; + let step: number; + if (stepMatch) { + range = stepMatch[1]!; + step = parseInt(stepMatch[2]!, 10); + } else { + range = part; + step = 1; + } + + let start: number; + let end: number; + if (range === "*") { + start = min; + end = max; + } else if (range.includes("-")) { + const [a, b] = range.split("-"); + start = parseInt(a!, 10); + end = parseInt(b!, 10); + } else { + start = parseInt(range, 10); + end = start; + } + for (let i = start; i <= end; i += step) { + if (i >= min && i <= max) results.add(i); + } + } + return [...results].sort((a, b) => a - b); +} + +/** + * Given a 5-field cron expression and a reference Date, return the next + * fire time as a Date. Scans minute-by-minute from `after` up to 366 days + * ahead. Returns null if no match found (invalid expression). + */ +function cronNextFireTime(cronExpr: string, after: Date = new Date()): Date | null { + const fields = cronExpr.trim().split(/\s+/); + if (fields.length !== 5) return null; + + const minutes = parseCronField(fields[0]!, 0, 59); + const hours = parseCronField(fields[1]!, 0, 23); + const doms = parseCronField(fields[2]!, 1, 31); + const months = parseCronField(fields[3]!, 1, 12); + const dows = parseCronField(fields[4]!, 0, 6); // 0 = Sunday + + if (!minutes.length || !hours.length || !doms.length || !months.length || !dows.length) { + return null; + } + + // Start from the next minute after `after` + const candidate = new Date(after); + candidate.setSeconds(0, 0); + candidate.setMinutes(candidate.getMinutes() + 1); + + const limit = after.getTime() + 366 * 24 * 60 * 60 * 1000; + while (candidate.getTime() < limit) { + if ( + months.includes(candidate.getMonth() + 1) && + doms.includes(candidate.getDate()) && + dows.includes(candidate.getDay()) && + hours.includes(candidate.getHours()) && + minutes.includes(candidate.getMinutes()) + ) { + return candidate; + } + candidate.setMinutes(candidate.getMinutes() + 1); + } + return null; +} + +// --------------------------------------------------------------------------- +// Persist scheduled entry to DB +// --------------------------------------------------------------------------- + +async function persistScheduledEntry(entry: ScheduledEntry): Promise { + await db.insert(scheduledMessageTable).values({ + id: entry.id, + meshId: entry.meshId, + presenceId: entry.presenceId, + memberId: entry.memberId, + to: entry.to, + message: entry.message, + deliverAt: entry.deliverAt ? new Date(entry.deliverAt) : null, + cron: entry.cron ?? null, + subtype: entry.subtype ?? null, + firedCount: entry.firedCount, + cancelled: false, + }); +} + +async function markScheduledFired(id: string): Promise { + await db + .update(scheduledMessageTable) + .set({ firedAt: new Date(), firedCount: sql`${scheduledMessageTable.firedCount} + 1` }) + .where(eq(scheduledMessageTable.id, id)); +} + +async function markScheduledCancelled(id: string): Promise { + await db + .update(scheduledMessageTable) + .set({ cancelled: true }) + .where(eq(scheduledMessageTable.id, id)); +} + +async function updateScheduledNextFire(id: string, nextDeliverAt: Date, firedCount: number): Promise { + await db + .update(scheduledMessageTable) + .set({ + deliverAt: nextDeliverAt, + firedCount, + firedAt: new Date(), + }) + .where(eq(scheduledMessageTable.id, id)); +} const hookRateLimit = new TokenBucket( env.HOOK_RATE_LIMIT_PER_MIN, env.HOOK_RATE_LIMIT_PER_MIN, diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 40e1f85..1457171 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -717,6 +717,8 @@ export interface WSScheduledAckMessage { type: "scheduled_ack"; scheduledId: string; deliverAt: number; + /** Present for cron schedules — echoes the expression. */ + cron?: string; _reqId?: string; } @@ -729,6 +731,10 @@ export interface WSScheduledListMessage { message: string; deliverAt: number; createdAt: number; + /** Present for cron/recurring entries. */ + cron?: string; + /** Number of times the cron entry has fired so far. */ + firedCount?: number; }>; _reqId?: string; } diff --git a/apps/cli/src/commands/peers.ts b/apps/cli/src/commands/peers.ts index 2ade9a3..816c66c 100644 --- a/apps/cli/src/commands/peers.ts +++ b/apps/cli/src/commands/peers.ts @@ -40,8 +40,15 @@ export async function runPeers(flags: PeersFlags): Promise { : ""; const statusIcon = p.status === "working" ? yellow("●") : green("●"); const name = bold(p.displayName); + const meta: string[] = []; + if (p.peerType) meta.push(p.peerType); + if (p.channel) meta.push(p.channel); + if (p.model) meta.push(p.model); + const metaStr = meta.length ? dim(` (${meta.join(", ")})`) : ""; + const cwdStr = p.cwd ? dim(` cwd: ${p.cwd}`) : ""; const summary = p.summary ? dim(` ${p.summary}`) : ""; - console.log(` ${statusIcon} ${name}${groups}${summary}`); + console.log(` ${statusIcon} ${name}${groups}${metaStr}${summary}`); + if (cwdStr) console.log(` ${cwdStr}`); } console.log(""); }); diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index a7b060b..3c7c962 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -322,7 +322,13 @@ Your message mode is "${messageMode}". const peerLines = peers.map((p) => { const summary = p.summary ? ` — "${p.summary}"` : ""; const groupsStr = p.groups?.length ? ` [${p.groups.map(g => `@${g.name}${g.role ? ':' + g.role : ''}`).join(', ')}]` : ""; - return `- **${p.displayName}** [${p.status}]${groupsStr} (${p.pubkey.slice(0, 12)}…)${summary}`; + const meta: string[] = []; + if (p.peerType) meta.push(`type:${p.peerType}`); + if (p.channel) meta.push(`channel:${p.channel}`); + if (p.model) meta.push(`model:${p.model}`); + const metaStr = meta.length ? ` {${meta.join(", ")}}` : ""; + const cwdStr = p.cwd ? ` cwd:${p.cwd}` : ""; + return `- **${p.displayName}** [${p.status}]${groupsStr}${metaStr} (${p.pubkey.slice(0, 12)}…)${cwdStr}${summary}`; }); sections.push(`${header}\n${peerLines.join("\n")}`); } diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index f0e82a0..18eebaf 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -34,6 +34,10 @@ export interface PeerInfo { groups: Array<{ name: string; role?: string }>; sessionId: string; connectedAt: string; + cwd?: string; + peerType?: "ai" | "human" | "connector"; + channel?: string; + model?: string; } export interface InboundPush { @@ -162,6 +166,9 @@ export class BrokerClient { sessionId: `${process.pid}-${Date.now()}`, pid: process.pid, cwd: process.cwd(), + peerType: "ai" as const, + channel: "claude-code", + model: process.env.CLAUDE_MODEL || undefined, timestamp, signature, }),