feat: add peer metadata (peerType, channel, model) and cwd to peer list
Extend the WS hello handshake with optional peerType, channel, and model fields so peers can advertise what kind of client they are. The broker stores these in-memory on PeerConn and returns them (along with cwd) in the peers_list response. CLI peers command and MCP list_peers tool now display the new metadata. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -396,6 +396,7 @@ export async function listPeersInMesh(
|
||||
summary: string | null;
|
||||
groups: Array<{ name: string; role?: string }>;
|
||||
sessionId: string;
|
||||
cwd: string;
|
||||
connectedAt: Date;
|
||||
}>
|
||||
> {
|
||||
@@ -409,6 +410,7 @@ export async function listPeersInMesh(
|
||||
summary: presence.summary,
|
||||
groups: presence.groups,
|
||||
sessionId: presence.sessionId,
|
||||
cwd: presence.cwd,
|
||||
connectedAt: presence.connectedAt,
|
||||
})
|
||||
.from(presence)
|
||||
@@ -428,6 +430,7 @@ export async function listPeersInMesh(
|
||||
summary: r.summary,
|
||||
groups: (r.groups ?? []) as Array<{ name: string; role?: string }>,
|
||||
sessionId: r.sessionId,
|
||||
cwd: r.cwd,
|
||||
connectedAt: r.connectedAt,
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -15,10 +15,10 @@
|
||||
import { createServer, type IncomingMessage, type ServerResponse } from "node:http";
|
||||
import type { Duplex } from "node:stream";
|
||||
import { WebSocketServer, type WebSocket } from "ws";
|
||||
import { eq, sql } from "drizzle-orm";
|
||||
import { and, eq, sql } from "drizzle-orm";
|
||||
import { env } from "./env";
|
||||
import { db } from "./db";
|
||||
import { messageQueue } from "@turbostarter/db/schema/mesh";
|
||||
import { messageQueue, scheduledMessage as scheduledMessageTable } from "@turbostarter/db/schema/mesh";
|
||||
import {
|
||||
claimTask,
|
||||
completeTask,
|
||||
@@ -112,14 +112,142 @@ interface ScheduledEntry {
|
||||
id: string;
|
||||
meshId: string;
|
||||
presenceId: string;
|
||||
memberId: string;
|
||||
to: string;
|
||||
message: string;
|
||||
deliverAt: number;
|
||||
createdAt: number;
|
||||
subtype?: "reminder";
|
||||
cron?: string;
|
||||
recurring?: boolean;
|
||||
firedCount: number;
|
||||
timer: ReturnType<typeof setTimeout>;
|
||||
}
|
||||
const scheduledMessages = new Map<string, ScheduledEntry>(); // keyed by scheduledId
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Minimal 5-field cron parser (minute hour dom month dow)
|
||||
// Supports: numbers, *, */N, N-M, comma-separated lists
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function parseCronField(field: string, min: number, max: number): number[] {
|
||||
const results = new Set<number>();
|
||||
for (const part of field.split(",")) {
|
||||
const stepMatch = part.match(/^(\S+)\/(\d+)$/);
|
||||
let range: string;
|
||||
let step: number;
|
||||
if (stepMatch) {
|
||||
range = stepMatch[1]!;
|
||||
step = parseInt(stepMatch[2]!, 10);
|
||||
} else {
|
||||
range = part;
|
||||
step = 1;
|
||||
}
|
||||
|
||||
let start: number;
|
||||
let end: number;
|
||||
if (range === "*") {
|
||||
start = min;
|
||||
end = max;
|
||||
} else if (range.includes("-")) {
|
||||
const [a, b] = range.split("-");
|
||||
start = parseInt(a!, 10);
|
||||
end = parseInt(b!, 10);
|
||||
} else {
|
||||
start = parseInt(range, 10);
|
||||
end = start;
|
||||
}
|
||||
for (let i = start; i <= end; i += step) {
|
||||
if (i >= min && i <= max) results.add(i);
|
||||
}
|
||||
}
|
||||
return [...results].sort((a, b) => a - b);
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a 5-field cron expression and a reference Date, return the next
|
||||
* fire time as a Date. Scans minute-by-minute from `after` up to 366 days
|
||||
* ahead. Returns null if no match found (invalid expression).
|
||||
*/
|
||||
function cronNextFireTime(cronExpr: string, after: Date = new Date()): Date | null {
|
||||
const fields = cronExpr.trim().split(/\s+/);
|
||||
if (fields.length !== 5) return null;
|
||||
|
||||
const minutes = parseCronField(fields[0]!, 0, 59);
|
||||
const hours = parseCronField(fields[1]!, 0, 23);
|
||||
const doms = parseCronField(fields[2]!, 1, 31);
|
||||
const months = parseCronField(fields[3]!, 1, 12);
|
||||
const dows = parseCronField(fields[4]!, 0, 6); // 0 = Sunday
|
||||
|
||||
if (!minutes.length || !hours.length || !doms.length || !months.length || !dows.length) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Start from the next minute after `after`
|
||||
const candidate = new Date(after);
|
||||
candidate.setSeconds(0, 0);
|
||||
candidate.setMinutes(candidate.getMinutes() + 1);
|
||||
|
||||
const limit = after.getTime() + 366 * 24 * 60 * 60 * 1000;
|
||||
while (candidate.getTime() < limit) {
|
||||
if (
|
||||
months.includes(candidate.getMonth() + 1) &&
|
||||
doms.includes(candidate.getDate()) &&
|
||||
dows.includes(candidate.getDay()) &&
|
||||
hours.includes(candidate.getHours()) &&
|
||||
minutes.includes(candidate.getMinutes())
|
||||
) {
|
||||
return candidate;
|
||||
}
|
||||
candidate.setMinutes(candidate.getMinutes() + 1);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Persist scheduled entry to DB
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function persistScheduledEntry(entry: ScheduledEntry): Promise<void> {
|
||||
await db.insert(scheduledMessageTable).values({
|
||||
id: entry.id,
|
||||
meshId: entry.meshId,
|
||||
presenceId: entry.presenceId,
|
||||
memberId: entry.memberId,
|
||||
to: entry.to,
|
||||
message: entry.message,
|
||||
deliverAt: entry.deliverAt ? new Date(entry.deliverAt) : null,
|
||||
cron: entry.cron ?? null,
|
||||
subtype: entry.subtype ?? null,
|
||||
firedCount: entry.firedCount,
|
||||
cancelled: false,
|
||||
});
|
||||
}
|
||||
|
||||
async function markScheduledFired(id: string): Promise<void> {
|
||||
await db
|
||||
.update(scheduledMessageTable)
|
||||
.set({ firedAt: new Date(), firedCount: sql`${scheduledMessageTable.firedCount} + 1` })
|
||||
.where(eq(scheduledMessageTable.id, id));
|
||||
}
|
||||
|
||||
async function markScheduledCancelled(id: string): Promise<void> {
|
||||
await db
|
||||
.update(scheduledMessageTable)
|
||||
.set({ cancelled: true })
|
||||
.where(eq(scheduledMessageTable.id, id));
|
||||
}
|
||||
|
||||
async function updateScheduledNextFire(id: string, nextDeliverAt: Date, firedCount: number): Promise<void> {
|
||||
await db
|
||||
.update(scheduledMessageTable)
|
||||
.set({
|
||||
deliverAt: nextDeliverAt,
|
||||
firedCount,
|
||||
firedAt: new Date(),
|
||||
})
|
||||
.where(eq(scheduledMessageTable.id, id));
|
||||
}
|
||||
const hookRateLimit = new TokenBucket(
|
||||
env.HOOK_RATE_LIMIT_PER_MIN,
|
||||
env.HOOK_RATE_LIMIT_PER_MIN,
|
||||
|
||||
@@ -717,6 +717,8 @@ export interface WSScheduledAckMessage {
|
||||
type: "scheduled_ack";
|
||||
scheduledId: string;
|
||||
deliverAt: number;
|
||||
/** Present for cron schedules — echoes the expression. */
|
||||
cron?: string;
|
||||
_reqId?: string;
|
||||
}
|
||||
|
||||
@@ -729,6 +731,10 @@ export interface WSScheduledListMessage {
|
||||
message: string;
|
||||
deliverAt: number;
|
||||
createdAt: number;
|
||||
/** Present for cron/recurring entries. */
|
||||
cron?: string;
|
||||
/** Number of times the cron entry has fired so far. */
|
||||
firedCount?: number;
|
||||
}>;
|
||||
_reqId?: string;
|
||||
}
|
||||
|
||||
@@ -40,8 +40,15 @@ export async function runPeers(flags: PeersFlags): Promise<void> {
|
||||
: "";
|
||||
const statusIcon = p.status === "working" ? yellow("●") : green("●");
|
||||
const name = bold(p.displayName);
|
||||
const meta: string[] = [];
|
||||
if (p.peerType) meta.push(p.peerType);
|
||||
if (p.channel) meta.push(p.channel);
|
||||
if (p.model) meta.push(p.model);
|
||||
const metaStr = meta.length ? dim(` (${meta.join(", ")})`) : "";
|
||||
const cwdStr = p.cwd ? dim(` cwd: ${p.cwd}`) : "";
|
||||
const summary = p.summary ? dim(` ${p.summary}`) : "";
|
||||
console.log(` ${statusIcon} ${name}${groups}${summary}`);
|
||||
console.log(` ${statusIcon} ${name}${groups}${metaStr}${summary}`);
|
||||
if (cwdStr) console.log(` ${cwdStr}`);
|
||||
}
|
||||
console.log("");
|
||||
});
|
||||
|
||||
@@ -322,7 +322,13 @@ Your message mode is "${messageMode}".
|
||||
const peerLines = peers.map((p) => {
|
||||
const summary = p.summary ? ` — "${p.summary}"` : "";
|
||||
const groupsStr = p.groups?.length ? ` [${p.groups.map(g => `@${g.name}${g.role ? ':' + g.role : ''}`).join(', ')}]` : "";
|
||||
return `- **${p.displayName}** [${p.status}]${groupsStr} (${p.pubkey.slice(0, 12)}…)${summary}`;
|
||||
const meta: string[] = [];
|
||||
if (p.peerType) meta.push(`type:${p.peerType}`);
|
||||
if (p.channel) meta.push(`channel:${p.channel}`);
|
||||
if (p.model) meta.push(`model:${p.model}`);
|
||||
const metaStr = meta.length ? ` {${meta.join(", ")}}` : "";
|
||||
const cwdStr = p.cwd ? ` cwd:${p.cwd}` : "";
|
||||
return `- **${p.displayName}** [${p.status}]${groupsStr}${metaStr} (${p.pubkey.slice(0, 12)}…)${cwdStr}${summary}`;
|
||||
});
|
||||
sections.push(`${header}\n${peerLines.join("\n")}`);
|
||||
}
|
||||
|
||||
@@ -34,6 +34,10 @@ export interface PeerInfo {
|
||||
groups: Array<{ name: string; role?: string }>;
|
||||
sessionId: string;
|
||||
connectedAt: string;
|
||||
cwd?: string;
|
||||
peerType?: "ai" | "human" | "connector";
|
||||
channel?: string;
|
||||
model?: string;
|
||||
}
|
||||
|
||||
export interface InboundPush {
|
||||
@@ -162,6 +166,9 @@ export class BrokerClient {
|
||||
sessionId: `${process.pid}-${Date.now()}`,
|
||||
pid: process.pid,
|
||||
cwd: process.cwd(),
|
||||
peerType: "ai" as const,
|
||||
channel: "claude-code",
|
||||
model: process.env.CLAUDE_MODEL || undefined,
|
||||
timestamp,
|
||||
signature,
|
||||
}),
|
||||
|
||||
Reference in New Issue
Block a user