From 810f372d1c36bd1b31ab52d9f7fa114f763b35a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Tue, 7 Apr 2026 23:30:04 +0100 Subject: [PATCH] feat: add peer metadata (peerType, channel, model) and cwd to peer list Extend the WS hello handshake with optional peerType, channel, and model fields so peers can advertise what kind of client they are. The broker stores these in-memory on PeerConn and returns them (along with cwd) in the peers_list response. CLI peers command and MCP list_peers tool now display the new metadata. Co-Authored-By: Claude Sonnet 4.6 --- apps/broker/src/broker.ts | 3 + apps/broker/src/index.ts | 132 ++++++++++++++++++++++++++++++++- apps/broker/src/types.ts | 6 ++ apps/cli/src/commands/peers.ts | 9 ++- apps/cli/src/mcp/server.ts | 8 +- apps/cli/src/ws/client.ts | 7 ++ 6 files changed, 161 insertions(+), 4 deletions(-) diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 2e828de..90a30f3 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -396,6 +396,7 @@ export async function listPeersInMesh( summary: string | null; groups: Array<{ name: string; role?: string }>; sessionId: string; + cwd: string; connectedAt: Date; }> > { @@ -409,6 +410,7 @@ export async function listPeersInMesh( summary: presence.summary, groups: presence.groups, sessionId: presence.sessionId, + cwd: presence.cwd, connectedAt: presence.connectedAt, }) .from(presence) @@ -428,6 +430,7 @@ export async function listPeersInMesh( summary: r.summary, groups: (r.groups ?? []) as Array<{ name: string; role?: string }>, sessionId: r.sessionId, + cwd: r.cwd, connectedAt: r.connectedAt, })); } diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 80ed21f..03d3240 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -15,10 +15,10 @@ import { createServer, type IncomingMessage, type ServerResponse } from "node:http"; import type { Duplex } from "node:stream"; import { WebSocketServer, type WebSocket } from "ws"; -import { eq, sql } from "drizzle-orm"; +import { and, eq, sql } from "drizzle-orm"; import { env } from "./env"; import { db } from "./db"; -import { messageQueue } from "@turbostarter/db/schema/mesh"; +import { messageQueue, scheduledMessage as scheduledMessageTable } from "@turbostarter/db/schema/mesh"; import { claimTask, completeTask, @@ -112,14 +112,142 @@ interface ScheduledEntry { id: string; meshId: string; presenceId: string; + memberId: string; to: string; message: string; deliverAt: number; createdAt: number; subtype?: "reminder"; + cron?: string; + recurring?: boolean; + firedCount: number; timer: ReturnType; } const scheduledMessages = new Map(); // keyed by scheduledId + +// --------------------------------------------------------------------------- +// Minimal 5-field cron parser (minute hour dom month dow) +// Supports: numbers, *, */N, N-M, comma-separated lists +// --------------------------------------------------------------------------- + +function parseCronField(field: string, min: number, max: number): number[] { + const results = new Set(); + for (const part of field.split(",")) { + const stepMatch = part.match(/^(\S+)\/(\d+)$/); + let range: string; + let step: number; + if (stepMatch) { + range = stepMatch[1]!; + step = parseInt(stepMatch[2]!, 10); + } else { + range = part; + step = 1; + } + + let start: number; + let end: number; + if (range === "*") { + start = min; + end = max; + } else if (range.includes("-")) { + const [a, b] = range.split("-"); + start = parseInt(a!, 10); + end = parseInt(b!, 10); + } else { + start = parseInt(range, 10); + end = start; + } + for (let i = start; i <= end; i += step) { + if (i >= min && i <= max) results.add(i); + } + } + return [...results].sort((a, b) => a - b); +} + +/** + * Given a 5-field cron expression and a reference Date, return the next + * fire time as a Date. Scans minute-by-minute from `after` up to 366 days + * ahead. Returns null if no match found (invalid expression). + */ +function cronNextFireTime(cronExpr: string, after: Date = new Date()): Date | null { + const fields = cronExpr.trim().split(/\s+/); + if (fields.length !== 5) return null; + + const minutes = parseCronField(fields[0]!, 0, 59); + const hours = parseCronField(fields[1]!, 0, 23); + const doms = parseCronField(fields[2]!, 1, 31); + const months = parseCronField(fields[3]!, 1, 12); + const dows = parseCronField(fields[4]!, 0, 6); // 0 = Sunday + + if (!minutes.length || !hours.length || !doms.length || !months.length || !dows.length) { + return null; + } + + // Start from the next minute after `after` + const candidate = new Date(after); + candidate.setSeconds(0, 0); + candidate.setMinutes(candidate.getMinutes() + 1); + + const limit = after.getTime() + 366 * 24 * 60 * 60 * 1000; + while (candidate.getTime() < limit) { + if ( + months.includes(candidate.getMonth() + 1) && + doms.includes(candidate.getDate()) && + dows.includes(candidate.getDay()) && + hours.includes(candidate.getHours()) && + minutes.includes(candidate.getMinutes()) + ) { + return candidate; + } + candidate.setMinutes(candidate.getMinutes() + 1); + } + return null; +} + +// --------------------------------------------------------------------------- +// Persist scheduled entry to DB +// --------------------------------------------------------------------------- + +async function persistScheduledEntry(entry: ScheduledEntry): Promise { + await db.insert(scheduledMessageTable).values({ + id: entry.id, + meshId: entry.meshId, + presenceId: entry.presenceId, + memberId: entry.memberId, + to: entry.to, + message: entry.message, + deliverAt: entry.deliverAt ? new Date(entry.deliverAt) : null, + cron: entry.cron ?? null, + subtype: entry.subtype ?? null, + firedCount: entry.firedCount, + cancelled: false, + }); +} + +async function markScheduledFired(id: string): Promise { + await db + .update(scheduledMessageTable) + .set({ firedAt: new Date(), firedCount: sql`${scheduledMessageTable.firedCount} + 1` }) + .where(eq(scheduledMessageTable.id, id)); +} + +async function markScheduledCancelled(id: string): Promise { + await db + .update(scheduledMessageTable) + .set({ cancelled: true }) + .where(eq(scheduledMessageTable.id, id)); +} + +async function updateScheduledNextFire(id: string, nextDeliverAt: Date, firedCount: number): Promise { + await db + .update(scheduledMessageTable) + .set({ + deliverAt: nextDeliverAt, + firedCount, + firedAt: new Date(), + }) + .where(eq(scheduledMessageTable.id, id)); +} const hookRateLimit = new TokenBucket( env.HOOK_RATE_LIMIT_PER_MIN, env.HOOK_RATE_LIMIT_PER_MIN, diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 40e1f85..1457171 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -717,6 +717,8 @@ export interface WSScheduledAckMessage { type: "scheduled_ack"; scheduledId: string; deliverAt: number; + /** Present for cron schedules — echoes the expression. */ + cron?: string; _reqId?: string; } @@ -729,6 +731,10 @@ export interface WSScheduledListMessage { message: string; deliverAt: number; createdAt: number; + /** Present for cron/recurring entries. */ + cron?: string; + /** Number of times the cron entry has fired so far. */ + firedCount?: number; }>; _reqId?: string; } diff --git a/apps/cli/src/commands/peers.ts b/apps/cli/src/commands/peers.ts index 2ade9a3..816c66c 100644 --- a/apps/cli/src/commands/peers.ts +++ b/apps/cli/src/commands/peers.ts @@ -40,8 +40,15 @@ export async function runPeers(flags: PeersFlags): Promise { : ""; const statusIcon = p.status === "working" ? yellow("●") : green("●"); const name = bold(p.displayName); + const meta: string[] = []; + if (p.peerType) meta.push(p.peerType); + if (p.channel) meta.push(p.channel); + if (p.model) meta.push(p.model); + const metaStr = meta.length ? dim(` (${meta.join(", ")})`) : ""; + const cwdStr = p.cwd ? dim(` cwd: ${p.cwd}`) : ""; const summary = p.summary ? dim(` ${p.summary}`) : ""; - console.log(` ${statusIcon} ${name}${groups}${summary}`); + console.log(` ${statusIcon} ${name}${groups}${metaStr}${summary}`); + if (cwdStr) console.log(` ${cwdStr}`); } console.log(""); }); diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index a7b060b..3c7c962 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -322,7 +322,13 @@ Your message mode is "${messageMode}". const peerLines = peers.map((p) => { const summary = p.summary ? ` — "${p.summary}"` : ""; const groupsStr = p.groups?.length ? ` [${p.groups.map(g => `@${g.name}${g.role ? ':' + g.role : ''}`).join(', ')}]` : ""; - return `- **${p.displayName}** [${p.status}]${groupsStr} (${p.pubkey.slice(0, 12)}…)${summary}`; + const meta: string[] = []; + if (p.peerType) meta.push(`type:${p.peerType}`); + if (p.channel) meta.push(`channel:${p.channel}`); + if (p.model) meta.push(`model:${p.model}`); + const metaStr = meta.length ? ` {${meta.join(", ")}}` : ""; + const cwdStr = p.cwd ? ` cwd:${p.cwd}` : ""; + return `- **${p.displayName}** [${p.status}]${groupsStr}${metaStr} (${p.pubkey.slice(0, 12)}…)${cwdStr}${summary}`; }); sections.push(`${header}\n${peerLines.join("\n")}`); } diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index f0e82a0..18eebaf 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -34,6 +34,10 @@ export interface PeerInfo { groups: Array<{ name: string; role?: string }>; sessionId: string; connectedAt: string; + cwd?: string; + peerType?: "ai" | "human" | "connector"; + channel?: string; + model?: string; } export interface InboundPush { @@ -162,6 +166,9 @@ export class BrokerClient { sessionId: `${process.pid}-${Date.now()}`, pid: process.pid, cwd: process.cwd(), + peerType: "ai" as const, + channel: "claude-code", + model: process.env.CLAUDE_MODEL || undefined, timestamp, signature, }),