feat: url watch — broker polls URLs, notifies on change
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -137,6 +137,110 @@ interface PeerConn {
|
||||
const connections = new Map<string, PeerConn>();
|
||||
const connectionsPerMesh = new Map<string, number>();
|
||||
|
||||
// --- URL Watch engine ---
|
||||
interface WatchEntry {
|
||||
id: string;
|
||||
meshId: string;
|
||||
presenceId: string;
|
||||
url: string;
|
||||
mode: "hash" | "json" | "status";
|
||||
extract?: string;
|
||||
notifyOn: string;
|
||||
interval: number;
|
||||
headers: Record<string, string>;
|
||||
label?: string;
|
||||
lastHash: string;
|
||||
lastValue: string;
|
||||
lastCheck: Date | null;
|
||||
createdAt: Date;
|
||||
timer: ReturnType<typeof setInterval>;
|
||||
}
|
||||
|
||||
const urlWatches = new Map<string, WatchEntry>();
|
||||
|
||||
async function checkWatch(watch: WatchEntry): Promise<void> {
|
||||
try {
|
||||
const res = await fetch(watch.url, {
|
||||
headers: watch.headers,
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
});
|
||||
|
||||
let currentValue: string;
|
||||
if (watch.mode === "status") {
|
||||
currentValue = String(res.status);
|
||||
} else {
|
||||
const body = await res.text();
|
||||
if (watch.mode === "json" && watch.extract) {
|
||||
try {
|
||||
const json = JSON.parse(body);
|
||||
// Simple dot-path extraction ($.status → json.status)
|
||||
const path = watch.extract.replace(/^\$\.?/, "").split(".");
|
||||
let val: unknown = json;
|
||||
for (const p of path) { val = (val as Record<string, unknown>)?.[p]; }
|
||||
currentValue = String(val ?? "null");
|
||||
} catch { currentValue = body.slice(0, 200); }
|
||||
} else {
|
||||
// Hash mode — SHA-256 of full body
|
||||
const { createHash } = await import("node:crypto");
|
||||
currentValue = createHash("sha256").update(body).digest("hex").slice(0, 16);
|
||||
}
|
||||
}
|
||||
|
||||
watch.lastCheck = new Date();
|
||||
const oldValue = watch.lastValue;
|
||||
|
||||
if (oldValue === "") {
|
||||
// First check — just store baseline
|
||||
watch.lastHash = currentValue;
|
||||
watch.lastValue = currentValue;
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if notification should fire
|
||||
let shouldNotify = false;
|
||||
const notifyOn = watch.notifyOn;
|
||||
if (notifyOn === "change") {
|
||||
shouldNotify = currentValue !== oldValue;
|
||||
} else if (notifyOn.startsWith("match:")) {
|
||||
const target = notifyOn.slice(6);
|
||||
shouldNotify = currentValue === target && oldValue !== target;
|
||||
} else if (notifyOn.startsWith("not_match:")) {
|
||||
const target = notifyOn.slice(10);
|
||||
shouldNotify = currentValue !== target && oldValue === target;
|
||||
}
|
||||
|
||||
watch.lastHash = currentValue;
|
||||
watch.lastValue = currentValue;
|
||||
|
||||
if (shouldNotify) {
|
||||
const notification: WSPushMessage = {
|
||||
type: "push",
|
||||
subtype: "system" as const,
|
||||
event: "watch_triggered",
|
||||
eventData: {
|
||||
watchId: watch.id,
|
||||
url: watch.url,
|
||||
label: watch.label,
|
||||
mode: watch.mode,
|
||||
oldValue,
|
||||
newValue: currentValue,
|
||||
},
|
||||
messageId: crypto.randomUUID(),
|
||||
meshId: watch.meshId,
|
||||
senderPubkey: "system",
|
||||
priority: "now",
|
||||
nonce: "",
|
||||
ciphertext: "",
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
sendToPeer(watch.presenceId, notification);
|
||||
log.info("watch triggered", { id: watch.id, url: watch.url, old: oldValue, new: currentValue });
|
||||
}
|
||||
} catch (e) {
|
||||
log.warn("watch check failed", { id: watch.id, url: watch.url, error: (e as Error).message });
|
||||
}
|
||||
}
|
||||
|
||||
// Stream subscriptions: "meshId:streamName" → Set of presenceIds
|
||||
const streamSubscriptions = new Map<string, Set<string>>();
|
||||
|
||||
@@ -3364,6 +3468,47 @@ function handleConnection(ws: WebSocket): void {
|
||||
log.info("ws skill_deploy", { presence_id: presenceId, source: sd.source?.type });
|
||||
break;
|
||||
}
|
||||
|
||||
// --- URL Watch ---
|
||||
case "watch": {
|
||||
const w = msg as any;
|
||||
const watchId = `w_${crypto.randomUUID().slice(0, 8)}`;
|
||||
const mode = w.mode ?? "hash";
|
||||
const interval = Math.max(w.interval ?? 30, 5); // min 5 seconds
|
||||
const entry: WatchEntry = {
|
||||
id: watchId, meshId: conn.meshId, presenceId,
|
||||
url: w.url, mode, extract: w.extract, notifyOn: w.notify_on ?? "change",
|
||||
interval, headers: w.headers ?? {}, label: w.label,
|
||||
lastHash: "", lastValue: "", lastCheck: null, createdAt: new Date(),
|
||||
timer: setInterval(() => checkWatch(entry), interval * 1000),
|
||||
};
|
||||
urlWatches.set(watchId, entry);
|
||||
// Do first check immediately to capture baseline
|
||||
void checkWatch(entry);
|
||||
sendToPeer(presenceId, { type: "watch_ack", watchId, url: w.url, mode, interval, _reqId: w._reqId } as any);
|
||||
log.info("ws watch", { presence_id: presenceId, watchId, url: w.url, mode, interval });
|
||||
break;
|
||||
}
|
||||
case "unwatch": {
|
||||
const uw = msg as any;
|
||||
const watch = urlWatches.get(uw.watchId);
|
||||
if (watch) { clearInterval(watch.timer); urlWatches.delete(uw.watchId); }
|
||||
sendToPeer(presenceId, { type: "watch_ack", watchId: uw.watchId, url: watch?.url ?? "", mode: watch?.mode ?? "", interval: 0, _reqId: uw._reqId } as any);
|
||||
break;
|
||||
}
|
||||
case "watch_list": {
|
||||
const myWatches = [...urlWatches.values()].filter(w => w.presenceId === presenceId);
|
||||
sendToPeer(presenceId, {
|
||||
type: "watch_list_result",
|
||||
watches: myWatches.map(w => ({
|
||||
id: w.id, url: w.url, mode: w.mode, label: w.label, interval: w.interval,
|
||||
lastHash: w.lastHash, lastValue: w.lastValue,
|
||||
lastCheck: w.lastCheck?.toISOString(), createdAt: w.createdAt.toISOString(),
|
||||
})),
|
||||
_reqId: (msg as any)._reqId,
|
||||
} as any);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
metrics.messagesRejectedTotal.inc({ reason: "parse_or_handler" });
|
||||
|
||||
@@ -1104,6 +1104,19 @@ export interface WSVaultDeleteMessage { type: "vault_delete"; key: string; _reqI
|
||||
/** Client → broker: fetch encrypted vault entries for local decryption. */
|
||||
export interface WSVaultGetMessage { type: "vault_get"; keys: string[]; _reqId?: string; }
|
||||
|
||||
/** Client → broker: start watching a URL for changes. */
|
||||
export interface WSWatchMessage { type: "watch"; url: string; mode?: "hash" | "json" | "status"; extract?: string; interval?: number; notify_on?: string; headers?: Record<string, string>; label?: string; _reqId?: string; }
|
||||
/** Client → broker: stop watching. */
|
||||
export interface WSUnwatchMessage { type: "unwatch"; watchId: string; _reqId?: string; }
|
||||
/** Client → broker: list active watches. */
|
||||
export interface WSWatchListMessage { type: "watch_list"; _reqId?: string; }
|
||||
/** Broker → client: watch created acknowledgement. */
|
||||
export interface WSWatchAckMessage { type: "watch_ack"; watchId: string; url: string; mode: string; interval: number; _reqId?: string; }
|
||||
/** Broker → client: watch list response. */
|
||||
export interface WSWatchListResultMessage { type: "watch_list_result"; watches: Array<{ id: string; url: string; mode: string; label?: string; interval: number; lastHash?: string; lastValue?: string; lastCheck?: string; createdAt: string }>; _reqId?: string; }
|
||||
/** Broker → client: URL change detected. */
|
||||
export interface WSWatchTriggeredMessage { type: "watch_triggered"; watchId: string; url: string; label?: string; mode: string; oldValue: string; newValue: string; timestamp: string; }
|
||||
|
||||
export type WSClientMessage =
|
||||
| WSHelloMessage
|
||||
| WSSendMessage
|
||||
@@ -1185,7 +1198,10 @@ export type WSClientMessage =
|
||||
| WSVaultSetMessage
|
||||
| WSVaultListMessage
|
||||
| WSVaultDeleteMessage
|
||||
| WSVaultGetMessage;
|
||||
| WSVaultGetMessage
|
||||
| WSWatchMessage
|
||||
| WSUnwatchMessage
|
||||
| WSWatchListMessage;
|
||||
|
||||
// --- Skill messages ---
|
||||
|
||||
@@ -1333,4 +1349,7 @@ export type WSServerMessage =
|
||||
| WSVaultAckMessage
|
||||
| WSVaultListResultMessage
|
||||
| WSVaultGetResultMessage
|
||||
| WSWatchAckMessage
|
||||
| WSWatchListResultMessage
|
||||
| WSWatchTriggeredMessage
|
||||
| WSErrorMessage;
|
||||
|
||||
Reference in New Issue
Block a user