/**
 * Telegram Bridge — Multi-Tenant Module
 *
 * Manages a single @claudemesh_bot instance with long-polling and a pool of
 * WebSocket connections (one per unique mesh). Multiple Telegram chats can
 * share the same mesh connection; push messages fan out to all chats.
 *
 * This file is self-contained. The broker's index.ts imports bootTelegramBridge
 * and connectChat, passing DB accessor callbacks so we never import db.ts.
 */

import { hostname } from "node:os";

import { Bot, InputFile } from "grammy";
import WebSocket from "ws";
import sodium from "libsodium-wrappers";
import { validateTelegramConnectToken } from "./telegram-token";

// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------

export interface BridgeRow {
  chatId: number;
  meshId: string;
  meshSlug?: string;
  memberId: string;
  pubkey: string;
  secretKey: string;
  displayName: string;
  chatType: string;
  chatTitle: string | null;
}

interface MeshCredentials {
  meshId: string;
  memberId: string;
  pubkey: string;
  secretKey: string;
  displayName: string;
  brokerUrl: string;
}

interface PeerInfo {
  displayName: string;
  pubkey: string;
  status: string;
  summary?: string;
  cwd?: string;
  groups?: string[];
  avatar?: string;
}

// ---------------------------------------------------------------------------
// Crypto helpers (mirrors apps/telegram/src/index.ts)
// ---------------------------------------------------------------------------

let sodiumReady = false;

/** Wait (once) for libsodium to finish initialising and return the module. */
async function ensureSodium() {
  if (!sodiumReady) {
    await sodium.ready;
    sodiumReady = true;
  }
  return sodium;
}

/** Generate a fresh signing keypair, returned as hex strings. */
async function generateSessionKeypair() {
  const s = await ensureSodium();
  const kp = s.crypto_sign_keypair();
  return {
    publicKey: s.to_hex(kp.publicKey),
    secretKey: s.to_hex(kp.privateKey),
  };
}

/**
 * Sign the hello handshake.
 *
 * The signed string is `meshId|memberId|pubkey|timestamp`; the detached
 * signature is returned hex-encoded together with the timestamp used.
 */
async function signHello(
  meshId: string,
  memberId: string,
  pubkey: string,
  secretKeyHex: string,
) {
  const s = await ensureSodium();
  const timestamp = Date.now();
  const canonical = `${meshId}|${memberId}|${pubkey}|${timestamp}`;
  const sig = s.crypto_sign_detached(
    s.from_string(canonical),
    s.from_hex(secretKeyHex),
  );
  return { timestamp, signature: s.to_hex(sig) };
}

/**
 * Decrypt a direct message.
 *
 * Both signing keys are converted to their curve25519 form before opening the
 * box. `nonce` and `ciphertext` are base64 (ORIGINAL variant).
 *
 * @returns the plaintext, or `null` when any step fails (bad hex/base64,
 *          wrong key, authentication failure).
 */
async function decryptDirect(
  nonce: string,
  ciphertext: string,
  senderPubkeyHex: string,
  recipientSecretKeyHex: string,
): Promise<string | null> {
  const s = await ensureSodium();
  try {
    const senderPub = s.crypto_sign_ed25519_pk_to_curve25519(
      s.from_hex(senderPubkeyHex),
    );
    const recipientSec = s.crypto_sign_ed25519_sk_to_curve25519(
      s.from_hex(recipientSecretKeyHex),
    );
    const nonceBytes = s.from_base64(nonce, s.base64_variants.ORIGINAL);
    const ciphertextBytes = s.from_base64(
      ciphertext,
      s.base64_variants.ORIGINAL,
    );
    const plain = s.crypto_box_open_easy(
      ciphertextBytes,
      nonceBytes,
      senderPub,
      recipientSec,
    );
    return s.to_string(plain);
  } catch {
    return null;
  }
}

/**
 * Encrypt a direct message for `recipientPubkeyHex` with a random nonce.
 *
 * @returns base64 (ORIGINAL variant) nonce and ciphertext.
 */
async function encryptDirect(
  message: string,
  recipientPubkeyHex: string,
  senderSecretKeyHex: string,
): Promise<{ nonce: string; ciphertext: string }> {
  const s = await ensureSodium();
  const recipientPub = s.crypto_sign_ed25519_pk_to_curve25519(
    s.from_hex(recipientPubkeyHex),
  );
  const senderSec = s.crypto_sign_ed25519_sk_to_curve25519(
    s.from_hex(senderSecretKeyHex),
  );
  const nonceBytes = s.randombytes_buf(s.crypto_box_NONCEBYTES);
  const ciphertextBytes = s.crypto_box_easy(
    s.from_string(message),
    nonceBytes,
    recipientPub,
    senderSec,
  );
  return {
    nonce: s.to_base64(nonceBytes, s.base64_variants.ORIGINAL),
    ciphertext: s.to_base64(ciphertextBytes, s.base64_variants.ORIGINAL),
  };
}

// ---------------------------------------------------------------------------
// MeshConnection — one WS per unique mesh, shared across chats
// ---------------------------------------------------------------------------

class MeshConnection {
  private static readonly MAX_RECONNECT_ATTEMPTS = 20;
  private static readonly RECONNECT_DELAYS_MS: readonly number[] = [
    1000, 2000, 4000, 8000, 16000, 30000,
  ];

  private ws: WebSocket | null = null;
  private creds: MeshCredentials;
  private sessionPubkey: string | null = null;
  private sessionSecretKey: string | null = null;
  private connected = false;
  /** Set by close(); suppresses the automatic reconnect loop. */
  private closed = false;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private reconnectAttempt = 0;
  /** _reqId → pending request awaiting its response frame */
  private resolvers = new Map<
    string,
    { resolve: (v: any) => void; timer: ReturnType<typeof setTimeout> }
  >();
  /** pubkey/sessionPubkey → { name, avatar } */
  private peerInfo = new Map<string, { name: string; avatar?: string }>();
  private onPush: (meshId: string, from: string, text: string, priority: string) => void;
  private peerRefreshInterval: ReturnType<typeof setInterval> | null = null;

  constructor(
    creds: MeshCredentials,
    onPush: (meshId: string, from: string, text: string, priority: string) => void,
  ) {
    this.creds = creds;
    this.onPush = onPush;
  }

  /**
   * Generate a session keypair, open the socket and wait for `hello_ack`.
   * Rejects when the handshake fails; on success starts the periodic peer
   * cache refresh.
   */
  async connect(): Promise<void> {
    const sessionKP = await generateSessionKeypair();
    this.sessionPubkey = sessionKP.publicKey;
    this.sessionSecretKey = sessionKP.secretKey;
    await this._connect();
    // Refresh peer name cache every 30 s (never stack two intervals)
    if (this.peerRefreshInterval) clearInterval(this.peerRefreshInterval);
    this.peerRefreshInterval = setInterval(
      () => this.listPeers().catch(() => {}),
      30_000,
    );
  }

  /**
   * Open one WebSocket and perform the signed hello handshake.
   * Resolves on `hello_ack`; rejects on handshake error, a 10 s timeout, or
   * the socket closing first. The "close" handler also drives reconnection.
   */
  private _connect(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
      const ws = new WebSocket(this.creds.brokerUrl);
      this.ws = ws;

      ws.on("open", async () => {
        try {
          const { timestamp, signature } = await signHello(
            this.creds.meshId,
            this.creds.memberId,
            this.creds.pubkey,
            this.creds.secretKey,
          );
          ws.send(
            JSON.stringify({
              type: "hello",
              meshId: this.creds.meshId,
              memberId: this.creds.memberId,
              pubkey: this.creds.pubkey,
              sessionPubkey: this.sessionPubkey,
              displayName: this.creds.displayName,
              sessionId: `tg-bridge-${this.creds.meshId.slice(0, 8)}-${Date.now()}`,
              pid: process.pid,
              cwd: process.cwd(),
              hostname: hostname(),
              peerType: "bridge",
              channel: "telegram",
              timestamp,
              signature,
            }),
          );
        } catch (e) {
          reject(e);
        }
      });

      const helloTimeout = setTimeout(() => {
        ws.close();
        reject(new Error("hello_ack timeout"));
      }, 10_000);

      ws.on("message", async (raw) => {
        // Frames come off the wire untyped; fields are probed defensively below.
        let msg: any;
        try {
          msg = JSON.parse(raw.toString());
        } catch {
          return; // ignore frames that are not valid JSON
        }

        try {
          if (msg.type === "hello_ack") {
            clearTimeout(helloTimeout);
            this.connected = true;
            this.reconnectAttempt = 0;
            console.log(
              `[tg-bridge] WS connected to mesh ${this.creds.meshId.slice(0, 8)} as ${this.creds.displayName}`,
            );
            resolve();
            return;
          }

          // Push messages from peers → forward to Telegram
          if (msg.type === "push") {
            await this.handlePush(msg);
          }

          // Resolve pending request/response pairs
          const reqId = msg._reqId;
          if (reqId && this.resolvers.has(reqId)) {
            const r = this.resolvers.get(reqId)!;
            clearTimeout(r.timer);
            this.resolvers.delete(reqId);
            r.resolve(msg);
          }
        } catch (e) {
          // A bad frame must not take the socket down, but should be visible.
          console.error(
            `[tg-bridge] message handling failed mesh ${this.creds.meshId.slice(0, 8)}:`,
            e,
          );
        }
      });

      ws.on("close", () => {
        clearTimeout(helloTimeout);
        // No-op when the handshake already settled the promise.
        reject(new Error("socket closed before hello_ack"));

        // A stale socket must not clobber the state of a newer one.
        if (this.ws !== ws) return;
        this.connected = false;
        this.ws = null;

        // Explicit close(): do not resurrect the connection.
        if (this.closed || this.reconnectTimer) return;

        const maxAttempts = MeshConnection.MAX_RECONNECT_ATTEMPTS;
        if (this.reconnectAttempt >= maxAttempts) {
          console.error(
            `[tg-bridge] mesh ${this.creds.meshId.slice(0, 8)} giving up after ${maxAttempts} attempts`,
          );
          if (this.peerRefreshInterval) {
            clearInterval(this.peerRefreshInterval);
            this.peerRefreshInterval = null;
          }
          if (meshConnections.get(this.creds.meshId) === this) {
            meshConnections.delete(this.creds.meshId);
          }
          return;
        }

        const delays = MeshConnection.RECONNECT_DELAYS_MS;
        const delay = delays[Math.min(this.reconnectAttempt, delays.length - 1)]!;
        this.reconnectAttempt++;
        console.log(
          `[tg-bridge] mesh ${this.creds.meshId.slice(0, 8)} reconnecting in ${delay}ms (attempt ${this.reconnectAttempt}/${maxAttempts})`,
        );
        this.reconnectTimer = setTimeout(() => {
          this.reconnectTimer = null;
          this._connect().catch((e) =>
            console.error("[tg-bridge] reconnect failed:", e),
          );
        }, delay);
      });

      ws.on("error", (err) => {
        console.error(
          `[tg-bridge] WS error mesh ${this.creds.meshId.slice(0, 8)}:`,
          err.message,
        );
      });
    });
  }

  /**
   * Turn a `push` frame into display text and hand it to `onPush`.
   * System events get a bracketed label; encrypted payloads are tried against
   * the session key first, then the member key; nonce-less ciphertext is
   * treated as plain base64.
   */
  private async handlePush(msg: any): Promise<void> {
    let text: string | null = null;
    const senderPubkey = msg.senderPubkey ?? msg.senderSessionPubkey;

    if (msg.subtype === "system") {
      const event = msg.event ?? "";
      const data = msg.eventData ?? {};
      if (event === "peer_joined") text = `[joined] ${data.displayName ?? "peer"}`;
      else if (event === "peer_left") text = `[left] ${data.displayName ?? "peer"}`;
      else if (event === "peer_returned") text = `[returned] ${data.name ?? "peer"}`;
      else text = msg.plaintext ?? `[${event}]`;
    } else if (senderPubkey && msg.nonce && msg.ciphertext) {
      // Try session key, then member key
      text =
        (await decryptDirect(
          msg.nonce,
          msg.ciphertext,
          senderPubkey,
          this.sessionSecretKey!,
        )) ??
        (await decryptDirect(
          msg.nonce,
          msg.ciphertext,
          senderPubkey,
          this.creds.secretKey,
        ));
      if (!text) text = "[could not decrypt]";
    } else if (msg.plaintext) {
      text = msg.plaintext;
    } else if (msg.ciphertext && !msg.nonce) {
      try {
        text = Buffer.from(msg.ciphertext, "base64").toString("utf-8");
      } catch {
        text = "[decode error]";
      }
    }

    if (!text) return;

    const info = senderPubkey ? this.peerInfo.get(senderPubkey) : null;
    const fromName = info?.name ?? senderPubkey?.slice(0, 12) ?? "system";
    const avatar = info?.avatar ?? "🤖";
    this.onPush(
      this.creds.meshId,
      `${avatar} ${fromName}`,
      text,
      msg.priority ?? "next",
    );
  }

  // -- Request / Response helpers --

  /** Correlation id for request/response pairs (not security sensitive). */
  private makeReqId(): string {
    return Math.random().toString(36).slice(2) + Date.now().toString(36);
  }

  /**
   * Send `msg` tagged with a fresh `_reqId` and wait for the frame echoing it.
   *
   * @returns the response frame, or `null` on timeout, send failure, or when
   *          the socket is not connected.
   */
  private request(
    msg: Record<string, unknown>,
    timeout = 10_000,
  ): Promise<any> {
    return new Promise((resolve) => {
      const ws = this.ws;
      if (!ws || !this.connected) {
        // Nothing could ever answer; don't make the caller wait for the timeout.
        resolve(null);
        return;
      }
      const reqId = this.makeReqId();
      const timer = setTimeout(() => {
        this.resolvers.delete(reqId);
        resolve(null);
      }, timeout);
      this.resolvers.set(reqId, { resolve, timer });
      try {
        ws.send(JSON.stringify({ ...msg, _reqId: reqId }));
      } catch {
        clearTimeout(timer);
        this.resolvers.delete(reqId);
        resolve(null);
      }
    });
  }

  // -- Public API --

  /**
   * Send a message to `to`.
   * A 64-char lowercase hex target is treated as a peer pubkey and the message
   * is encrypted for it; any other target spec is sent as base64 plaintext.
   *
   * @returns `false` when not connected, `true` once the frame was handed to
   *          the socket.
   */
  async sendMessage(
    to: string,
    message: string,
    priority = "next",
  ): Promise<boolean> {
    if (!this.ws || !this.connected) return false;
    const isDirect = /^[0-9a-f]{64}$/.test(to);
    let nonce = "";
    let ciphertext = "";
    if (isDirect) {
      const enc = await encryptDirect(
        message,
        to,
        this.sessionSecretKey!,
      );
      nonce = enc.nonce;
      ciphertext = enc.ciphertext;
    } else {
      ciphertext = Buffer.from(message, "utf-8").toString("base64");
    }
    this.ws.send(
      JSON.stringify({
        type: "send",
        id: this.makeReqId(),
        targetSpec: to,
        priority,
        nonce,
        ciphertext,
      }),
    );
    return true;
  }

  /**
   * Fetch the peer list and refresh the pubkey → name/avatar cache used to
   * label incoming pushes. Returns `[]` when the request yields no peers.
   */
  async listPeers(): Promise<PeerInfo[]> {
    const resp = await this.request({ type: "list_peers" });
    if (!resp?.peers) return [];
    return resp.peers.map((p: any) => {
      const name = p.displayName ?? p.pubkey?.slice(0, 12) ?? "?";
      const avatar = p.profile?.avatar;
      const info = { name, avatar };
      if (p.pubkey) this.peerInfo.set(p.pubkey, info);
      if (p.sessionPubkey) this.peerInfo.set(p.sessionPubkey, info);
      return {
        displayName: name,
        pubkey: p.pubkey ?? "",
        status: p.status ?? "unknown",
        summary: p.summary,
        cwd: p.cwd,
        groups: p.groups?.map((g: any) => g.name) ?? [],
        avatar,
      };
    });
  }

  /** Peers whose display name equals `name`, case-insensitively. */
  async findPeersByName(name: string): Promise<PeerInfo[]> {
    const peers = await this.listPeers();
    return peers.filter(
      (p) => p.displayName.toLowerCase() === name.toLowerCase(),
    );
  }

  /** Fire-and-forget `set_summary` frame; silently dropped when there is no socket. */
  async setSummary(summary: string): Promise<void> {
    this.ws?.send(JSON.stringify({ type: "set_summary", summary }));
  }

  /**
   * Derive the broker's HTTP base from its WebSocket URL: ws→http, wss→https,
   * and the first "/ws" of the *path* removed. Only the path is rewritten, so
   * a host such as "ws.example.com" is left intact.
   */
  private httpBaseUrl(): string {
    const url = new URL(this.creds.brokerUrl);
    const scheme = url.protocol === "wss:" ? "https:" : "http:";
    const path = url.pathname.replace("/ws", "").replace(/\/+$/, "");
    return `${scheme}//${url.host}${path}`;
  }

  /**
   * POST raw bytes to the broker's `/upload` endpoint.
   *
   * @returns the file id reported by the broker, or `null` on any failure
   *          (bad URL, network error, 30 s timeout, non-2xx, missing id).
   */
  async uploadFile(
    data: Buffer,
    fileName: string,
    tags?: string[],
  ): Promise<string | null> {
    try {
      const brokerHttp = this.httpBaseUrl();
      const res = await fetch(`${brokerHttp}/upload`, {
        method: "POST",
        headers: {
          "Content-Type": "application/octet-stream",
          "X-Mesh-Id": this.creds.meshId,
          "X-Member-Id": this.creds.memberId,
          "X-File-Name": fileName,
          "X-Tags": JSON.stringify(tags ?? ["telegram"]),
          "X-Persistent": "true",
        },
        body: data,
        signal: AbortSignal.timeout(30_000),
      });
      const body = (await res.json()) as {
        ok?: boolean;
        fileId?: string;
        error?: string;
      };
      if (!res.ok || !body.fileId) return null;
      return body.fileId;
    } catch (e) {
      console.error("[tg-bridge] upload failed:", e);
      return null;
    }
  }

  /** Ask the broker for a download URL; `null` when the response has none. */
  async getFileUrl(
    fileId: string,
  ): Promise<{ url: string; name: string } | null> {
    const resp = await this.request({ type: "get_file", fileId });
    if (!resp?.url) return null;
    return { url: resp.url, name: resp.name ?? "file" };
  }

  isConnected(): boolean {
    return this.connected;
  }

  getMeshId(): string {
    return this.creds.meshId;
  }

  /**
   * Permanently shut the connection down: stop timers, release pending
   * requests with `null`, close the socket and disable reconnection.
   */
  close(): void {
    this.closed = true;
    if (this.peerRefreshInterval) {
      clearInterval(this.peerRefreshInterval);
      this.peerRefreshInterval = null;
    }
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    for (const pending of this.resolvers.values()) {
      clearTimeout(pending.timer);
      pending.resolve(null);
    }
    this.resolvers.clear();
    this.ws?.close();
  }
}

// ---------------------------------------------------------------------------
// Routing maps
// ---------------------------------------------------------------------------

/** chatId → meshIds this chat is connected to */
const chatMeshes = new Map<number, string[]>();

/** meshId → chatIds that should receive push messages */
const meshChats = new Map<string, Set<number>>();

/** meshId → shared WS connection */
const meshConnections = new Map<string, MeshConnection>();

/** meshId → slug (human-readable name) */
const meshSlugs = new Map<string, string>();

// Pending DM picker state: chatId → { message, matches, meshId }
const pendingDMs = new Map<
  number,
  { message: string; matches: PeerInfo[]; meshId: string }
>();

// Pending file upload picker state: chatId → { fileId, fileName, meshId, caption }
const pendingFiles = new Map<
  number,
  { fileId: string; fileName: string; meshId: string; caption: string }
>();

// Pending email verification state: chatId → { email, code, expiresAt, attempts }
const pendingVerifications = new Map<
  number,
  { email: string; code: string; expiresAt: number; attempts: number }
>();

// Conversation state: chatId → which input the bot is waiting for
const conversationState = new Map<number, string>();

/** Pending AI
actions awaiting user confirmation */
// NOTE(review): the value type was reconstructed from how entries are read
// (chatId, toolCall.name, meshIds, expiresAt) — confirm the exact shape of
// `meshIds` and `toolCall.args` against ./telegram-ai.
const pendingAiActions = new Map<
  string,
  {
    chatId: number;
    meshIds: string[];
    toolCall: { name: string; args: Record<string, unknown> };
    expiresAt: number;
  }
>();

/** Chat → mesh slugs mapping for AI context */
// NOTE(review): key/value types are not recoverable from this section — add them.
const chatMeshSlugs = new Map();

// Clean expired AI actions every 5 min
setInterval(() => {
  const now = Date.now();
  for (const [k, v] of pendingAiActions) {
    if (now > v.expiresAt) pendingAiActions.delete(k);
  }
}, 5 * 60 * 1000);

/** Invite URL regex: https://claudemesh.com/join/<token> (token is capture group 1) */
const INVITE_URL_RE = /https?:\/\/(?:www\.)?claudemesh\.com\/join\/([A-Za-z0-9_\-\.]+)/;

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Add a chat ↔ mesh link to the in-memory maps (idempotent). */
function linkChatMesh(chatId: number, meshId: string): void {
  const meshes = chatMeshes.get(chatId) ?? [];
  if (!meshes.includes(meshId)) {
    meshes.push(meshId);
    chatMeshes.set(chatId, meshes);
  }

  const chats = meshChats.get(meshId) ?? new Set<number>();
  chats.add(chatId);
  meshChats.set(meshId, chats);
}

/** Remove a chat ↔ mesh link from in-memory maps, dropping emptied entries. */
function unlinkChatMesh(chatId: number, meshId: string): void {
  const meshes = chatMeshes.get(chatId);
  if (meshes) {
    const idx = meshes.indexOf(meshId);
    if (idx !== -1) meshes.splice(idx, 1);
    if (meshes.length === 0) chatMeshes.delete(chatId);
  }

  const chats = meshChats.get(meshId);
  if (chats) {
    chats.delete(chatId);
    if (chats.size === 0) meshChats.delete(meshId);
  }
}

/**
 * Find the mesh a user-typed prefix refers to.
 *
 * Matching is case-insensitive and tried in this order: exact slug, then
 * prefix of the mesh id or slug, then (only when `allowSubstring`) substring
 * of the mesh id or slug. Returns `undefined` for an empty query or no match.
 */
function findMeshIdByPrefix(
  meshIds: readonly string[],
  query: string,
  allowSubstring = true,
): string | undefined {
  const q = query.trim().toLowerCase();
  if (!q) return undefined;

  const slugOf = (id: string) => (meshSlugs.get(id) ?? "").toLowerCase();

  const exact = meshIds.find((id) => slugOf(id) === q);
  if (exact) return exact;

  const byPrefix = meshIds.find(
    (id) => id.toLowerCase().startsWith(q) || slugOf(id).startsWith(q),
  );
  if (byPrefix || !allowSubstring) return byPrefix;

  return meshIds.find(
    (id) => id.toLowerCase().includes(q) || slugOf(id).includes(q),
  );
}

/**
 * Resolve which MeshConnection a chat command should target.
 * If the chat is connected to exactly one mesh, return it.
 * If connected to multiple and a prefix is given (e.g. "dev-team"),
 * match it against each mesh's slug or id. Otherwise return null
 * (caller should prompt).
 */
function resolveMesh(
  chatId: number,
  meshPrefix?: string,
): MeshConnection | null {
  const meshIds = chatMeshes.get(chatId);
  if (!meshIds || meshIds.length === 0) return null;
  if (meshIds.length === 1) {
    return meshConnections.get(meshIds[0]!) ?? null;
  }
  if (!meshPrefix) return null;

  const match = findMeshIdByPrefix(meshIds, meshPrefix);
  return match ? (meshConnections.get(match) ?? null) : null;
}

/**
 * Parse an optional mesh prefix from command text.
 * Format: "meshSlug:rest" or "meshSlug rest" (for /peers etc.)
 * Returns [meshPrefix | undefined, remainingText].
 *
 * A prefix is only recognised when the chat has more than one mesh AND the
 * candidate actually names one of them, so ordinary text containing a colon
 * ("note: deploy done", URLs) is returned untouched. The space form requires
 * a prefix match, not a substring match, so short first words are not
 * mistaken for a mesh.
 */
function parseMeshPrefix(
  chatId: number,
  text: string,
): [string | undefined, string] {
  const meshIds = chatMeshes.get(chatId);
  if (!meshIds || meshIds.length <= 1) return [undefined, text];

  // Try "slug:rest" format
  const colonIdx = text.indexOf(":");
  if (colonIdx > 0 && colonIdx < 40) {
    const candidate = text.slice(0, colonIdx);
    if (!/\s/.test(candidate) && findMeshIdByPrefix(meshIds, candidate)) {
      return [candidate, text.slice(colonIdx + 1).trimStart()];
    }
  }

  // Try "slug rest" — only if first word is a prefix of a known mesh slug/id
  const spaceIdx = text.indexOf(" ");
  const firstWord = spaceIdx === -1 ? text : text.slice(0, spaceIdx);
  if (firstWord && findMeshIdByPrefix(meshIds, firstWord, false)) {
    return [
      firstWord,
      spaceIdx === -1 ? "" : text.slice(spaceIdx + 1).trimStart(),
    ];
  }

  return [undefined, text];
}

// ---------------------------------------------------------------------------
// Push handler — fan out mesh push to Telegram chats
// ---------------------------------------------------------------------------

/**
 * Build the callback handed to each MeshConnection: it formats a push and
 * sends it to every chat linked to the mesh. Sends are fire-and-forget; a
 * failure for one chat is logged and does not affect the others.
 */
function createPushHandler(bot: Bot) {
  return (
    meshId: string,
    from: string,
    text: string,
    _priority: string,
  ) => {
    const chatIds = meshChats.get(meshId);
    if (!chatIds || chatIds.size === 0) return;

    const meshLabel = meshSlugs.get(meshId) ?? meshId.slice(0, 12);
    const formatted = `💬 [${meshLabel}] ${from}\n${text}`;

    for (const chatId of chatIds) {
      bot.api
        .sendMessage(chatId, formatted)
        .catch((e) => {
          console.error(`[tg-bridge] send to chat ${chatId} failed:`, e.message);
        });
    }
  };
}

/**
 * Backslash-escape Telegram MarkdownV2 special characters.
 * NOTE(review): this is the MarkdownV2 character set; with the legacy
 * "Markdown" parse mode the added backslashes may render literally — check
 * callers that pass parse_mode "Markdown".
 */
function escapeMarkdown(s: string): string {
  return s.replace(/([_*\[\]()~`>#+\-=|{}.!\\])/g, "\\$1");
}

/** Strip markdown formatting from AI text responses for plain Telegram display. */
function stripMarkdown(s: string): string {
  return s
    .replace(/\*\*(.*?)\*\*/g, "$1") // **bold** → bold
    .replace(/\*(.*?)\*/g, "$1") // *italic* → italic
    .replace(/__(.*?)__/g, "$1") // __underline__ → underline
    .replace(/~~(.*?)~~/g, "$1") // ~~strike~~ → strike
    .replace(/`(.*?)`/g, "$1") // `code` → code
    .replace(/\[([^\]]+)\]\([^)]+\)/g, "$1"); // [text](url) → text
}

// ---------------------------------------------------------------------------
// Bot command handlers
// ---------------------------------------------------------------------------

// ---------------------------------------------------------------------------
// Email verification helpers
// ---------------------------------------------------------------------------

/**
 * Generate a 6-digit verification code (100000–999999).
 *
 * The code gates account linking, so it is drawn from the platform CSPRNG
 * (Web Crypto) with rejection sampling to avoid modulo bias. Math.random is
 * only a fallback for runtimes without a global `crypto`.
 */
function generateCode(): string {
  const span = 900_000;
  const webCrypto = (
    globalThis as {
      crypto?: { getRandomValues?: (array: Uint32Array) => Uint32Array };
    }
  ).crypto;

  if (webCrypto?.getRandomValues) {
    // Largest multiple of `span` representable in 32 bits.
    const limit = Math.floor(0x1_0000_0000 / span) * span;
    const buf = new Uint32Array(1);
    do {
      webCrypto.getRandomValues(buf);
    } while (buf[0]! >= limit);
    return String(100_000 + (buf[0]! % span));
  }

  return Math.floor(100000 + Math.random() * 900000).toString();
}

/** Hide all but the first (up to) two characters of an email's local part. */
function maskEmail(email: string): string {
  const at = email.lastIndexOf("@");
  if (at <= 0) return "***";
  return `${email.slice(0, Math.min(2, at))}***${email.slice(at)}`;
}

/**
 * Start the email verification flow for a chat.
 *
 * Looks the email up, emails a 6-digit code valid for 10 minutes, stores it in
 * `pendingVerifications` and moves the chat to "awaiting_code". On any failure
 * (unknown email, send failure, callback throwing) both the conversation state
 * and any pending verification for the chat are cleared and the user is told.
 */
async function startEmailVerification(
  ctx: any,
  chatId: number,
  email: string,
  lookupMeshesByEmail: (email: string) => Promise<UserMeshInfo[]>,
  sendVerificationEmail: (email: string, code: string) => Promise<boolean>,
): Promise<void> {
  const resetState = () => {
    conversationState.delete(chatId);
    pendingVerifications.delete(chatId);
  };

  try {
    // Check if email exists in our system
    const meshes = await lookupMeshesByEmail(email);
    if (meshes.length === 0) {
      resetState();
      await ctx.reply(
        "❌ No claudemesh account found for that email.\n\n" +
          "Sign up at claudemesh.com first, or use a connect link:\n" +
          "`claudemesh connect telegram`",
        { parse_mode: "Markdown" },
      );
      return;
    }

    const code = generateCode();
    const sent = await sendVerificationEmail(email, code);
    if (!sent) {
      resetState();
      await ctx.reply("❌ Failed to send verification email. Try again later.");
      return;
    }

    pendingVerifications.set(chatId, {
      email,
      code,
      expiresAt: Date.now() + 10 * 60_000, // 10 min
      attempts: 0,
    });
    conversationState.set(chatId, "awaiting_code");
  } catch (e) {
    // Don't leave the chat stuck waiting for a code that was never issued.
    console.error("[tg-bridge] email verification failed:", e);
    resetState();
    await ctx.reply("❌ Failed to send verification email. Try again later.");
    return;
  }

  const masked = maskEmail(email);
  await ctx.reply(`📬 Verification code sent to ${masked}\n\nEnter the 6-digit code:`);
}

/** Result from looking up a user's meshes by email */
export interface UserMeshInfo {
  userId: string;
  meshId: string;
  meshSlug: string;
  memberId: string;
  pubkey: string;
  secretKey: string;
}

function setupBotCommands(
  bot: Bot,
  botToken: string,
  brokerUrl: string,
  saveBridge: (
    row: Omit & { chatId: number },
  ) => Promise,
  deactivateBridge: (chatId: number, meshId: string) => Promise,
  pushHandler: (
    meshId: string,
    from: string,
    text: string,
    priority: string,
  ) => void,
  lookupMeshesByEmail: (email: string) => Promise,
  sendVerificationEmail: (email: string, code: string) => Promise,
): void {
  // --- /start ---
  bot.command("start", async (ctx) => {
    const token = ctx.match?.trim();
    if (!token) {
      await ctx.reply(
        "🔗 *Claudemesh Telegram Bridge*\n\n" +
          "Use a connect link from the dashboard or CLI to get started.\n" +
          "Or type /connect to link via email.\n\n" +
          "Commands: /help",
        { parse_mode: "Markdown" },
      );
      return;
    }

    // Validate JWT signature, expiry, and claims
    const encKey = process.env.BROKER_ENCRYPTION_KEY;
    if (!encKey) {
      await ctx.reply("❌ Broker not configured for token validation.");
      return;
    }
    const payload = validateTelegramConnectToken(token, encKey);
    if (!payload) {
      await ctx.reply("❌ Invalid, expired, or tampered token. Request a new link.");
      return;
    }

    const { meshId, memberId, pubkey, secretKey, meshSlug } = payload;
    const chatId = ctx.chat.id;
    const chatType = ctx.chat.type;
    const chatTitle =
      ctx.chat.type === "private"
        ? (ctx.from?.first_name ?? "Private")
        : ("title" in ctx.chat ?
ctx.chat.title : null) ?? "Group"; const displayName = `tg:${chatTitle}`; // Check if already connected const existing = chatMeshes.get(chatId); if (existing?.includes(meshId)) { await ctx.reply(`Already connected to mesh \`${meshSlug ?? meshId.slice(0, 8)}\`.`, { parse_mode: "Markdown", }); return; } try { // Persist bridge row await saveBridge({ chatId, meshId, memberId, pubkey, secretKey, displayName, chatType, chatTitle, }); // Connect or reuse WS await ensureMeshConnection( { meshId, memberId, pubkey, secretKey, displayName, brokerUrl }, pushHandler, ); linkChatMesh(chatId, meshId); if (meshSlug) meshSlugs.set(meshId, meshSlug); await ctx.reply( `✅ Connected to mesh *${escapeMarkdown(meshSlug ?? meshId.slice(0, 8))}*\\!`, { parse_mode: "MarkdownV2" }, ); } catch (e) { console.error("[tg-bridge] /start connect failed:", e); await ctx.reply("❌ Connection failed. Try again or request a new token."); } }); // --- /connect (email verification flow) --- bot.command("connect", async (ctx) => { const chatId = ctx.chat.id; // If they passed an email inline: /connect user@example.com const emailArg = ctx.match?.trim(); if (emailArg && emailArg.includes("@")) { conversationState.set(chatId, "awaiting_code"); await startEmailVerification(ctx, chatId, emailArg, lookupMeshesByEmail, sendVerificationEmail); return; } conversationState.set(chatId, "awaiting_email"); await ctx.reply( "📧 *Connect via email*\n\nEnter the email address you used to sign up on claudemesh.com:", { parse_mode: "Markdown" }, ); }); // --- /disconnect --- bot.command("disconnect", async (ctx) => { const chatId = ctx.chat.id; const meshIds = chatMeshes.get(chatId); if (!meshIds || meshIds.length === 0) { await ctx.reply("Not connected to any mesh."); return; } const [meshPrefix, _] = parseMeshPrefix(chatId, ctx.match ?? 
""); let targetMeshId: string | undefined; if (meshIds.length === 1) { targetMeshId = meshIds[0]!; } else if (meshPrefix) { const lower = meshPrefix.toLowerCase(); targetMeshId = meshIds.find( (id) => id.toLowerCase().startsWith(lower) || id.toLowerCase().includes(lower), ); } if (!targetMeshId && meshIds.length > 1) { const list = meshIds.map((id) => `• \`${id.slice(0, 12)}\``).join("\n"); await ctx.reply( `Connected to multiple meshes. Specify which:\n${list}\n\n/disconnect `, { parse_mode: "Markdown" }, ); return; } if (!targetMeshId) { await ctx.reply("Mesh not found."); return; } try { await deactivateBridge(chatId, targetMeshId); unlinkChatMesh(chatId, targetMeshId); // If no more chats reference this mesh, close the WS const remaining = meshChats.get(targetMeshId); if (!remaining || remaining.size === 0) { const conn = meshConnections.get(targetMeshId); if (conn) { conn.close(); meshConnections.delete(targetMeshId); } } await ctx.reply(`✅ Disconnected from mesh \`${targetMeshId.slice(0, 12)}\`.`, { parse_mode: "Markdown", }); } catch (e) { console.error("[tg-bridge] /disconnect failed:", e); await ctx.reply("❌ Disconnect failed."); } }); // --- /meshes --- bot.command("meshes", async (ctx) => { const meshIds = chatMeshes.get(ctx.chat.id); if (!meshIds || meshIds.length === 0) { await ctx.reply("Not connected to any mesh. Use a connect link to join."); return; } const lines = meshIds.map((id) => { const conn = meshConnections.get(id); const status = conn?.isConnected() ? "🟢" : "🔴"; return `${status} \`${meshSlugs.get(id) ?? id.slice(0, 12)}\``; }); await ctx.reply(`*Connected meshes:*\n${lines.join("\n")}`, { parse_mode: "Markdown", }); }); // --- /peers [mesh-slug] --- bot.command("peers", async (ctx) => { const chatId = ctx.chat.id; const [meshPrefix] = parseMeshPrefix(chatId, ctx.match ?? 
""); const conn = resolveMesh(chatId, meshPrefix); if (!conn) { const meshIds = chatMeshes.get(chatId); if (!meshIds || meshIds.length === 0) { await ctx.reply("Not connected to any mesh."); } else { await ctx.reply( "Connected to multiple meshes. Specify which: /peers ", ); } return; } const peers = await conn.listPeers(); if (peers.length === 0) { await ctx.reply("No peers online."); return; } const lines = peers.map((p) => { const icon = p.status === "idle" ? "🟢" : p.status === "working" ? "🟡" : "🔴"; const summary = p.summary ? ` — _${escapeMarkdown(p.summary)}_` : ""; return `${icon} *${escapeMarkdown(p.displayName)}*${summary}`; }); await ctx.reply(lines.join("\n"), { parse_mode: "Markdown" }); }); // --- /dm [mesh:] --- bot.command("dm", async (ctx) => { const chatId = ctx.chat.id; const rawText = ctx.match ?? ""; if (!rawText.trim()) { await ctx.reply("Usage: /dm "); return; } const [meshPrefix, text] = parseMeshPrefix(chatId, rawText); const conn = resolveMesh(chatId, meshPrefix); if (!conn) { const meshIds = chatMeshes.get(chatId); if (!meshIds || meshIds.length === 0) { await ctx.reply("Not connected to any mesh."); } else { await ctx.reply( "Connected to multiple meshes. Prefix with mesh slug: /dm mesh-slug:Mou hello", ); } return; } const spaceIdx = text.indexOf(" "); if (spaceIdx === -1) { await ctx.reply("Usage: /dm "); return; } const target = text.slice(0, spaceIdx); const message = text.slice(spaceIdx + 1); const matches = await conn.findPeersByName(target); if (matches.length === 0) { await ctx.reply(`❌ No peer named "${target}" found.`); return; } if (matches.length === 1) { const ok = await conn.sendMessage( matches[0]!.pubkey, `[via Telegram] ${message}`, "now", ); await ctx.reply( ok ? `✅ → ${matches[0]!.avatar ?? 
"🤖"} ${matches[0]!.displayName}` : "❌ Not connected", ); return; } // Multiple matches — show inline keyboard picker pendingDMs.set(chatId, { message, matches, meshId: conn.getMeshId(), }); const buttons = matches.map((p, i) => { const dir = p.cwd?.split("/").pop() ?? "?"; const avatar = p.avatar ?? "🤖"; return [ { text: `${avatar} ${p.displayName} (${dir})`, callback_data: `dm:${i}` }, ]; }); buttons.push([{ text: "📨 Send to ALL", callback_data: "dm:all" }]); await ctx.reply(`Multiple "${target}" peers online. Pick one or all:`, { reply_markup: { inline_keyboard: buttons }, }); }); // --- /broadcast [mesh:] --- bot.command("broadcast", async (ctx) => { const chatId = ctx.chat.id; const [meshPrefix, message] = parseMeshPrefix(chatId, ctx.match ?? ""); if (!message.trim()) { await ctx.reply("Usage: /broadcast "); return; } const conn = resolveMesh(chatId, meshPrefix); if (!conn) { await ctx.reply("Not connected or specify mesh: /broadcast mesh-slug:hello"); return; } const ok = await conn.sendMessage("*", `[via Telegram] ${message}`, "now"); await ctx.reply(ok ? "✅ Broadcast sent" : "❌ Not connected"); }); // --- /group [mesh:]@name --- bot.command("group", async (ctx) => { const chatId = ctx.chat.id; const [meshPrefix, text] = parseMeshPrefix(chatId, ctx.match ?? ""); if (!text.trim()) { await ctx.reply("Usage: /group @group-name "); return; } const spaceIdx = text.indexOf(" "); if (spaceIdx === -1) { await ctx.reply("Usage: /group @group-name "); return; } const target = text.slice(0, spaceIdx); const message = text.slice(spaceIdx + 1); const conn = resolveMesh(chatId, meshPrefix); if (!conn) { await ctx.reply("Not connected or specify mesh."); return; } const ok = await conn.sendMessage(target, `[via Telegram] ${message}`, "now"); await ctx.reply(ok ? 
`✅ Sent to ${target}` : "❌ Not connected"); }); // --- /file --- bot.command("file", async (ctx) => { const chatId = ctx.chat.id; const fileId = ctx.match?.trim(); if (!fileId) { await ctx.reply("Usage: /file "); return; } // Try all connected meshes for this chat const meshIds = chatMeshes.get(chatId) ?? []; for (const meshId of meshIds) { const conn = meshConnections.get(meshId); if (!conn?.isConnected()) continue; const file = await conn.getFileUrl(fileId); if (!file) continue; try { const resp = await fetch(file.url, { signal: AbortSignal.timeout(30_000), }); if (!resp.ok) continue; const buf = Buffer.from(await resp.arrayBuffer()); await ctx.replyWithDocument(new InputFile(buf, file.name)); return; } catch { continue; } } await ctx.reply(`❌ File \`${fileId}\` not found in any connected mesh.`, { parse_mode: "Markdown", }); }); // --- /status --- bot.command("status", async (ctx) => { const chatId = ctx.chat.id; const meshIds = chatMeshes.get(chatId); if (!meshIds || meshIds.length === 0) { await ctx.reply("Not connected to any mesh."); return; } const lines = meshIds.map((id) => { const conn = meshConnections.get(id); const icon = conn?.isConnected() ? "🟢" : "🔴"; return `${icon} \`${meshSlugs.get(id) ?? 
id.slice(0, 12)}\``; }); await ctx.reply( `*Claudemesh Telegram Bridge*\n${lines.join("\n")}`, { parse_mode: "Markdown" }, ); }); // --- /help --- bot.command("help", async (ctx) => { await ctx.reply( "🔗 *Claudemesh Telegram Bridge*\n\n" + "*Commands:*\n" + "• /start — Connect via deep link\n" + "• /connect — Link via email (coming soon)\n" + "• /disconnect — Disconnect from mesh\n" + "• /meshes — List connected meshes\n" + "• /peers — List online peers\n" + "• /dm — DM a peer\n" + "• /broadcast — Message all peers\n" + "• /group @name — Message a group\n" + "• /file — Download a mesh file\n" + "• /status — Connection status\n\n" + "_Multi-mesh: prefix commands with mesh slug_\n" + "`/peers dev-team` or `/dm dev-team:Mou hello`", { parse_mode: "Markdown" }, ); }); // --- Callback query handler (DM picker + file picker) --- bot.on("callback_query:data", async (ctx) => { const data = ctx.callbackQuery.data; const chatId = ctx.chat?.id; if (!chatId) { await ctx.answerCallbackQuery(); return; } // --- AI action confirmation --- if (data.startsWith("ai_")) { const [action, actionId] = data.split(":"); if (!actionId) { await ctx.answerCallbackQuery(); return; } const pending = pendingAiActions.get(actionId); if (!pending || pending.chatId !== chatId) { await ctx.answerCallbackQuery({ text: "Expired. Send your message again." }); return; } if (action === "ai_cancel") { pendingAiActions.delete(actionId); await ctx.answerCallbackQuery({ text: "Cancelled" }); await ctx.editMessageText("❌ Cancelled."); return; } if (action === "ai_edit") { pendingAiActions.delete(actionId); await ctx.answerCallbackQuery({ text: "Type your edited message" }); await ctx.editMessageText("✏️ Type your message again with corrections."); return; } if (action === "ai_confirm") { pendingAiActions.delete(actionId); await ctx.answerCallbackQuery({ text: "Executing..." 
}); try { const { formatResult } = await import("./telegram-ai"); const result = await executeAiToolCall(pending.toolCall, pending.meshIds); await ctx.editMessageText( formatResult(pending.toolCall.name, result), { parse_mode: "HTML" }, ); } catch (err) { await ctx.editMessageText(`❌ Failed: ${err instanceof Error ? err.message : String(err)}`); } return; } } // --- File recipient picker --- if (data.startsWith("file:")) { const pending = pendingFiles.get(chatId); if (!pending) { await ctx.answerCallbackQuery({ text: "Session expired. Send the file again." }); return; } const conn = meshConnections.get(pending.meshId); if (!conn?.isConnected()) { pendingFiles.delete(chatId); await ctx.answerCallbackQuery({ text: "Not connected." }); return; } const target = data.slice(5); // after "file:" const emoji = pending.fileName.endsWith(".jpg") ? "📷" : "📎"; const captionSuffix = pending.caption ? ` — "${pending.caption}"` : ""; const fileMsg = `[via Telegram] ${emoji} ${pending.fileName}${captionSuffix} (file: ${pending.fileId})`; if (target === "none") { // Keep in mesh only — no message sent pendingFiles.delete(chatId); await ctx.answerCallbackQuery({ text: "Kept private" }); await ctx.editMessageText(`🔒 File stored in mesh. 
ID: \`${pending.fileId}\``, { parse_mode: "Markdown" }); return; } if (target === "*") { // Broadcast to everyone await conn.sendMessage("*", fileMsg, "next"); pendingFiles.delete(chatId); await ctx.answerCallbackQuery({ text: "Sent to everyone" }); await ctx.editMessageText(`📢 ${emoji} Shared with everyone: \`${pending.fileName}\``, { parse_mode: "Markdown" }); return; } // Send to specific peer (target is pubkey prefix) const peers = await conn.listPeers(); const peer = peers.find(p => p.pubkey.startsWith(target)); if (!peer) { await ctx.answerCallbackQuery({ text: "Peer not found" }); return; } await conn.sendMessage(peer.pubkey, fileMsg, "now"); pendingFiles.delete(chatId); await ctx.answerCallbackQuery({ text: `Sent to ${peer.displayName}` }); await ctx.editMessageText( `${emoji} Sent to ${peer.avatar ?? "🤖"} *${escapeMarkdown(peer.displayName)}*: \`${escapeMarkdown(pending.fileName)}\``, { parse_mode: "Markdown" }, ); return; } // --- DM peer picker --- if (!data.startsWith("dm:")) { await ctx.answerCallbackQuery(); return; } const pending = pendingDMs.get(chatId); if (!pending) { await ctx.answerCallbackQuery({ text: "Session expired. Send /dm again." }); return; } const conn = meshConnections.get(pending.meshId); if (!conn?.isConnected()) { pendingDMs.delete(chatId); await ctx.answerCallbackQuery({ text: "Not connected." }); return; } if (data === "dm:all") { let sent = 0; for (const p of pending.matches) { const ok = await conn.sendMessage( p.pubkey, `[via Telegram] ${pending.message}`, "now", ); if (ok) sent++; } pendingDMs.delete(chatId); await ctx.answerCallbackQuery({ text: `Sent to ${sent} peers` }); await ctx.editMessageText( `✅ Sent to all ${sent} ${pending.matches[0]?.displayName ?? 
"?"} peers`, ); return; } const idx = parseInt(data.slice(3)); const peer = pending.matches[idx]; if (!peer) { await ctx.answerCallbackQuery({ text: "Invalid selection" }); return; } const ok = await conn.sendMessage( peer.pubkey, `[via Telegram] ${pending.message}`, "now", ); pendingDMs.delete(chatId); const dir = peer.cwd?.split("/").pop() ?? "?"; await ctx.answerCallbackQuery({ text: ok ? "Sent!" : "Failed" }); await ctx.editMessageText( ok ? `✅ → ${peer.avatar ?? "🤖"} ${peer.displayName} (${dir})` : "❌ Not connected", ); }); // --- Photo/Document upload → upload to mesh, then show recipient picker --- async function handleFileUpload( ctx: any, tgFileId: string, fileName: string, isPhoto: boolean, ): Promise { const chatId = ctx.chat.id; const meshIds = chatMeshes.get(chatId); if (!meshIds || meshIds.length === 0) return; const caption = ctx.message?.caption ?? ""; const emoji = isPhoto ? "📷" : "📎"; try { const file = await ctx.api.getFile(tgFileId); const url = `https://api.telegram.org/file/bot${botToken}/${file.file_path}`; const resp = await fetch(url, { signal: AbortSignal.timeout(30_000) }); const buf = Buffer.from(await resp.arrayBuffer()); // Upload to first connected mesh const meshId = meshIds[0]!; const conn = meshConnections.get(meshId); if (!conn?.isConnected()) { await ctx.reply("❌ Not connected to mesh."); return; } const meshFileId = await conn.uploadFile(buf, fileName, [ "telegram", isPhoto ? "photo" : "document", ]); if (!meshFileId) { await ctx.reply("❌ Upload failed."); return; } // Store pending file and show recipient picker pendingFiles.set(chatId, { fileId: meshFileId, fileName, meshId, caption }); const peers = await conn.listPeers(); // Filter out the bridge itself const targets = peers.filter(p => !p.displayName.startsWith("tg:")); if (targets.length === 0) { // No peers online — broadcast anyway await conn.sendMessage("*", `[via Telegram] ${emoji} ${fileName}${caption ? 
` — "${caption}"` : ""} (file: ${meshFileId})`, "next"); pendingFiles.delete(chatId); await ctx.reply(`${emoji} Uploaded and broadcast (no peers online).`); return; } // Build inline keyboard: top peers + Everyone + Keep private const buttons: { text: string; callback_data: string }[][] = []; const shown = targets.slice(0, 6); // Cap at 6 to avoid huge keyboard for (const p of shown) { buttons.push([{ text: `${p.avatar ?? "🤖"} ${p.displayName}`, callback_data: `file:${p.pubkey.slice(0, 16)}`, }]); } buttons.push([{ text: "📢 Everyone", callback_data: "file:*" }]); buttons.push([{ text: "🔒 Keep in mesh only", callback_data: "file:none" }]); await ctx.reply(`${emoji} *Uploaded:* \`${escapeMarkdown(fileName)}\`\nSend to:`, { parse_mode: "Markdown", reply_markup: { inline_keyboard: buttons }, }); } catch (e) { await ctx.reply(`❌ ${e instanceof Error ? e.message : String(e)}`); } } bot.on("message:photo", async (ctx) => { const photo = ctx.message.photo.at(-1); if (!photo) return; await handleFileUpload(ctx, photo.file_id, `telegram-photo-${Date.now()}.jpg`, true); }); bot.on("message:document", async (ctx) => { const doc = ctx.message.document; if (!doc) return; await handleFileUpload(ctx, doc.file_id, doc.file_name ?? `telegram-file-${Date.now()}`, false); }); // --- Default text handler: conversation state, invite URLs, @mentions, broadcast --- bot.on("message:text", async (ctx) => { const chatId = ctx.chat.id; const text = ctx.message.text; if (text.startsWith("/")) return; // Skip unknown commands // --- Email verification conversation state --- const state = conversationState.get(chatId); if (state === "awaiting_email") { const email = text.trim().toLowerCase(); if (!email.includes("@") || !email.includes(".")) { await ctx.reply("That doesn't look like an email. 
Try again:"); return; } await startEmailVerification(ctx, chatId, email, lookupMeshesByEmail, sendVerificationEmail); return; } if (state === "awaiting_code") { const pending = pendingVerifications.get(chatId); if (!pending) { conversationState.delete(chatId); await ctx.reply("Session expired. Type /connect to start again."); return; } // Check expiry if (Date.now() > pending.expiresAt) { pendingVerifications.delete(chatId); conversationState.delete(chatId); await ctx.reply("⏰ Code expired. Type /connect to get a new one."); return; } const inputCode = text.trim().replace(/\s/g, ""); // Check attempts pending.attempts++; if (pending.attempts > 5) { pendingVerifications.delete(chatId); conversationState.delete(chatId); await ctx.reply("❌ Too many attempts. Type /connect to start again."); return; } if (inputCode !== pending.code) { await ctx.reply(`❌ Wrong code. ${5 - pending.attempts} attempts left.`); return; } // Code correct — connect to all meshes for this email pendingVerifications.delete(chatId); conversationState.delete(chatId); const meshes = await lookupMeshesByEmail(pending.email); if (meshes.length === 0) { await ctx.reply("❌ No meshes found. The account may have been removed."); return; } const chatType = ctx.chat.type; const chatTitle = ctx.chat.type === "private" ? (ctx.from?.first_name ?? "Private") : ("title" in ctx.chat ? ctx.chat.title : null) ?? 
"Group"; const displayName = `tg:${chatTitle}`; let connected = 0; for (const m of meshes) { try { // Skip if already connected const existing = chatMeshes.get(chatId); if (existing?.includes(m.meshId)) { connected++; continue; } await saveBridge({ chatId, meshId: m.meshId, memberId: m.memberId, pubkey: m.pubkey, secretKey: m.secretKey, displayName, chatType, chatTitle, }); await ensureMeshConnection( { meshId: m.meshId, memberId: m.memberId, pubkey: m.pubkey, secretKey: m.secretKey, displayName, brokerUrl }, pushHandler, ); linkChatMesh(chatId, m.meshId); connected++; } catch (e) { console.error(`[tg-bridge] /connect failed for mesh ${m.meshId.slice(0, 8)}:`, e); } } if (connected === 0) { await ctx.reply("❌ Connection failed for all meshes."); } else if (meshes.length === 1) { await ctx.reply( `✅ Connected to mesh *${escapeMarkdown(meshes[0]!.meshSlug)}*\\!`, { parse_mode: "MarkdownV2" }, ); } else { const names = meshes.map(m => m.meshSlug).join(", "); await ctx.reply(`✅ Connected to ${connected} mesh(es): ${names}`); } return; } // --- Invite URL detection --- const inviteMatch = text.match(INVITE_URL_RE); if (inviteMatch) { const inviteToken = inviteMatch[1]!; await ctx.reply( `🔗 Detected invite link.\n\nTo connect, use the deep link from the dashboard or CLI.\nInvite token: \`${inviteToken}\``, { parse_mode: "Markdown" }, ); return; } const meshIds = chatMeshes.get(chatId); if (!meshIds || meshIds.length === 0) { // Not connected — ignore non-command messages return; } // --- @Mention pattern: "@PeerName message" --- const mentionMatch = text.match(/^@(\S+)\s+([\s\S]+)$/); if (mentionMatch) { const target = mentionMatch[1]!; const message = mentionMatch[2]!; // For multi-mesh, try all connections for (const meshId of meshIds) { const conn = meshConnections.get(meshId); if (!conn?.isConnected()) continue; const matches = await conn.findPeersByName(target); if (matches.length === 0) continue; if (matches.length === 1) { const ok = await conn.sendMessage( 
matches[0]!.pubkey,
            `[via Telegram] ${message}`,
            "now",
          );
          await ctx.reply(
            ok
              ? `✅ → ${matches[0]!.avatar ?? "🤖"} ${matches[0]!.displayName}`
              : "❌ Not connected",
          );
          return;
        }

        // Multiple matches — picker
        pendingDMs.set(chatId, { message, matches, meshId });
        const buttons = matches.map((p, i) => {
          const dir = p.cwd?.split("/").pop() ?? "?";
          return [
            {
              text: `${p.avatar ?? "🤖"} ${p.displayName} (${dir})`,
              callback_data: `dm:${i}`,
            },
          ];
        });
        buttons.push([
          { text: "📨 Send to ALL", callback_data: "dm:all" },
        ]);
        await ctx.reply(
          `Multiple "${target}" peers. Pick one or all:`,
          { reply_markup: { inline_keyboard: buttons } },
        );
        return;
      }

      await ctx.reply(`❌ No peer named "${target}" in any connected mesh.`);
      return;
    }

    // --- No mention → process through Claude AI ---
    try {
      const { processMessage, formatConfirmation, formatResult, CONFIRM_ACTIONS } = await import("./telegram-ai");

      // Gather context for the AI
      const firstMeshId = meshIds[0]!;
      const firstConn = meshConnections.get(firstMeshId);
      const allMeshSlugs = meshIds.map(id => meshSlugs.get(id) ?? id.slice(0, 12));
      let recentPeers: string[] = [];
      if (firstConn?.isConnected()) {
        try {
          const peers = await firstConn.listPeers();
          recentPeers = peers.map(p => p.displayName);
        } catch {}
      }

      const result = await processMessage(text, {
        meshSlug: allMeshSlugs[0],
        meshSlugs: allMeshSlugs,
        userName: ctx.from?.first_name,
        recentPeers,
      });

      if (result.type === "error") {
        await ctx.reply(stripMarkdown(result.text ?? "Something went wrong."));
        return;
      }

      if (result.type === "text") {
        await ctx.reply(stripMarkdown(result.text ?? ""));
        return;
      }

      if (result.type === "tool_call" && result.toolCall) {
        if (result.requiresConfirmation) {
          // Store pending action and show confirmation buttons
          const actionId = `ai_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
          pendingAiActions.set(actionId, {
            chatId,
            meshIds,
            toolCall: result.toolCall,
            expiresAt: Date.now() + 5 * 60 * 1000,
          });

          const confirmText = formatConfirmation(result.toolCall);
          await ctx.reply(confirmText, {
            parse_mode: "HTML",
            reply_markup: {
              inline_keyboard: [
                [
                  { text: "✅ Confirm", callback_data: `ai_confirm:${actionId}` },
                  { text: "✏️ Edit", callback_data: `ai_edit:${actionId}` },
                  { text: "❌ Cancel", callback_data: `ai_cancel:${actionId}` },
                ],
              ],
            },
          });
        } else {
          // Read-only action — execute immediately
          const execResult = await executeAiToolCall(result.toolCall, meshIds);
          await ctx.reply(formatResult(result.toolCall.name, execResult), {
            parse_mode: "HTML",
          });
        }
      }
    } catch (err) {
      log.error("telegram-ai-handler", { error: err instanceof Error ? err.message : String(err) });
      // Fallback: broadcast the text directly
      let sent = 0;
      for (const meshId of meshIds) {
        const conn = meshConnections.get(meshId);
        if (!conn?.isConnected()) continue;
        const ok = await conn.sendMessage("*", `[via Telegram] ${text}`, "next");
        if (ok) sent++;
      }
      if (sent === 0) await ctx.reply("❌ Not connected.");
    }
  });
}

// ---------------------------------------------------------------------------
// AI tool call executor
// ---------------------------------------------------------------------------

/**
 * Executes a tool call produced by the AI layer against the chat's meshes.
 *
 * Only the first entry of `meshIds` is used for `send_message` and
 * `list_peers`; `list_meshes` walks every entry.
 *
 * @param toolCall - Tool name plus its raw, untyped input.
 * @param meshIds - Mesh ids linked to the originating chat.
 * @returns `{ ok: true }` for `send_message`, the peer list for `list_peers`,
 *   or an array of `{ slug, peers }` for `list_meshes`.
 * @throws Error when no mesh is linked or connected, the message is empty,
 *   the recipient cannot be resolved to exactly one peer, the send fails, or
 *   the tool is unknown / not available via Telegram.
 */
async function executeAiToolCall(
  toolCall: { name: string; input: Record<string, unknown> },
  meshIds: string[],
): Promise<unknown> {
  const firstMeshId = meshIds[0];
  if (!firstMeshId) throw new Error("No mesh connected");
  const conn = meshConnections.get(firstMeshId);
  if (!conn?.isConnected()) throw new Error("Not connected to mesh");

  switch (toolCall.name) {
    case "send_message": {
      const to = String(toolCall.input.to ?? "*");
      const message = String(toolCall.input.message ?? "");
      // Without this guard a bare "[via Telegram] " prefix would be delivered.
      if (message.trim() === "") throw new Error("Message is empty");

      // The input is model-generated: accept only known priorities and fall
      // back to the default instead of casting an arbitrary string.
      const requested = String(toolCall.input.priority ?? "next");
      const priority =
        requested === "now" ? "now" : requested === "low" ? "low" : "next";

      // "@group", "*" and 64-char hex pubkeys are passed through untouched;
      // anything else is treated as a peer display name.
      let targetSpec = to;
      const isDirectSpec =
        to.startsWith("@") || to === "*" || /^[0-9a-f]{64}$/.test(to);
      if (!isDirectSpec) {
        const wanted = to.toLowerCase();
        const peers = await conn.listPeers();
        // A case-insensitive exact match wins (first one if several share it).
        const exact = peers.find((p) => p.displayName.toLowerCase() === wanted);
        if (exact) {
          targetSpec = exact.pubkey;
        } else {
          // Substring matching is only accepted when it is unambiguous.
          const partials = peers.filter((p) =>
            p.displayName.toLowerCase().includes(wanted),
          );
          if (partials.length === 0) throw new Error(`Peer "${to}" not found`);
          if (partials.length > 1) {
            throw new Error(
              `Peer "${to}" is ambiguous (${partials.length} matches)`,
            );
          }
          targetSpec = partials[0]!.pubkey;
        }
      }

      const ok = await conn.sendMessage(
        targetSpec,
        `[via Telegram] ${message}`,
        priority,
      );
      if (!ok) throw new Error("Send failed");
      return { ok: true };
    }

    case "list_peers":
      return conn.listPeers();

    case "list_meshes": {
      const results: Array<{ slug: string; peers: number }> = [];
      for (const meshId of meshIds) {
        const meshConn = meshConnections.get(meshId);
        const slug = meshSlugs.get(meshId) ?? meshId.slice(0, 12);
        let peerCount = 0;
        if (meshConn?.isConnected()) {
          try {
            peerCount = (await meshConn.listPeers()).length;
          } catch {
            // Best effort: a mesh whose peer query fails is reported with 0.
          }
        }
        results.push({ slug, peers: peerCount });
      }
      return results;
    }

    case "remember":
    case "recall":
    case "get_state":
    case "set_state":
      throw new Error(`${toolCall.name} not yet available via Telegram. Use the CLI.`);

    default:
      throw new Error(`Unknown tool: ${toolCall.name}`);
  }
}

// ---------------------------------------------------------------------------
// Ensure a mesh WS connection exists (create or reuse)
// ---------------------------------------------------------------------------

/**
 * Returns the pooled connection for `creds.meshId`, creating it if needed.
 *
 * A pooled connection that reports connected is returned as-is, so in that
 * case `creds` and `pushHandler` are not applied. A pooled connection that is
 * not connected is closed and replaced.
 *
 * @param creds - Identity and broker URL used when a new connection is opened.
 * @param pushHandler - Callback handed to a newly created connection.
 * @returns The connected `MeshConnection`.
 * @throws Whatever `connect()` or `setSummary()` reject with; the new
 *   connection is already stored in the pool at that point.
 */
async function ensureMeshConnection(
  creds: MeshCredentials,
  pushHandler: (
    meshId: string,
    from: string,
    text: string,
    priority: string,
  ) => void,
): Promise<MeshConnection> {
  const pooled = meshConnections.get(creds.meshId);
  if (pooled) {
    if (pooled.isConnected()) return pooled;
    // Close stale connection if any
    pooled.close();
    meshConnections.delete(creds.meshId);
  }

  const fresh = new MeshConnection(creds, pushHandler);
  meshConnections.set(creds.meshId, fresh);
  await fresh.connect();
  await fresh.setSummary(
    "Telegram bridge — relays messages between Telegram chats and mesh peers",
  );
  return fresh;
}

// ---------------------------------------------------------------------------
// Boot — called by broker on startup
// ---------------------------------------------------------------------------

/**
 * Boots the bridge: restores persisted chat↔mesh links, opens one connection
 * per distinct mesh, registers the bot handlers and starts long-polling.
 *
 * A mesh that fails to connect is logged and its chats are still linked, so
 * one bad mesh does not abort the boot. `bot.start()` is not awaited.
 *
 * NOTE(review): the generic arguments below were reconstructed from how the
 * values are used in this file (the `Omit` key set and the email-lookup row
 * shape in particular) — confirm against the broker's callbacks.
 *
 * @param loadActiveBridges - Loads every active bridge row.
 * @param saveBridge - Persists a bridge row; forwarded to the bot handlers.
 * @param deactivateBridge - Deactivates a chat/mesh link; forwarded as well.
 * @param botToken - Telegram bot token.
 * @param brokerUrl - Broker URL used for the mesh connections.
 * @param lookupMeshesByEmail - Optional email → meshes lookup; defaults to a
 *   stub that resolves to an empty list.
 * @param sendVerificationEmail - Optional code sender; defaults to a stub
 *   that resolves to `false`.
 */
export async function bootTelegramBridge(
  loadActiveBridges: () => Promise<BridgeRow[]>,
  saveBridge: (
    row: Omit<BridgeRow, "chatId"> & { chatId: number },
  ) => Promise<void>,
  deactivateBridge: (chatId: number, meshId: string) => Promise<void>,
  botToken: string,
  brokerUrl: string,
  lookupMeshesByEmail?: (email: string) => Promise<
    Array<{
      meshId: string;
      meshSlug: string;
      memberId: string;
      pubkey: string;
      secretKey: string;
    }>
  >,
  sendVerificationEmail?: (email: string, code: string) => Promise<boolean>,
): Promise<void> {
  await ensureSodium();

  const bot = new Bot(botToken);
  const pushHandler = createPushHandler(bot);

  // Load all active bridges from DB
  const rows = await loadActiveBridges();
  console.log(`[tg-bridge] loaded ${rows.length} active bridge(s) from DB`);

  // Group by meshId to connect WS pool
  const byMesh = new Map<string, BridgeRow[]>();
  for (const row of rows) {
    const arr = byMesh.get(row.meshId) ?? [];
    arr.push(row);
    byMesh.set(row.meshId, arr);
  }

  // Connect one WS per unique mesh
  for (const [meshId, meshRows] of byMesh) {
    // The first row's credentials are used for the shared connection.
    const first = meshRows[0]!;
    try {
      await ensureMeshConnection(
        {
          meshId,
          memberId: first.memberId,
          pubkey: first.pubkey,
          secretKey: first.secretKey,
          displayName: first.displayName,
          brokerUrl,
        },
        pushHandler,
      );
      console.log(
        `[tg-bridge] connected WS for mesh ${meshId.slice(0, 8)} (${meshRows.length} chat(s))`,
      );
    } catch (e) {
      console.error(
        `[tg-bridge] failed to connect mesh ${meshId.slice(0, 8)}:`,
        e,
      );
    }

    // Populate routing maps and slug cache for all chats in this mesh
    for (const row of meshRows) {
      linkChatMesh(row.chatId, meshId);
      if (row.meshSlug) meshSlugs.set(meshId, row.meshSlug);
    }
  }

  // Grammy global error handler — prevents unhandled rejections from crashing broker
  bot.catch((err) => {
    console.error("[tg-bridge] Grammy error:", err.message ?? err);
  });

  // Sweep pending-interaction state every 5 minutes (prevent memory leak)
  setInterval(() => {
    // pendingDMs/pendingFiles have no timestamp, so we cap size — clear all if > 1000
    if (pendingDMs.size > 1000) {
      console.warn(`[tg-bridge] clearing ${pendingDMs.size} stale pendingDMs`);
      pendingDMs.clear();
    }
    if (pendingFiles.size > 1000) {
      console.warn(`[tg-bridge] clearing ${pendingFiles.size} stale pendingFiles`);
      pendingFiles.clear();
    }
    // AI confirmations are stored with an expiresAt deadline; drop the ones
    // past it so abandoned prompts neither accumulate nor stay confirmable.
    const now = Date.now();
    for (const [actionId, action] of pendingAiActions) {
      if (action.expiresAt <= now) pendingAiActions.delete(actionId);
    }
  }, 5 * 60_000).unref();

  // Default stubs if email callbacks not provided
  const emailLookup = lookupMeshesByEmail ?? (async () => []);
  const emailSend = sendVerificationEmail ?? (async () => false);

  // Wire up bot commands
  setupBotCommands(
    bot,
    botToken,
    brokerUrl,
    saveBridge,
    deactivateBridge,
    pushHandler,
    emailLookup,
    emailSend,
  );

  // Start Grammy long-polling (fire-and-forget, must not crash broker)
  console.log("[tg-bridge] starting bot...");
  bot.start({
    onStart: () =>
      console.log(
        `[tg-bridge] bot running — ${meshConnections.size} mesh(es), ${chatMeshes.size} chat(s)`,
      ),
  }).catch((err: unknown) => {
    console.error("[tg-bridge] bot.start() error:", err instanceof Error ? err.message : String(err));
  });
}

// ---------------------------------------------------------------------------
// Connect a new chat at runtime (called from broker HTTP endpoints)
// ---------------------------------------------------------------------------

/**
 * Links a chat to a mesh at runtime, opening the mesh connection if needed.
 *
 * @param chatId - Telegram chat id to link.
 * @param chatType - Chat type; only used in the log line.
 * @param chatTitle - Accepted for callers; not read by this function.
 * @param meshCreds - Credentials for the mesh connection.
 * @param pushHandler - Handler for a newly created connection. When omitted a
 *   no-op is used; since a connected pooled connection is reused as-is, a
 *   connection first created through this default keeps the no-op handler.
 * @throws Whatever `ensureMeshConnection` rejects with; the chat is then not
 *   linked.
 */
export async function connectChat(
  chatId: number,
  chatType: string,
  chatTitle: string | null,
  meshCreds: MeshCredentials,
  pushHandler?: (
    meshId: string,
    from: string,
    text: string,
    priority: string,
  ) => void,
): Promise<void> {
  // Default push handler is a no-op if bot isn't running yet
  // (the real one is wired during bootTelegramBridge)
  const handler = pushHandler ?? (() => {});
  await ensureMeshConnection(meshCreds, handler);
  linkChatMesh(chatId, meshCreds.meshId);
  console.log(
    `[tg-bridge] chat ${chatId} (${chatType}) connected to mesh ${meshCreds.meshId.slice(0, 8)}`,
  );
}