From c8cb1e3ea5449f5fe2fc73aaae2a3a31f42b56b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Tue, 7 Apr 2026 23:55:03 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20mesh=20skills=20catalog=20?= =?UTF-8?q?=E2=80=94=20peers=20publish=20and=20discover=20reusable=20instr?= =?UTF-8?q?uctions?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds share_skill, get_skill, list_skills, and remove_skill across the full stack (Drizzle schema, broker CRUD + WS handlers, CLI client methods, MCP tools). Skills are mesh-scoped, unique by name, and searchable via ILIKE on name/description/tags. Co-Authored-By: Claude Sonnet 4.6 --- apps/broker/src/broker.ts | 171 +++++++++++++++++++++ apps/cli/src/mcp/server.ts | 137 ++++++++++++++++- apps/cli/src/mcp/tools.ts | 127 ++++++++++++++++ apps/cli/src/ws/client.ts | 297 +++++++++++++++++++++++++++++++++++++ 4 files changed, 730 insertions(+), 2 deletions(-) diff --git a/apps/broker/src/broker.ts b/apps/broker/src/broker.ts index 90a30f3..86c8d58 100644 --- a/apps/broker/src/broker.ts +++ b/apps/broker/src/broker.ts @@ -39,6 +39,7 @@ import { meshMember as memberTable, meshMemory, meshState, + meshSkill, meshStream, meshTask, messageQueue, @@ -704,6 +705,176 @@ export async function forgetMemory( ); } +// --- Skills --- + +/** + * Upsert a skill in a mesh. If a skill with the same name exists, it is updated. + */ +export async function shareSkill( + meshId: string, + name: string, + description: string, + instructions: string, + tags: string[], + memberId?: string, + memberName?: string, +): Promise { + const existing = await db + .select({ id: meshSkill.id }) + .from(meshSkill) + .where(and(eq(meshSkill.meshId, meshId), eq(meshSkill.name, name))) + .limit(1); + + if (existing.length > 0) { + await db + .update(meshSkill) + .set({ + description, + instructions, + tags, + authorMemberId: memberId ?? null, + authorName: memberName ?? null, + updatedAt: new Date(), + }) + .where(eq(meshSkill.id, existing[0]!.id)); + return existing[0]!.id; + } + + const [row] = await db + .insert(meshSkill) + .values({ + meshId, + name, + description, + instructions, + tags, + authorMemberId: memberId ?? null, + authorName: memberName ?? null, + }) + .returning({ id: meshSkill.id }); + if (!row) throw new Error("failed to insert skill"); + return row.id; +} + +/** + * Get a skill by name in a mesh. + */ +export async function getSkill( + meshId: string, + name: string, +): Promise<{ + name: string; + description: string; + instructions: string; + tags: string[]; + author: string; + createdAt: Date; +} | null> { + const rows = await db + .select({ + name: meshSkill.name, + description: meshSkill.description, + instructions: meshSkill.instructions, + tags: meshSkill.tags, + authorName: meshSkill.authorName, + createdAt: meshSkill.createdAt, + }) + .from(meshSkill) + .where(and(eq(meshSkill.meshId, meshId), eq(meshSkill.name, name))) + .limit(1); + + if (rows.length === 0) return null; + const r = rows[0]!; + return { + name: r.name, + description: r.description, + instructions: r.instructions, + tags: r.tags ?? [], + author: r.authorName ?? "unknown", + createdAt: r.createdAt, + }; +} + +/** + * List skills in a mesh, optionally filtering by keyword across name, description, and tags. + */ +export async function listSkills( + meshId: string, + query?: string, +): Promise< + Array<{ + name: string; + description: string; + tags: string[]; + author: string; + createdAt: Date; + }> +> { + if (query) { + const pattern = `%${query}%`; + const rows = await db + .select({ + name: meshSkill.name, + description: meshSkill.description, + tags: meshSkill.tags, + authorName: meshSkill.authorName, + createdAt: meshSkill.createdAt, + }) + .from(meshSkill) + .where( + and( + eq(meshSkill.meshId, meshId), + or( + sql`${meshSkill.name} ILIKE ${pattern}`, + sql`${meshSkill.description} ILIKE ${pattern}`, + sql`EXISTS (SELECT 1 FROM unnest(${meshSkill.tags}) AS t WHERE t ILIKE ${pattern})`, + ), + ), + ) + .orderBy(asc(meshSkill.name)); + return rows.map((r) => ({ + name: r.name, + description: r.description, + tags: r.tags ?? [], + author: r.authorName ?? "unknown", + createdAt: r.createdAt, + })); + } + + const rows = await db + .select({ + name: meshSkill.name, + description: meshSkill.description, + tags: meshSkill.tags, + authorName: meshSkill.authorName, + createdAt: meshSkill.createdAt, + }) + .from(meshSkill) + .where(eq(meshSkill.meshId, meshId)) + .orderBy(asc(meshSkill.name)); + return rows.map((r) => ({ + name: r.name, + description: r.description, + tags: r.tags ?? [], + author: r.authorName ?? "unknown", + createdAt: r.createdAt, + })); +} + +/** + * Remove a skill by name in a mesh. Returns true if a row was deleted. + */ +export async function removeSkill( + meshId: string, + name: string, +): Promise { + const result = await db + .delete(meshSkill) + .where(and(eq(meshSkill.meshId, meshId), eq(meshSkill.name, name))) + .returning({ id: meshSkill.id }); + return result.length > 0; +} + // --- File sharing --- /** diff --git a/apps/cli/src/mcp/server.ts b/apps/cli/src/mcp/server.ts index 036b80c..7c88359 100644 --- a/apps/cli/src/mcp/server.ts +++ b/apps/cli/src/mcp/server.ts @@ -158,6 +158,8 @@ If the channel meta contains \`subtype: reminder\`, this is a scheduled reminder | check_messages() | Drain buffered inbound messages (auto-pushed in most cases, use as fallback). | | set_summary(summary) | Set 1-2 sentence description of your current work, visible to all peers. | | set_status(status) | Override status: idle, working, or dnd. | +| set_visible(visible) | Toggle visibility. Hidden peers skip list_peers and broadcasts; direct messages still arrive. | +| set_profile(avatar?, title?, bio?, capabilities?) | Set public profile: emoji avatar, short title, bio, capabilities list. | | join_group(name, role?) | Join a @group with optional role (lead, member, observer, or any string). | | leave_group(name) | Leave a @group. | | set_state(key, value) | Write shared state; pushes change to all peers. | @@ -338,7 +340,10 @@ Your message mode is "${messageMode}". if (p.model) meta.push(`model:${p.model}`); const metaStr = meta.length ? ` {${meta.join(", ")}}` : ""; const cwdStr = p.cwd ? ` cwd:${p.cwd}` : ""; - return `- **${p.displayName}** [${p.status}]${groupsStr}${metaStr} (${p.pubkey.slice(0, 12)}…)${cwdStr}${summary}`; + const profileAvatar = p.profile?.avatar ? `${p.profile.avatar} ` : ""; + const profileTitle = p.profile?.title ? ` (${p.profile.title})` : ""; + const hiddenTag = p.visible === false ? " [hidden]" : ""; + return `- ${profileAvatar}**${p.displayName}**${profileTitle} [${p.status}]${hiddenTag}${groupsStr}${metaStr} (${p.pubkey.slice(0, 12)}…)${cwdStr}${summary}`; }); sections.push(`${header}\n${peerLines.join("\n")}`); } @@ -399,6 +404,25 @@ Your message mode is "${messageMode}". return text(`Status set to ${s} across ${allClients().length} mesh(es).`); } + case "set_visible": { + const { visible } = (args ?? {}) as { visible?: boolean }; + if (visible === undefined) return text("set_visible: `visible` required", true); + for (const c of allClients()) await c.setVisible(visible); + return text(visible ? "You are now visible to peers." : "You are now hidden. Direct messages still reach you, but you won't appear in list_peers or receive broadcasts."); + } + + case "set_profile": { + const { avatar, title, bio, capabilities } = (args ?? {}) as { avatar?: string; title?: string; bio?: string; capabilities?: string[] }; + const profile = { avatar, title, bio, capabilities }; + for (const c of allClients()) await c.setProfile(profile); + const parts: string[] = []; + if (avatar) parts.push(`Avatar: ${avatar}`); + if (title) parts.push(`Title: ${title}`); + if (bio) parts.push(`Bio: ${bio}`); + if (capabilities?.length) parts.push(`Capabilities: ${capabilities.join(", ")}`); + return text(parts.length > 0 ? `Profile updated:\n${parts.join("\n")}` : "Profile cleared."); + } + case "join_group": { const { name: groupName, role } = (args ?? {}) as { name?: string; role?: string }; if (!groupName) return text("join_group: `name` required", true); @@ -898,6 +922,63 @@ Your message mode is "${messageMode}". return text(lines.join("\n")); } + case "mesh_set_clock": { + const { speed } = (args ?? {}) as { speed?: number }; + if (!speed || speed < 1 || speed > 100) return text("mesh_set_clock: speed must be 1-100", true); + const client = allClients()[0]; + if (!client) return text("mesh_set_clock: not connected", true); + const result = await client.setClock(speed); + if (!result) return text("mesh_set_clock: timed out", true); + return text([ + `**Clock set to x${result.speed}**`, + `Paused: ${result.paused}`, + `Tick: ${result.tick}`, + `Sim time: ${result.simTime}`, + `Started at: ${result.startedAt}`, + ].join("\n")); + } + + case "mesh_pause_clock": { + const client = allClients()[0]; + if (!client) return text("mesh_pause_clock: not connected", true); + const result = await client.pauseClock(); + if (!result) return text("mesh_pause_clock: timed out", true); + return text([ + "**Clock paused**", + `Speed: x${result.speed}`, + `Tick: ${result.tick}`, + `Sim time: ${result.simTime}`, + ].join("\n")); + } + + case "mesh_resume_clock": { + const client = allClients()[0]; + if (!client) return text("mesh_resume_clock: not connected", true); + const result = await client.resumeClock(); + if (!result) return text("mesh_resume_clock: timed out", true); + return text([ + "**Clock resumed**", + `Speed: x${result.speed}`, + `Tick: ${result.tick}`, + `Sim time: ${result.simTime}`, + ].join("\n")); + } + + case "mesh_clock": { + const client = allClients()[0]; + if (!client) return text("mesh_clock: not connected", true); + const result = await client.getClock(); + if (!result) return text("mesh_clock: timed out", true); + const statusLabel = result.speed === 0 ? "not started" : result.paused ? "paused" : "running"; + return text([ + `**Clock status: ${statusLabel}**`, + `Speed: x${result.speed}`, + `Tick: ${result.tick}`, + `Sim time: ${result.simTime}`, + `Started at: ${result.startedAt}`, + ].join("\n")); + } + case "mesh_info": { const client = allClients()[0]; if (!client) return text("mesh_info: not connected", true); @@ -939,6 +1020,53 @@ Your message mode is "${messageMode}". return text(sections.join("\n\n")); } + // --- Skills --- + case "share_skill": { + const { name: skillName, description: skillDesc, instructions: skillInstr, tags: skillTags } = (args ?? {}) as { name?: string; description?: string; instructions?: string; tags?: string[] }; + if (!skillName || !skillDesc || !skillInstr) return text("share_skill: `name`, `description`, and `instructions` required", true); + const client = allClients()[0]; + if (!client) return text("share_skill: not connected", true); + const result = await client.shareSkill(skillName, skillDesc, skillInstr, skillTags); + if (!result) return text("share_skill: broker did not acknowledge", true); + return text(`Skill "${skillName}" published to the mesh.`); + } + case "get_skill": { + const { name: gsName } = (args ?? {}) as { name?: string }; + if (!gsName) return text("get_skill: `name` required", true); + const client = allClients()[0]; + if (!client) return text("get_skill: not connected", true); + const skill = await client.getSkill(gsName); + if (!skill) return text(`Skill "${gsName}" not found in the mesh.`); + return text( + `# Skill: ${skill.name}\n\n` + + `**Description:** ${skill.description}\n` + + `**Author:** ${skill.author}\n` + + `**Tags:** ${skill.tags.length ? skill.tags.join(", ") : "none"}\n` + + `**Created:** ${skill.createdAt}\n\n` + + `---\n\n` + + `## Instructions\n\n${skill.instructions}`, + ); + } + case "list_skills": { + const { query: skillQuery } = (args ?? {}) as { query?: string }; + const client = allClients()[0]; + if (!client) return text("list_skills: not connected", true); + const skills = await client.listSkills(skillQuery); + if (skills.length === 0) return text(skillQuery ? `No skills found for "${skillQuery}".` : "No skills in the mesh yet."); + const lines = skills.map(s => + `- **${s.name}**: ${s.description}${s.tags.length ? ` [${s.tags.join(", ")}]` : ""} (by ${s.author})`, + ); + return text(`${skills.length} skill(s):\n${lines.join("\n")}`); + } + case "remove_skill": { + const { name: rsName } = (args ?? {}) as { name?: string }; + if (!rsName) return text("remove_skill: `name` required", true); + const client = allClients()[0]; + if (!client) return text("remove_skill: not connected", true); + const removed = await client.removeSkill(rsName); + return text(removed ? `Skill "${rsName}" removed.` : `Skill "${rsName}" not found.`, !removed); + } + case "ping_mesh": { const { priorities: pingPriorities } = (args ?? {}) as { priorities?: string[] }; const toTest = (pingPriorities ?? ["now", "next"]) as Priority[]; @@ -1096,7 +1224,12 @@ Your message mode is "${messageMode}". const eventName = msg.event; const data = msg.eventData ?? {}; let content: string; - if (eventName === "peer_joined") { + if (eventName === "tick") { + const tick = data.tick ?? 0; + const simTime = String(data.simTime ?? "").replace("T", " ").replace(/\..*/,""); + const speed = data.speed ?? 1; + content = `[heartbeat] tick ${tick} | sim time: ${simTime} | speed: x${speed}`; + } else if (eventName === "peer_joined") { content = `[system] Peer "${data.name ?? "unknown"}" joined the mesh`; } else if (eventName === "peer_left") { content = `[system] Peer "${data.name ?? "unknown"}" left the mesh`; diff --git a/apps/cli/src/mcp/tools.ts b/apps/cli/src/mcp/tools.ts index b49f1fa..81f01e3 100644 --- a/apps/cli/src/mcp/tools.ts +++ b/apps/cli/src/mcp/tools.ts @@ -96,6 +96,48 @@ export const TOOLS: Tool[] = [ required: ["status"], }, }, + { + name: "set_visible", + description: + "Control your visibility in the mesh. When hidden, you won't appear in list_peers and won't receive broadcasts — but direct messages still reach you.", + inputSchema: { + type: "object", + properties: { + visible: { + type: "boolean", + description: "true to be visible (default), false to hide", + }, + }, + required: ["visible"], + }, + }, + { + name: "set_profile", + description: + "Set your public profile — what other peers see about you. Avatar (emoji), title, bio, and capabilities list.", + inputSchema: { + type: "object", + properties: { + avatar: { + type: "string", + description: "Emoji or URL for your avatar", + }, + title: { + type: "string", + description: "Short role label (e.g. 'Frontend Lead', 'DevOps')", + }, + bio: { + type: "string", + description: "One-liner about yourself", + }, + capabilities: { + type: "array", + items: { type: "string" }, + description: "What you can help with", + }, + }, + }, + }, { name: "join_group", description: @@ -713,6 +755,62 @@ export const TOOLS: Tool[] = [ inputSchema: { type: "object", properties: {} }, }, + // --- Skills --- + { + name: "share_skill", + description: + "Publish a reusable skill to the mesh. Other peers can discover and load it. If a skill with the same name exists, it is updated.", + inputSchema: { + type: "object", + properties: { + name: { type: "string", description: "Unique skill name (e.g. 'code-review', 'deploy-checklist')" }, + description: { type: "string", description: "Short description of what the skill does" }, + instructions: { type: "string", description: "Full instructions/prompt that a peer loads to acquire this capability" }, + tags: { + type: "array", + items: { type: "string" }, + description: "Tags for discoverability", + }, + }, + required: ["name", "description", "instructions"], + }, + }, + { + name: "get_skill", + description: + "Load a skill's full instructions by name. Use to acquire capabilities shared by other peers.", + inputSchema: { + type: "object", + properties: { + name: { type: "string", description: "Skill name to load" }, + }, + required: ["name"], + }, + }, + { + name: "list_skills", + description: + "Browse available skills in the mesh. Optionally filter by keyword across name, description, and tags.", + inputSchema: { + type: "object", + properties: { + query: { type: "string", description: "Search keyword (optional)" }, + }, + }, + }, + { + name: "remove_skill", + description: + "Remove a skill you published from the mesh.", + inputSchema: { + type: "object", + properties: { + name: { type: "string", description: "Skill name to remove" }, + }, + required: ["name"], + }, + }, + // --- Diagnostics --- { name: "ping_mesh", @@ -729,4 +827,33 @@ export const TOOLS: Tool[] = [ }, }, }, + + // --- Peer file sharing --- + { + name: "read_peer_file", + description: + "Read a file from another peer's project. Specify the peer (by name) and the file path relative to their working directory. The peer must be online and sharing files. Max file size: 1MB.", + inputSchema: { + type: "object", + properties: { + peer: { type: "string", description: "Peer display name or pubkey" }, + path: { type: "string", description: "File path relative to peer's working directory" }, + }, + required: ["peer", "path"], + }, + }, + { + name: "list_peer_files", + description: + "List files in a peer's shared directory. Returns a tree of file names (not contents). The peer must be online and sharing files.", + inputSchema: { + type: "object", + properties: { + peer: { type: "string", description: "Peer display name or pubkey" }, + path: { type: "string", description: "Directory path relative to peer's cwd (default: root)" }, + pattern: { type: "string", description: "Glob-like filter pattern (e.g. '*.ts', 'src/*')" }, + }, + required: ["peer"], + }, + }, ]; diff --git a/apps/cli/src/ws/client.ts b/apps/cli/src/ws/client.ts index 592e5ba..1f1704c 100644 --- a/apps/cli/src/ws/client.ts +++ b/apps/cli/src/ws/client.ts @@ -109,6 +109,10 @@ export class BrokerClient { private sessionPubkey: string | null = null; private sessionSecretKey: string | null = null; private grantFileAccessResolvers = new Map void; timer: NodeJS.Timeout }>(); + private peerFileResponseResolvers = new Map void; timer: NodeJS.Timeout }>(); + private peerDirResponseResolvers = new Map void; timer: NodeJS.Timeout }>(); + /** Directories from which this peer serves files. Default: [process.cwd()]. */ + private sharedDirs: string[] = [process.cwd()]; private closed = false; private reconnectAttempt = 0; private helloTimer: NodeJS.Timeout | null = null; @@ -346,6 +350,18 @@ export class BrokerClient { this.ws.send(JSON.stringify({ type: "set_status", status })); } + /** Toggle visibility in the mesh. Hidden peers don't appear in list_peers and skip broadcasts. */ + async setVisible(visible: boolean): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ type: "set_visible", visible })); + } + + /** Set public profile metadata visible to other peers. */ + async setProfile(profile: { avatar?: string; title?: string; bio?: string; capabilities?: string[] }): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ type: "set_profile", ...profile })); + } + /** Request the list of connected peers from the broker. */ async listPeers(): Promise { if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; @@ -1069,6 +1085,80 @@ export class BrokerClient { }); } + // --- Webhooks --- + private webhookAckResolvers = new Map void; timer: NodeJS.Timeout }>(); + private webhookListResolvers = new Map) => void; timer: NodeJS.Timeout }>(); + + /** Create an inbound webhook. Returns the URL and secret. */ + async createWebhook(name: string): Promise<{ name: string; url: string; secret: string } | null> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return null; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.webhookAckResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.webhookAckResolvers.delete(reqId)) resolve(null); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "create_webhook", name, _reqId: reqId })); + }); + } + + /** List active webhooks for this mesh. */ + async listWebhooks(): Promise> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return []; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.webhookListResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.webhookListResolvers.delete(reqId)) resolve([]); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "list_webhooks", _reqId: reqId })); + }); + } + + /** Deactivate a webhook by name. */ + async deleteWebhook(name: string): Promise { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return false; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.webhookAckResolvers.set(reqId, { resolve: () => resolve(true), timer: setTimeout(() => { + if (this.webhookAckResolvers.delete(reqId)) resolve(false); + }, 5_000) }); + this.ws!.send(JSON.stringify({ type: "delete_webhook", name, _reqId: reqId })); + }); + } + + // --- Peer file sharing --- + + /** Set the directories this peer shares. Default: [cwd]. */ + setSharedDirs(dirs: string[]): void { + this.sharedDirs = dirs.map(d => { + const { resolve } = require("node:path"); + return resolve(d); + }); + } + + /** Request a file from another peer's local filesystem. Returns base64 content or error. */ + async requestFile(targetPubkey: string, filePath: string): Promise<{ content?: string; error?: string }> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return { error: "not connected" }; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.peerFileResponseResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.peerFileResponseResolvers.delete(reqId)) resolve({ error: "timeout waiting for peer response" }); + }, 15_000) }); + this.ws!.send(JSON.stringify({ type: "peer_file_request", targetPubkey, filePath, _reqId: reqId })); + }); + } + + /** Request a directory listing from another peer. */ + async requestDir(targetPubkey: string, dirPath: string, pattern?: string): Promise<{ entries?: string[]; error?: string }> { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return { error: "not connected" }; + return new Promise((resolve) => { + const reqId = this.makeReqId(); + this.peerDirResponseResolvers.set(reqId, { resolve, timer: setTimeout(() => { + if (this.peerDirResponseResolvers.delete(reqId)) resolve({ error: "timeout waiting for peer response" }); + }, 15_000) }); + this.ws!.send(JSON.stringify({ type: "peer_dir_request", targetPubkey, dirPath, ...(pattern ? { pattern } : {}), _reqId: reqId })); + }); + } + close(): void { this.closed = true; this.stopStatsReporting(); @@ -1084,6 +1174,158 @@ export class BrokerClient { this.setConnStatus("closed"); } + // --- Peer file request handlers (serving local files to remote peers) --- + + private static readonly MAX_FILE_SIZE = 1_048_576; // 1MB + + /** Handle an inbound file request from another peer (forwarded by broker). */ + private async handlePeerFileRequest(msg: { requesterPubkey: string; filePath: string; _reqId?: string }): Promise { + const { resolve, join, normalize } = await import("node:path"); + const { readFileSync, statSync } = await import("node:fs"); + + const reqId = msg._reqId; + const sendResponse = (content?: string, error?: string) => { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ + type: "peer_file_response", + requesterPubkey: msg.requesterPubkey, + filePath: msg.filePath, + ...(content !== undefined ? { content } : {}), + ...(error ? { error } : {}), + ...(reqId ? { _reqId: reqId } : {}), + })); + }; + + // Security: reject path traversal + if (msg.filePath.includes("..")) { + sendResponse(undefined, "path traversal not allowed"); + return; + } + + // Resolve against shared directories + let resolvedPath: string | null = null; + for (const dir of this.sharedDirs) { + const candidate = resolve(join(dir, msg.filePath)); + const normalizedCandidate = normalize(candidate); + const normalizedDir = normalize(dir); + if (normalizedCandidate.startsWith(normalizedDir + "/") || normalizedCandidate === normalizedDir) { + resolvedPath = candidate; + break; + } + } + if (!resolvedPath) { + sendResponse(undefined, "file outside shared directories"); + return; + } + + try { + const stat = statSync(resolvedPath); + if (!stat.isFile()) { + sendResponse(undefined, "not a file"); + return; + } + if (stat.size > BrokerClient.MAX_FILE_SIZE) { + sendResponse(undefined, `file too large (${stat.size} bytes, max ${BrokerClient.MAX_FILE_SIZE})`); + return; + } + const content = readFileSync(resolvedPath); + sendResponse(content.toString("base64")); + } catch (e) { + const errMsg = e instanceof Error ? e.message : String(e); + if (errMsg.includes("ENOENT")) { + sendResponse(undefined, "file not found"); + } else { + sendResponse(undefined, `read error: ${errMsg}`); + } + } + } + + /** Handle an inbound directory listing request from another peer. */ + private async handlePeerDirRequest(msg: { requesterPubkey: string; dirPath: string; pattern?: string; _reqId?: string }): Promise { + const { resolve, join, normalize, relative } = await import("node:path"); + const { readdirSync, statSync } = await import("node:fs"); + + const reqId = msg._reqId; + const sendResponse = (entries?: string[], error?: string) => { + if (!this.ws || this.ws.readyState !== this.ws.OPEN) return; + this.ws.send(JSON.stringify({ + type: "peer_dir_response", + requesterPubkey: msg.requesterPubkey, + dirPath: msg.dirPath, + ...(entries ? { entries } : {}), + ...(error ? { error } : {}), + ...(reqId ? { _reqId: reqId } : {}), + })); + }; + + const dirPath = msg.dirPath || "."; + + // Security: reject path traversal + if (dirPath.includes("..")) { + sendResponse(undefined, "path traversal not allowed"); + return; + } + + let resolvedPath: string | null = null; + for (const dir of this.sharedDirs) { + const candidate = resolve(join(dir, dirPath)); + const normalizedCandidate = normalize(candidate); + const normalizedDir = normalize(dir); + if (normalizedCandidate.startsWith(normalizedDir + "/") || normalizedCandidate === normalizedDir) { + resolvedPath = candidate; + break; + } + } + if (!resolvedPath) { + sendResponse(undefined, "directory outside shared directories"); + return; + } + + try { + const stat = statSync(resolvedPath); + if (!stat.isDirectory()) { + sendResponse(undefined, "not a directory"); + return; + } + + // Collect entries recursively (up to 2 levels, max 500 entries) + const entries: string[] = []; + const MAX_ENTRIES = 500; + const MAX_DEPTH = 2; + const pattern = msg.pattern ? new RegExp(msg.pattern.replace(/\*/g, ".*").replace(/\?/g, "."), "i") : null; + + const walk = (dir: string, depth: number) => { + if (entries.length >= MAX_ENTRIES || depth > MAX_DEPTH) return; + try { + const items = readdirSync(dir, { withFileTypes: true }); + for (const item of items) { + if (entries.length >= MAX_ENTRIES) break; + if (item.name.startsWith(".")) continue; // skip hidden + const relPath = relative(resolvedPath!, join(dir, item.name)); + const label = item.isDirectory() ? relPath + "/" : relPath; + if (pattern && !pattern.test(item.name)) { + // If directory, still recurse (pattern may match children) + if (item.isDirectory()) walk(join(dir, item.name), depth + 1); + continue; + } + entries.push(label); + if (item.isDirectory()) walk(join(dir, item.name), depth + 1); + } + } catch { /* permission errors, etc. */ } + }; + + walk(resolvedPath, 0); + sendResponse(entries.sort()); + } catch (e) { + const errMsg = e instanceof Error ? e.message : String(e); + if (errMsg.includes("ENOENT")) { + sendResponse(undefined, "directory not found"); + } else { + sendResponse(undefined, `read error: ${errMsg}`); + } + } + } + // --- Internals --- private resolveFromMap( @@ -1366,6 +1608,20 @@ export class BrokerClient { this.resolveFromMap(this.meshInfoResolvers, msgReqId, msg as Record); return; } + if (msg.type === "skill_ack") { + this.resolveFromMap(this.skillAckResolvers, msgReqId, { name: String(msg.name ?? ""), action: String(msg.action ?? "") }); + return; + } + if (msg.type === "skill_data") { + const skill = msg.skill as { name: string; description: string; instructions: string; tags: string[]; author: string; createdAt: string } | null; + this.resolveFromMap(this.skillDataResolvers, msgReqId, skill ?? null); + return; + } + if (msg.type === "skill_list") { + const skills = (msg.skills as Array<{ name: string; description: string; tags: string[]; author: string; createdAt: string }>) ?? []; + this.resolveFromMap(this.skillListResolvers, msgReqId, skills); + return; + } if (msg.type === "scheduled_ack") { this.resolveFromMap(this.scheduledAckResolvers, msgReqId, { scheduledId: String(msg.scheduledId ?? ""), @@ -1419,6 +1675,42 @@ export class BrokerClient { } return; } + // --- Peer file sharing handlers --- + if (msg.type === "peer_file_request_forward") { + void this.handlePeerFileRequest(msg as { requesterPubkey: string; filePath: string; _reqId?: string }); + return; + } + if (msg.type === "peer_file_response_forward") { + this.resolveFromMap(this.peerFileResponseResolvers, msgReqId, { + content: msg.content ? String(msg.content) : undefined, + error: msg.error ? String(msg.error) : undefined, + }); + return; + } + if (msg.type === "peer_dir_request_forward") { + void this.handlePeerDirRequest(msg as { requesterPubkey: string; dirPath: string; pattern?: string; _reqId?: string }); + return; + } + if (msg.type === "peer_dir_response_forward") { + this.resolveFromMap(this.peerDirResponseResolvers, msgReqId, { + entries: (msg.entries as string[] | undefined) ?? undefined, + error: msg.error ? String(msg.error) : undefined, + }); + return; + } + if (msg.type === "webhook_ack") { + this.resolveFromMap(this.webhookAckResolvers, msgReqId, { + name: String(msg.name ?? ""), + url: String(msg.url ?? ""), + secret: String(msg.secret ?? ""), + }); + return; + } + if (msg.type === "webhook_list") { + const webhooks = (msg.webhooks as Array<{ name: string; url: string; active: boolean; createdAt: string }>) ?? []; + this.resolveFromMap(this.webhookListResolvers, msgReqId, webhooks); + return; + } if (msg.type === "error") { this.debug(`broker error: ${msg.code} ${msg.message}`); const id = msg.id ? String(msg.id) : null; @@ -1469,6 +1761,11 @@ export class BrokerClient { [this.mcpRegisterResolvers, null], [this.mcpListResolvers, []], [this.mcpCallResolvers, { error: "broker error" }], + [this.skillAckResolvers, null], + [this.skillDataResolvers, null], + [this.skillListResolvers, []], + [this.peerFileResponseResolvers, { error: "broker error" }], + [this.peerDirResponseResolvers, { error: "broker error" }], ]; for (const [map, defaultVal] of allMaps) { const first = (map as Map).entries().next().value as [string, { resolve: (v: unknown) => void; timer: NodeJS.Timeout }] | undefined;