diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 3873667..c28fc67 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -137,6 +137,110 @@ interface PeerConn { const connections = new Map(); const connectionsPerMesh = new Map(); +// --- URL Watch engine --- +interface WatchEntry { + id: string; + meshId: string; + presenceId: string; + url: string; + mode: "hash" | "json" | "status"; + extract?: string; + notifyOn: string; + interval: number; + headers: Record; + label?: string; + lastHash: string; + lastValue: string; + lastCheck: Date | null; + createdAt: Date; + timer: ReturnType; +} + +const urlWatches = new Map(); + +async function checkWatch(watch: WatchEntry): Promise { + try { + const res = await fetch(watch.url, { + headers: watch.headers, + signal: AbortSignal.timeout(10_000), + }); + + let currentValue: string; + if (watch.mode === "status") { + currentValue = String(res.status); + } else { + const body = await res.text(); + if (watch.mode === "json" && watch.extract) { + try { + const json = JSON.parse(body); + // Simple dot-path extraction ($.status → json.status) + const path = watch.extract.replace(/^\$\.?/, "").split("."); + let val: unknown = json; + for (const p of path) { val = (val as Record)?.[p]; } + currentValue = String(val ?? "null"); + } catch { currentValue = body.slice(0, 200); } + } else { + // Hash mode — SHA-256 of full body + const { createHash } = await import("node:crypto"); + currentValue = createHash("sha256").update(body).digest("hex").slice(0, 16); + } + } + + watch.lastCheck = new Date(); + const oldValue = watch.lastValue; + + if (oldValue === "") { + // First check — just store baseline + watch.lastHash = currentValue; + watch.lastValue = currentValue; + return; + } + + // Check if notification should fire + let shouldNotify = false; + const notifyOn = watch.notifyOn; + if (notifyOn === "change") { + shouldNotify = currentValue !== oldValue; + } else if (notifyOn.startsWith("match:")) { + const target = notifyOn.slice(6); + shouldNotify = currentValue === target && oldValue !== target; + } else if (notifyOn.startsWith("not_match:")) { + const target = notifyOn.slice(10); + shouldNotify = currentValue !== target && oldValue === target; + } + + watch.lastHash = currentValue; + watch.lastValue = currentValue; + + if (shouldNotify) { + const notification: WSPushMessage = { + type: "push", + subtype: "system" as const, + event: "watch_triggered", + eventData: { + watchId: watch.id, + url: watch.url, + label: watch.label, + mode: watch.mode, + oldValue, + newValue: currentValue, + }, + messageId: crypto.randomUUID(), + meshId: watch.meshId, + senderPubkey: "system", + priority: "now", + nonce: "", + ciphertext: "", + createdAt: new Date().toISOString(), + }; + sendToPeer(watch.presenceId, notification); + log.info("watch triggered", { id: watch.id, url: watch.url, old: oldValue, new: currentValue }); + } + } catch (e) { + log.warn("watch check failed", { id: watch.id, url: watch.url, error: (e as Error).message }); + } +} + // Stream subscriptions: "meshId:streamName" → Set of presenceIds const streamSubscriptions = new Map>(); @@ -3364,6 +3468,47 @@ function handleConnection(ws: WebSocket): void { log.info("ws skill_deploy", { presence_id: presenceId, source: sd.source?.type }); break; } + + // --- URL Watch --- + case "watch": { + const w = msg as any; + const watchId = `w_${crypto.randomUUID().slice(0, 8)}`; + const mode = w.mode ?? "hash"; + const interval = Math.max(w.interval ?? 30, 5); // min 5 seconds + const entry: WatchEntry = { + id: watchId, meshId: conn.meshId, presenceId, + url: w.url, mode, extract: w.extract, notifyOn: w.notify_on ?? "change", + interval, headers: w.headers ?? {}, label: w.label, + lastHash: "", lastValue: "", lastCheck: null, createdAt: new Date(), + timer: setInterval(() => checkWatch(entry), interval * 1000), + }; + urlWatches.set(watchId, entry); + // Do first check immediately to capture baseline + void checkWatch(entry); + sendToPeer(presenceId, { type: "watch_ack", watchId, url: w.url, mode, interval, _reqId: w._reqId } as any); + log.info("ws watch", { presence_id: presenceId, watchId, url: w.url, mode, interval }); + break; + } + case "unwatch": { + const uw = msg as any; + const watch = urlWatches.get(uw.watchId); + if (watch) { clearInterval(watch.timer); urlWatches.delete(uw.watchId); } + sendToPeer(presenceId, { type: "watch_ack", watchId: uw.watchId, url: watch?.url ?? "", mode: watch?.mode ?? "", interval: 0, _reqId: uw._reqId } as any); + break; + } + case "watch_list": { + const myWatches = [...urlWatches.values()].filter(w => w.presenceId === presenceId); + sendToPeer(presenceId, { + type: "watch_list_result", + watches: myWatches.map(w => ({ + id: w.id, url: w.url, mode: w.mode, label: w.label, interval: w.interval, + lastHash: w.lastHash, lastValue: w.lastValue, + lastCheck: w.lastCheck?.toISOString(), createdAt: w.createdAt.toISOString(), + })), + _reqId: (msg as any)._reqId, + } as any); + break; + } } } catch (e) { metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" }); diff --git a/apps/broker/src/types.ts b/apps/broker/src/types.ts index 34a02b2..8523edc 100644 --- a/apps/broker/src/types.ts +++ b/apps/broker/src/types.ts @@ -1104,6 +1104,19 @@ export interface WSVaultDeleteMessage { type: "vault_delete"; key: string; _reqI /** Client → broker: fetch encrypted vault entries for local decryption. */ export interface WSVaultGetMessage { type: "vault_get"; keys: string[]; _reqId?: string; } +/** Client → broker: start watching a URL for changes. */ +export interface WSWatchMessage { type: "watch"; url: string; mode?: "hash" | "json" | "status"; extract?: string; interval?: number; notify_on?: string; headers?: Record; label?: string; _reqId?: string; } +/** Client → broker: stop watching. */ +export interface WSUnwatchMessage { type: "unwatch"; watchId: string; _reqId?: string; } +/** Client → broker: list active watches. */ +export interface WSWatchListMessage { type: "watch_list"; _reqId?: string; } +/** Broker → client: watch created acknowledgement. */ +export interface WSWatchAckMessage { type: "watch_ack"; watchId: string; url: string; mode: string; interval: number; _reqId?: string; } +/** Broker → client: watch list response. */ +export interface WSWatchListResultMessage { type: "watch_list_result"; watches: Array<{ id: string; url: string; mode: string; label?: string; interval: number; lastHash?: string; lastValue?: string; lastCheck?: string; createdAt: string }>; _reqId?: string; } +/** Broker → client: URL change detected. */ +export interface WSWatchTriggeredMessage { type: "watch_triggered"; watchId: string; url: string; label?: string; mode: string; oldValue: string; newValue: string; timestamp: string; } + export type WSClientMessage = | WSHelloMessage | WSSendMessage @@ -1185,7 +1198,10 @@ export type WSClientMessage = | WSVaultSetMessage | WSVaultListMessage | WSVaultDeleteMessage - | WSVaultGetMessage; + | WSVaultGetMessage + | WSWatchMessage + | WSUnwatchMessage + | WSWatchListMessage; // --- Skill messages --- @@ -1333,4 +1349,7 @@ export type WSServerMessage = | WSVaultAckMessage | WSVaultListResultMessage | WSVaultGetResultMessage + | WSWatchAckMessage + | WSWatchListResultMessage + | WSWatchTriggeredMessage | WSErrorMessage; diff --git a/apps/cli/package.json b/apps/cli/package.json index d491c12..e9c951e 100644 --- a/apps/cli/package.json +++ b/apps/cli/package.json @@ -1,6 +1,6 @@ { "name": "claudemesh-cli", - "version": "0.8.6", + "version": "0.8.7", "description": "Claude Code MCP client for claudemesh — peer mesh messaging between Claude sessions.", "keywords": [ "claude-code", diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index e8ff6e5..af2031a 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -1554,6 +1554,35 @@ Your message mode is "${messageMode}". return text(`Skill "${result.name}" deployed.\nFiles: ${result.files.join(", ")}`); } + // --- URL Watch --- + case "mesh_watch": { + const { url, mode, extract, interval, notify_on, headers, label } = (args ?? {}) as any; + if (!url) return text("mesh_watch: `url` required", true); + const client = allClients()[0]; + if (!client) return text("mesh_watch: not connected", true); + const result = await client.watch(url, { mode, extract, interval, notify_on, headers, label }); + if (result.error) return text(`mesh_watch: ${result.error}`, true); + return text(`Watching "${label ?? url}" (${result.mode}, every ${result.interval}s)\nWatch ID: ${result.watchId}`); + } + case "mesh_unwatch": { + const { watch_id } = (args ?? {}) as { watch_id?: string }; + if (!watch_id) return text("mesh_unwatch: `watch_id` required", true); + const client = allClients()[0]; + if (!client) return text("mesh_unwatch: not connected", true); + await client.unwatch(watch_id); + return text(`Watch ${watch_id} stopped.`); + } + case "mesh_watches": { + const client = allClients()[0]; + if (!client) return text("mesh_watches: not connected", true); + const watches = await client.watchList(); + if (watches.length === 0) return text("No active watches."); + const lines = watches.map((w: any) => + `- **${w.id}** ${w.label ? `(${w.label}) ` : ""}${w.url}\n mode: ${w.mode} | interval: ${w.interval}s | last: ${w.lastValue?.slice(0, 30) ?? "pending"} | checked: ${w.lastCheck ?? "never"}` + ); + return text(`${watches.length} active watch(es):\n\n${lines.join("\n")}`); + } + default: return text(`Unknown tool: ${name}`, true); } diff --git a/apps/cli/src/mcp/tools.ts b/apps/cli/src/mcp/tools.ts index 8bfc2e4..f65ac71 100644 --- a/apps/cli/src/mcp/tools.ts +++ b/apps/cli/src/mcp/tools.ts @@ -972,4 +972,38 @@ export const TOOLS: Tool[] = [ description: "Remove a credential from your vault.", inputSchema: { type: "object", properties: { key: { type: "string" } }, required: ["key"] }, }, + + // --- URL Watch tools --- + + { + name: "mesh_watch", + description: "Watch a URL for changes. The broker polls it at the given interval and notifies you when the response changes. Works with any URL — websites (hash mode), JSON APIs (json mode), or status codes (status mode).", + inputSchema: { + type: "object", + properties: { + url: { type: "string", description: "URL to watch" }, + mode: { type: "string", enum: ["hash", "json", "status"], description: "Detection mode: hash (SHA-256 of body), json (extract jsonpath value), status (HTTP status code). Default: hash" }, + extract: { type: "string", description: "For json mode: dot path to extract (e.g. 'status' or 'data.deployments[0].status')" }, + interval: { type: "number", description: "Poll interval in seconds (min: 5, default: 30)" }, + notify_on: { type: "string", description: "When to notify: 'change' (default), 'match:', 'not_match:'" }, + headers: { type: "object", description: "Optional HTTP headers (e.g. for auth)" }, + label: { type: "string", description: "Human-readable label for this watch" }, + }, + required: ["url"], + }, + }, + { + name: "mesh_unwatch", + description: "Stop watching a URL.", + inputSchema: { + type: "object", + properties: { watch_id: { type: "string" } }, + required: ["watch_id"], + }, + }, + { + name: "mesh_watches", + description: "List your active URL watches.", + inputSchema: { type: "object", properties: {} }, + }, ]; diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 14305a2..bc35539 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -1346,6 +1346,38 @@ export class BrokerClient { }); } + // --- URL Watch --- + + private watchAckResolvers = new Map void; timer: NodeJS.Timeout }>(); + private watchListResolvers = new Map void; timer: NodeJS.Timeout }>(); + + async watch(url: string, opts?: { mode?: string; extract?: string; interval?: number; notify_on?: string; headers?: Record; label?: string }): Promise { + return new Promise(resolve => { + const reqId = `watch_${Date.now()}`; + const timer = setTimeout(() => { this.watchAckResolvers.delete(reqId); resolve({ error: "timeout" }); }, 10_000); + this.watchAckResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "watch", url, ...opts, _reqId: reqId } as any); + }); + } + + async unwatch(watchId: string): Promise { + return new Promise(resolve => { + const reqId = `unwatch_${Date.now()}`; + const timer = setTimeout(() => { this.watchAckResolvers.delete(reqId); resolve(false); }, 10_000); + this.watchAckResolvers.set(reqId, { resolve: () => resolve(true), timer }); + this.sendRaw({ type: "unwatch", watchId, _reqId: reqId } as any); + }); + } + + async watchList(): Promise { + return new Promise(resolve => { + const reqId = `watchlist_${Date.now()}`; + const timer = setTimeout(() => { this.watchListResolvers.delete(reqId); resolve([]); }, 10_000); + this.watchListResolvers.set(reqId, { resolve, timer }); + this.sendRaw({ type: "watch_list", _reqId: reqId } as any); + }); + } + async getServiceTools(serviceName: string): Promise { // Check cached catalog first const cached = this._serviceCatalog.find(s => s.name === serviceName); @@ -1993,6 +2025,24 @@ export class BrokerClient { r.resolve({ name: (msg as any).name, files: (msg as any).files ?? [] }); } } + if (msg.type === "watch_ack") { + const reqId = (msg as any)._reqId; + if (reqId && this.watchAckResolvers.has(reqId)) { + const r = this.watchAckResolvers.get(reqId)!; + clearTimeout(r.timer); + this.watchAckResolvers.delete(reqId); + r.resolve(msg); + } + } + if (msg.type === "watch_list_result") { + const reqId = (msg as any)._reqId; + if (reqId && this.watchListResolvers.has(reqId)) { + const r = this.watchListResolvers.get(reqId)!; + clearTimeout(r.timer); + this.watchListResolvers.delete(reqId); + r.resolve((msg as any).watches ?? []); + } + } if (msg.type === "error") { this.debug(`broker error: ${msg.code} ${msg.message}`); const id = msg.id ? String(msg.id) : null;