feat: implement mesh skills catalog — peers publish and discover reusable instructions

Adds share_skill, get_skill, list_skills, and remove_skill across the full
stack (Drizzle schema, broker CRUD + WS handlers, CLI client methods, MCP
tools). Skills are mesh-scoped, unique by name, and searchable via ILIKE
on name/description/tags.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-07 23:55:03 +01:00
parent 86a258301f
commit c8cb1e3ea5
4 changed files with 730 additions and 2 deletions

View File

@@ -109,6 +109,10 @@ export class BrokerClient {
private sessionPubkey: string | null = null;
private sessionSecretKey: string | null = null;
private grantFileAccessResolvers = new Map<string, { resolve: (ok: boolean) => void; timer: NodeJS.Timeout }>();
private peerFileResponseResolvers = new Map<string, { resolve: (result: { content?: string; error?: string }) => void; timer: NodeJS.Timeout }>();
private peerDirResponseResolvers = new Map<string, { resolve: (result: { entries?: string[]; error?: string }) => void; timer: NodeJS.Timeout }>();
/** Directories from which this peer serves files. Default: [process.cwd()]. */
private sharedDirs: string[] = [process.cwd()];
private closed = false;
private reconnectAttempt = 0;
private helloTimer: NodeJS.Timeout | null = null;
@@ -346,6 +350,18 @@ export class BrokerClient {
this.ws.send(JSON.stringify({ type: "set_status", status }));
}
/** Toggle visibility in the mesh. Hidden peers don't appear in list_peers and skip broadcasts. */
async setVisible(visible: boolean): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "set_visible", visible }));
}
/** Set public profile metadata visible to other peers. */
async setProfile(profile: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }): Promise<void> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({ type: "set_profile", ...profile }));
}
/** Request the list of connected peers from the broker. */
async listPeers(): Promise<PeerInfo[]> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
@@ -1069,6 +1085,80 @@ export class BrokerClient {
});
}
// --- Webhooks ---
private webhookAckResolvers = new Map<string, { resolve: (result: { name: string; url: string; secret: string } | null) => void; timer: NodeJS.Timeout }>();
private webhookListResolvers = new Map<string, { resolve: (webhooks: Array<{ name: string; url: string; active: boolean; createdAt: string }>) => void; timer: NodeJS.Timeout }>();
/** Create an inbound webhook. Returns the URL and secret. */
async createWebhook(name: string): Promise<{ name: string; url: string; secret: string } | null> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null;
return new Promise((resolve) => {
const reqId = this.makeReqId();
this.webhookAckResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.webhookAckResolvers.delete(reqId)) resolve(null);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "create_webhook", name, _reqId: reqId }));
});
}
/** List active webhooks for this mesh. */
async listWebhooks(): Promise<Array<{ name: string; url: string; active: boolean; createdAt: string }>> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return [];
return new Promise((resolve) => {
const reqId = this.makeReqId();
this.webhookListResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.webhookListResolvers.delete(reqId)) resolve([]);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "list_webhooks", _reqId: reqId }));
});
}
/** Deactivate a webhook by name. */
async deleteWebhook(name: string): Promise<boolean> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return false;
return new Promise((resolve) => {
const reqId = this.makeReqId();
this.webhookAckResolvers.set(reqId, { resolve: () => resolve(true), timer: setTimeout(() => {
if (this.webhookAckResolvers.delete(reqId)) resolve(false);
}, 5_000) });
this.ws!.send(JSON.stringify({ type: "delete_webhook", name, _reqId: reqId }));
});
}
// --- Peer file sharing ---
/** Set the directories this peer shares. Default: [cwd]. */
setSharedDirs(dirs: string[]): void {
this.sharedDirs = dirs.map(d => {
const { resolve } = require("node:path");
return resolve(d);
});
}
/** Request a file from another peer's local filesystem. Returns base64 content or error. */
async requestFile(targetPubkey: string, filePath: string): Promise<{ content?: string; error?: string }> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return { error: "not connected" };
return new Promise((resolve) => {
const reqId = this.makeReqId();
this.peerFileResponseResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.peerFileResponseResolvers.delete(reqId)) resolve({ error: "timeout waiting for peer response" });
}, 15_000) });
this.ws!.send(JSON.stringify({ type: "peer_file_request", targetPubkey, filePath, _reqId: reqId }));
});
}
/** Request a directory listing from another peer. */
async requestDir(targetPubkey: string, dirPath: string, pattern?: string): Promise<{ entries?: string[]; error?: string }> {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return { error: "not connected" };
return new Promise((resolve) => {
const reqId = this.makeReqId();
this.peerDirResponseResolvers.set(reqId, { resolve, timer: setTimeout(() => {
if (this.peerDirResponseResolvers.delete(reqId)) resolve({ error: "timeout waiting for peer response" });
}, 15_000) });
this.ws!.send(JSON.stringify({ type: "peer_dir_request", targetPubkey, dirPath, ...(pattern ? { pattern } : {}), _reqId: reqId }));
});
}
close(): void {
this.closed = true;
this.stopStatsReporting();
@@ -1084,6 +1174,158 @@ export class BrokerClient {
this.setConnStatus("closed");
}
// --- Peer file request handlers (serving local files to remote peers) ---
private static readonly MAX_FILE_SIZE = 1_048_576; // 1MB
/** Handle an inbound file request from another peer (forwarded by broker). */
private async handlePeerFileRequest(msg: { requesterPubkey: string; filePath: string; _reqId?: string }): Promise<void> {
const { resolve, join, normalize } = await import("node:path");
const { readFileSync, statSync } = await import("node:fs");
const reqId = msg._reqId;
const sendResponse = (content?: string, error?: string) => {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({
type: "peer_file_response",
requesterPubkey: msg.requesterPubkey,
filePath: msg.filePath,
...(content !== undefined ? { content } : {}),
...(error ? { error } : {}),
...(reqId ? { _reqId: reqId } : {}),
}));
};
// Security: reject path traversal
if (msg.filePath.includes("..")) {
sendResponse(undefined, "path traversal not allowed");
return;
}
// Resolve against shared directories
let resolvedPath: string | null = null;
for (const dir of this.sharedDirs) {
const candidate = resolve(join(dir, msg.filePath));
const normalizedCandidate = normalize(candidate);
const normalizedDir = normalize(dir);
if (normalizedCandidate.startsWith(normalizedDir + "/") || normalizedCandidate === normalizedDir) {
resolvedPath = candidate;
break;
}
}
if (!resolvedPath) {
sendResponse(undefined, "file outside shared directories");
return;
}
try {
const stat = statSync(resolvedPath);
if (!stat.isFile()) {
sendResponse(undefined, "not a file");
return;
}
if (stat.size > BrokerClient.MAX_FILE_SIZE) {
sendResponse(undefined, `file too large (${stat.size} bytes, max ${BrokerClient.MAX_FILE_SIZE})`);
return;
}
const content = readFileSync(resolvedPath);
sendResponse(content.toString("base64"));
} catch (e) {
const errMsg = e instanceof Error ? e.message : String(e);
if (errMsg.includes("ENOENT")) {
sendResponse(undefined, "file not found");
} else {
sendResponse(undefined, `read error: ${errMsg}`);
}
}
}
/** Handle an inbound directory listing request from another peer. */
private async handlePeerDirRequest(msg: { requesterPubkey: string; dirPath: string; pattern?: string; _reqId?: string }): Promise<void> {
const { resolve, join, normalize, relative } = await import("node:path");
const { readdirSync, statSync } = await import("node:fs");
const reqId = msg._reqId;
const sendResponse = (entries?: string[], error?: string) => {
if (!this.ws || this.ws.readyState !== this.ws.OPEN) return;
this.ws.send(JSON.stringify({
type: "peer_dir_response",
requesterPubkey: msg.requesterPubkey,
dirPath: msg.dirPath,
...(entries ? { entries } : {}),
...(error ? { error } : {}),
...(reqId ? { _reqId: reqId } : {}),
}));
};
const dirPath = msg.dirPath || ".";
// Security: reject path traversal
if (dirPath.includes("..")) {
sendResponse(undefined, "path traversal not allowed");
return;
}
let resolvedPath: string | null = null;
for (const dir of this.sharedDirs) {
const candidate = resolve(join(dir, dirPath));
const normalizedCandidate = normalize(candidate);
const normalizedDir = normalize(dir);
if (normalizedCandidate.startsWith(normalizedDir + "/") || normalizedCandidate === normalizedDir) {
resolvedPath = candidate;
break;
}
}
if (!resolvedPath) {
sendResponse(undefined, "directory outside shared directories");
return;
}
try {
const stat = statSync(resolvedPath);
if (!stat.isDirectory()) {
sendResponse(undefined, "not a directory");
return;
}
// Collect entries recursively (up to 2 levels, max 500 entries)
const entries: string[] = [];
const MAX_ENTRIES = 500;
const MAX_DEPTH = 2;
const pattern = msg.pattern ? new RegExp(msg.pattern.replace(/\*/g, ".*").replace(/\?/g, "."), "i") : null;
const walk = (dir: string, depth: number) => {
if (entries.length >= MAX_ENTRIES || depth > MAX_DEPTH) return;
try {
const items = readdirSync(dir, { withFileTypes: true });
for (const item of items) {
if (entries.length >= MAX_ENTRIES) break;
if (item.name.startsWith(".")) continue; // skip hidden
const relPath = relative(resolvedPath!, join(dir, item.name));
const label = item.isDirectory() ? relPath + "/" : relPath;
if (pattern && !pattern.test(item.name)) {
// If directory, still recurse (pattern may match children)
if (item.isDirectory()) walk(join(dir, item.name), depth + 1);
continue;
}
entries.push(label);
if (item.isDirectory()) walk(join(dir, item.name), depth + 1);
}
} catch { /* permission errors, etc. */ }
};
walk(resolvedPath, 0);
sendResponse(entries.sort());
} catch (e) {
const errMsg = e instanceof Error ? e.message : String(e);
if (errMsg.includes("ENOENT")) {
sendResponse(undefined, "directory not found");
} else {
sendResponse(undefined, `read error: ${errMsg}`);
}
}
}
// --- Internals ---
private resolveFromMap<T>(
@@ -1366,6 +1608,20 @@ export class BrokerClient {
this.resolveFromMap(this.meshInfoResolvers, msgReqId, msg as Record<string, unknown>);
return;
}
if (msg.type === "skill_ack") {
this.resolveFromMap(this.skillAckResolvers, msgReqId, { name: String(msg.name ?? ""), action: String(msg.action ?? "") });
return;
}
if (msg.type === "skill_data") {
const skill = msg.skill as { name: string; description: string; instructions: string; tags: string[]; author: string; createdAt: string } | null;
this.resolveFromMap(this.skillDataResolvers, msgReqId, skill ?? null);
return;
}
if (msg.type === "skill_list") {
const skills = (msg.skills as Array<{ name: string; description: string; tags: string[]; author: string; createdAt: string }>) ?? [];
this.resolveFromMap(this.skillListResolvers, msgReqId, skills);
return;
}
if (msg.type === "scheduled_ack") {
this.resolveFromMap(this.scheduledAckResolvers, msgReqId, {
scheduledId: String(msg.scheduledId ?? ""),
@@ -1419,6 +1675,42 @@ export class BrokerClient {
}
return;
}
// --- Peer file sharing handlers ---
if (msg.type === "peer_file_request_forward") {
void this.handlePeerFileRequest(msg as { requesterPubkey: string; filePath: string; _reqId?: string });
return;
}
if (msg.type === "peer_file_response_forward") {
this.resolveFromMap(this.peerFileResponseResolvers, msgReqId, {
content: msg.content ? String(msg.content) : undefined,
error: msg.error ? String(msg.error) : undefined,
});
return;
}
if (msg.type === "peer_dir_request_forward") {
void this.handlePeerDirRequest(msg as { requesterPubkey: string; dirPath: string; pattern?: string; _reqId?: string });
return;
}
if (msg.type === "peer_dir_response_forward") {
this.resolveFromMap(this.peerDirResponseResolvers, msgReqId, {
entries: (msg.entries as string[] | undefined) ?? undefined,
error: msg.error ? String(msg.error) : undefined,
});
return;
}
if (msg.type === "webhook_ack") {
this.resolveFromMap(this.webhookAckResolvers, msgReqId, {
name: String(msg.name ?? ""),
url: String(msg.url ?? ""),
secret: String(msg.secret ?? ""),
});
return;
}
if (msg.type === "webhook_list") {
const webhooks = (msg.webhooks as Array<{ name: string; url: string; active: boolean; createdAt: string }>) ?? [];
this.resolveFromMap(this.webhookListResolvers, msgReqId, webhooks);
return;
}
if (msg.type === "error") {
this.debug(`broker error: ${msg.code} ${msg.message}`);
const id = msg.id ? String(msg.id) : null;
@@ -1469,6 +1761,11 @@ export class BrokerClient {
[this.mcpRegisterResolvers, null],
[this.mcpListResolvers, []],
[this.mcpCallResolvers, { error: "broker error" }],
[this.skillAckResolvers, null],
[this.skillDataResolvers, null],
[this.skillListResolvers, []],
[this.peerFileResponseResolvers, { error: "broker error" }],
[this.peerDirResponseResolvers, { error: "broker error" }],
];
for (const [map, defaultVal] of allMaps) {
const first = (map as Map<string, any>).entries().next().value as [string, { resolve: (v: unknown) => void; timer: NodeJS.Timeout }] | undefined;