feat(broker): add claude-powered telegram bot with tool calling
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-04-13 20:40:16 +01:00
parent 4561076904
commit f4881b21b0
5 changed files with 616 additions and 36 deletions

View File

@@ -535,6 +535,25 @@ const pendingVerifications = new Map<
// Conversation state: chatId → which input the bot is waiting for
const conversationState = new Map<number, "awaiting_email" | "awaiting_code">();
/** Pending AI actions awaiting user confirmation */
const pendingAiActions = new Map<string, {
chatId: number;
meshIds: string[];
toolCall: { name: string; input: Record<string, unknown> };
expiresAt: number;
}>();
/** Chat → mesh slugs mapping for AI context */
const chatMeshSlugs = new Map<number, string[]>();
// Clean expired AI actions every 5 min
setInterval(() => {
const now = Date.now();
for (const [k, v] of pendingAiActions) {
if (now > v.expiresAt) pendingAiActions.delete(k);
}
}, 5 * 60 * 1000);
/** Invite URL regex: https://claudemesh.com/join/<token> */
const INVITE_URL_RE =
/https?:\/\/(?:www\.)?claudemesh\.com\/join\/([A-Za-z0-9_\-\.]+)/;
@@ -1144,6 +1163,49 @@ function setupBotCommands(
const chatId = ctx.chat?.id;
if (!chatId) { await ctx.answerCallbackQuery(); return; }
// --- AI action confirmation ---
if (data.startsWith("ai_")) {
const [action, actionId] = data.split(":");
if (!actionId) { await ctx.answerCallbackQuery(); return; }
const pending = pendingAiActions.get(actionId);
if (!pending || pending.chatId !== chatId) {
await ctx.answerCallbackQuery({ text: "Expired. Send your message again." });
return;
}
if (action === "ai_cancel") {
pendingAiActions.delete(actionId);
await ctx.answerCallbackQuery({ text: "Cancelled" });
await ctx.editMessageText("❌ Cancelled.");
return;
}
if (action === "ai_edit") {
pendingAiActions.delete(actionId);
await ctx.answerCallbackQuery({ text: "Type your edited message" });
await ctx.editMessageText("✏️ Type your message again with corrections.");
return;
}
if (action === "ai_confirm") {
pendingAiActions.delete(actionId);
await ctx.answerCallbackQuery({ text: "Executing..." });
try {
const { formatResult } = await import("./telegram-ai");
const result = await executeAiToolCall(pending.toolCall, pending.meshIds);
await ctx.editMessageText(
formatResult(pending.toolCall.name, result),
{ parse_mode: "MarkdownV2" },
);
} catch (err) {
await ctx.editMessageText(`❌ Failed: ${err instanceof Error ? err.message : String(err)}`);
}
return;
}
}
// --- File recipient picker ---
if (data.startsWith("file:")) {
const pending = pendingFiles.get(chatId);
@@ -1523,24 +1585,140 @@ function setupBotCommands(
return;
}
// --- No mention → broadcast to all connected meshes ---
let sent = 0;
for (const meshId of meshIds) {
const conn = meshConnections.get(meshId);
if (!conn?.isConnected()) continue;
const ok = await conn.sendMessage(
"*",
`[via Telegram] ${text}`,
"next",
);
if (ok) sent++;
}
if (sent === 0) {
await ctx.reply("❌ Not connected to any mesh.");
// --- No mention → process through Claude AI ---
try {
const { processMessage, formatConfirmation, formatResult, CONFIRM_ACTIONS } = await import("./telegram-ai");
// Gather context for the AI
const firstMeshId = meshIds[0]!;
const firstConn = meshConnections.get(firstMeshId);
const meshSlug = chatMeshSlugs.get(chatId)?.[0];
let recentPeers: string[] = [];
if (firstConn?.isConnected()) {
try {
const peers = await firstConn.listPeers();
recentPeers = peers.map(p => p.displayName);
} catch {}
}
const result = await processMessage(text, {
meshSlug,
userName: ctx.from?.first_name,
recentPeers,
});
if (result.type === "error") {
await ctx.reply(result.text ?? "Something went wrong.");
return;
}
if (result.type === "text") {
await ctx.reply(result.text ?? "");
return;
}
if (result.type === "tool_call" && result.toolCall) {
if (result.requiresConfirmation) {
// Store pending action and show confirmation buttons
const actionId = `ai_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
pendingAiActions.set(actionId, {
chatId,
meshIds,
toolCall: result.toolCall,
expiresAt: Date.now() + 5 * 60 * 1000,
});
const confirmText = formatConfirmation(result.toolCall);
await ctx.reply(confirmText, {
parse_mode: "MarkdownV2",
reply_markup: {
inline_keyboard: [
[
{ text: "✅ Confirm", callback_data: `ai_confirm:${actionId}` },
{ text: "✏️ Edit", callback_data: `ai_edit:${actionId}` },
{ text: "❌ Cancel", callback_data: `ai_cancel:${actionId}` },
],
],
},
});
} else {
// Read-only action — execute immediately
const execResult = await executeAiToolCall(result.toolCall, meshIds);
await ctx.reply(formatResult(result.toolCall.name, execResult), {
parse_mode: "MarkdownV2",
});
}
}
} catch (err) {
log.error("telegram-ai-handler", { error: err instanceof Error ? err.message : String(err) });
// Fallback: broadcast the text directly
let sent = 0;
for (const meshId of meshIds) {
const conn = meshConnections.get(meshId);
if (!conn?.isConnected()) continue;
const ok = await conn.sendMessage("*", `[via Telegram] ${text}`, "next");
if (ok) sent++;
}
if (sent === 0) await ctx.reply("❌ Not connected.");
}
});
}
// ---------------------------------------------------------------------------
// AI tool call executor
// ---------------------------------------------------------------------------
async function executeAiToolCall(
toolCall: { name: string; input: Record<string, unknown> },
meshIds: string[],
): Promise<unknown> {
const firstMeshId = meshIds[0];
if (!firstMeshId) throw new Error("No mesh connected");
const conn = meshConnections.get(firstMeshId);
if (!conn?.isConnected()) throw new Error("Not connected to mesh");
switch (toolCall.name) {
case "send_message": {
const to = String(toolCall.input.to ?? "*");
const message = String(toolCall.input.message ?? "");
const priority = String(toolCall.input.priority ?? "next");
// Resolve peer name → pubkey
let targetSpec = to;
if (!to.startsWith("@") && to !== "*" && !/^[0-9a-f]{64}$/.test(to)) {
const peers = await conn.listPeers();
const match = peers.find(p => p.displayName.toLowerCase() === to.toLowerCase());
if (!match) {
const partials = peers.filter(p => p.displayName.toLowerCase().includes(to.toLowerCase()));
if (partials.length === 1) targetSpec = partials[0]!.pubkey;
else throw new Error(`Peer "${to}" not found`);
} else {
targetSpec = match.pubkey;
}
}
const ok = await conn.sendMessage(targetSpec, `[via Telegram] ${message}`, priority as "now" | "next" | "low");
if (!ok) throw new Error("Send failed");
return { ok: true };
}
case "list_peers":
return conn.listPeers();
case "remember":
case "recall":
case "get_state":
case "set_state":
// These operations require WS request/response patterns not yet
// implemented in MeshConnection. Coming in a future update.
throw new Error(`${toolCall.name} not yet available via Telegram. Use the CLI.`);
default:
throw new Error(`Unknown tool: ${toolCall.name}`);
}
}
// ---------------------------------------------------------------------------
// Ensure a mesh WS connection exists (create or reuse)
// ---------------------------------------------------------------------------