From 126bbfeb2c0b7d5367833e4b1409eb3276647d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Thu, 9 Apr 2026 10:03:11 +0100 Subject: [PATCH] feat(broker+cli): multi-tenant telegram bridge with 4 entry points - DB: mesh.telegram_bridge table + migration - Broker: telegram-bridge.ts (Grammy bot + WS pool + routing) - Broker: telegram-token.ts (JWT connect tokens) - Broker: POST /tg/token endpoint + bridge boot on startup - CLI: claudemesh connect/disconnect telegram commands - Spec: docs/telegram-bridge-spec.md Co-Authored-By: Claude Opus 4.6 (1M context) --- apps/broker/src/telegram-bridge.ts | 1443 +++++++++++++++++ apps/broker/src/telegram-token.ts | 143 ++ apps/cli/src/commands/connect-telegram.ts | 65 + apps/cli/src/commands/disconnect-telegram.ts | 3 + apps/cli/src/index.ts | 22 + apps/telegram/Dockerfile | 15 + apps/telegram/package.json | 5 +- apps/telegram/src/index.ts | 15 +- docs/telegram-bridge-spec.md | 347 ++++ .../db/migrations/0016_telegram-bridge.sql | 16 + packages/db/src/schema/mesh.ts | 49 + 11 files changed, 2120 insertions(+), 3 deletions(-) create mode 100644 apps/broker/src/telegram-bridge.ts create mode 100644 apps/broker/src/telegram-token.ts create mode 100644 apps/cli/src/commands/connect-telegram.ts create mode 100644 apps/cli/src/commands/disconnect-telegram.ts create mode 100644 apps/telegram/Dockerfile create mode 100644 docs/telegram-bridge-spec.md create mode 100644 packages/db/migrations/0016_telegram-bridge.sql diff --git a/apps/broker/src/telegram-bridge.ts b/apps/broker/src/telegram-bridge.ts new file mode 100644 index 0000000..07247c4 --- /dev/null +++ b/apps/broker/src/telegram-bridge.ts @@ -0,0 +1,1443 @@ +/** + * Telegram Bridge β€” Multi-Tenant Module + * + * Manages a single @claudemesh_bot instance with long-polling and a pool of + * WebSocket connections (one per unique mesh). Multiple Telegram chats can + * share the same mesh connection; push messages fan out to all chats. + * + * This file is self-contained. The broker's index.ts imports bootTelegramBridge + * and connectChat, passing DB accessor callbacks so we never import db.ts. + */ + +import { Bot, InputFile } from "grammy"; +import WebSocket from "ws"; +import sodium from "libsodium-wrappers"; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export interface BridgeRow { + chatId: number; + meshId: string; + memberId: string; + pubkey: string; + secretKey: string; + displayName: string; + chatType: string; + chatTitle: string | null; +} + +interface MeshCredentials { + meshId: string; + memberId: string; + pubkey: string; + secretKey: string; + displayName: string; + brokerUrl: string; +} + +interface PeerInfo { + displayName: string; + pubkey: string; + status: string; + summary?: string; + cwd?: string; + groups?: string[]; + avatar?: string; +} + +// --------------------------------------------------------------------------- +// Crypto helpers (mirrors apps/telegram/src/index.ts) +// --------------------------------------------------------------------------- + +let sodiumReady = false; + +async function ensureSodium() { + if (!sodiumReady) { + await sodium.ready; + sodiumReady = true; + } + return sodium; +} + +async function generateSessionKeypair() { + const s = await ensureSodium(); + const kp = s.crypto_sign_keypair(); + return { + publicKey: s.to_hex(kp.publicKey), + secretKey: s.to_hex(kp.privateKey), + }; +} + +async function signHello( + meshId: string, + memberId: string, + pubkey: string, + secretKeyHex: string, +) { + const s = await ensureSodium(); + const timestamp = Date.now(); + const canonical = `${meshId}|${memberId}|${pubkey}|${timestamp}`; + const sig = s.crypto_sign_detached( + s.from_string(canonical), + s.from_hex(secretKeyHex), + ); + return { timestamp, signature: s.to_hex(sig) }; +} + +async function decryptDirect( + nonce: string, + ciphertext: string, + senderPubkeyHex: string, + recipientSecretKeyHex: string, +): Promise { + const s = await ensureSodium(); + try { + const senderPub = s.crypto_sign_ed25519_pk_to_curve25519( + s.from_hex(senderPubkeyHex), + ); + const recipientSec = s.crypto_sign_ed25519_sk_to_curve25519( + s.from_hex(recipientSecretKeyHex), + ); + const nonceBytes = s.from_base64(nonce, s.base64_variants.ORIGINAL); + const ciphertextBytes = s.from_base64( + ciphertext, + s.base64_variants.ORIGINAL, + ); + const plain = s.crypto_box_open_easy( + ciphertextBytes, + nonceBytes, + senderPub, + recipientSec, + ); + return s.to_string(plain); + } catch { + return null; + } +} + +async function encryptDirect( + message: string, + recipientPubkeyHex: string, + senderSecretKeyHex: string, +): Promise<{ nonce: string; ciphertext: string }> { + const s = await ensureSodium(); + const recipientPub = s.crypto_sign_ed25519_pk_to_curve25519( + s.from_hex(recipientPubkeyHex), + ); + const senderSec = s.crypto_sign_ed25519_sk_to_curve25519( + s.from_hex(senderSecretKeyHex), + ); + const nonceBytes = s.randombytes_buf(s.crypto_box_NONCEBYTES); + const ciphertextBytes = s.crypto_box_easy( + s.from_string(message), + nonceBytes, + recipientPub, + senderSec, + ); + return { + nonce: s.to_base64(nonceBytes, s.base64_variants.ORIGINAL), + ciphertext: s.to_base64(ciphertextBytes, s.base64_variants.ORIGINAL), + }; +} + +// --------------------------------------------------------------------------- +// MeshConnection β€” one WS per unique mesh, shared across chats +// --------------------------------------------------------------------------- + +class MeshConnection { + private ws: WebSocket | null = null; + private creds: MeshCredentials; + private sessionPubkey: string | null = null; + private sessionSecretKey: string | null = null; + private connected = false; + private reconnectTimer: ReturnType | null = null; + private reconnectAttempt = 0; + private resolvers = new Map< + string, + { resolve: (v: any) => void; timer: ReturnType } + >(); + /** pubkey/sessionPubkey β†’ { name, avatar } */ + private peerInfo = new Map(); + private onPush: (meshId: string, from: string, text: string, priority: string) => void; + private peerRefreshInterval: ReturnType | null = null; + + constructor( + creds: MeshCredentials, + onPush: (meshId: string, from: string, text: string, priority: string) => void, + ) { + this.creds = creds; + this.onPush = onPush; + } + + async connect(): Promise { + const sessionKP = await generateSessionKeypair(); + this.sessionPubkey = sessionKP.publicKey; + this.sessionSecretKey = sessionKP.secretKey; + await this._connect(); + // Refresh peer name cache every 30 s + this.peerRefreshInterval = setInterval( + () => this.listPeers().catch(() => {}), + 30_000, + ); + } + + private _connect(): Promise { + return new Promise((resolve, reject) => { + const ws = new WebSocket(this.creds.brokerUrl); + this.ws = ws; + + ws.on("open", async () => { + try { + const { timestamp, signature } = await signHello( + this.creds.meshId, + this.creds.memberId, + this.creds.pubkey, + this.creds.secretKey, + ); + ws.send( + JSON.stringify({ + type: "hello", + meshId: this.creds.meshId, + memberId: this.creds.memberId, + pubkey: this.creds.pubkey, + sessionPubkey: this.sessionPubkey, + displayName: this.creds.displayName, + sessionId: `tg-bridge-${this.creds.meshId.slice(0, 8)}-${Date.now()}`, + pid: process.pid, + cwd: process.cwd(), + hostname: require("os").hostname(), + peerType: "bridge", + channel: "telegram", + timestamp, + signature, + }), + ); + } catch (e) { + reject(e); + } + }); + + const helloTimeout = setTimeout(() => { + ws.close(); + reject(new Error("hello_ack timeout")); + }, 10_000); + + ws.on("message", async (raw) => { + try { + const msg = JSON.parse(raw.toString()); + + if (msg.type === "hello_ack") { + clearTimeout(helloTimeout); + this.connected = true; + this.reconnectAttempt = 0; + console.log( + `[tg-bridge] WS connected to mesh ${this.creds.meshId.slice(0, 8)} as ${this.creds.displayName}`, + ); + resolve(); + return; + } + + // Push messages from peers β†’ forward to Telegram + if (msg.type === "push") { + let text: string | null = null; + const senderPubkey = + msg.senderPubkey ?? msg.senderSessionPubkey; + + if (msg.subtype === "system") { + const event = msg.event ?? ""; + const data = msg.eventData ?? {}; + if (event === "peer_joined") + text = `[joined] ${data.displayName ?? "peer"}`; + else if (event === "peer_left") + text = `[left] ${data.displayName ?? "peer"}`; + else if (event === "peer_returned") + text = `[returned] ${data.name ?? "peer"}`; + else text = msg.plaintext ?? `[${event}]`; + } else if (senderPubkey && msg.nonce && msg.ciphertext) { + // Try session key, then member key + text = + (await decryptDirect( + msg.nonce, + msg.ciphertext, + senderPubkey, + this.sessionSecretKey!, + )) ?? + (await decryptDirect( + msg.nonce, + msg.ciphertext, + senderPubkey, + this.creds.secretKey, + )); + if (!text) text = "[could not decrypt]"; + } else if (msg.plaintext) { + text = msg.plaintext; + } else if (msg.ciphertext && !msg.nonce) { + try { + text = Buffer.from(msg.ciphertext, "base64").toString("utf-8"); + } catch { + text = "[decode error]"; + } + } + + if (text) { + const info = senderPubkey + ? this.peerInfo.get(senderPubkey) + : null; + const fromName = + info?.name ?? senderPubkey?.slice(0, 12) ?? "system"; + const avatar = info?.avatar ?? "πŸ€–"; + this.onPush( + this.creds.meshId, + `${avatar} ${fromName}`, + text, + msg.priority ?? "next", + ); + } + } + + // Resolve pending request/response pairs + const reqId = msg._reqId; + if (reqId && this.resolvers.has(reqId)) { + const r = this.resolvers.get(reqId)!; + clearTimeout(r.timer); + this.resolvers.delete(reqId); + r.resolve(msg); + } + } catch { + /* ignore parse errors */ + } + }); + + ws.on("close", () => { + this.connected = false; + this.ws = null; + if (this.reconnectTimer) return; + const delays = [1000, 2000, 4000, 8000, 16000, 30000]; + const delay = + delays[Math.min(this.reconnectAttempt, delays.length - 1)]!; + this.reconnectAttempt++; + console.log( + `[tg-bridge] mesh ${this.creds.meshId.slice(0, 8)} reconnecting in ${delay}ms (attempt ${this.reconnectAttempt})`, + ); + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this._connect().catch((e) => + console.error("[tg-bridge] reconnect failed:", e), + ); + }, delay); + }); + + ws.on("error", (err) => { + console.error( + `[tg-bridge] WS error mesh ${this.creds.meshId.slice(0, 8)}:`, + err.message, + ); + }); + }); + } + + // -- Request / Response helpers -- + + private makeReqId(): string { + return Math.random().toString(36).slice(2) + Date.now().toString(36); + } + + private request( + msg: Record, + timeout = 10_000, + ): Promise { + return new Promise((resolve) => { + const reqId = this.makeReqId(); + const timer = setTimeout(() => { + this.resolvers.delete(reqId); + resolve(null); + }, timeout); + this.resolvers.set(reqId, { resolve, timer }); + this.ws?.send(JSON.stringify({ ...msg, _reqId: reqId })); + }); + } + + // -- Public API -- + + async sendMessage( + to: string, + message: string, + priority = "next", + ): Promise { + if (!this.ws || !this.connected) return false; + + const isDirect = /^[0-9a-f]{64}$/.test(to); + let nonce = ""; + let ciphertext = ""; + + if (isDirect) { + const enc = await encryptDirect( + message, + to, + this.sessionSecretKey!, + ); + nonce = enc.nonce; + ciphertext = enc.ciphertext; + } else { + ciphertext = Buffer.from(message, "utf-8").toString("base64"); + } + + this.ws.send( + JSON.stringify({ + type: "send", + id: this.makeReqId(), + targetSpec: to, + priority, + nonce, + ciphertext, + }), + ); + return true; + } + + async listPeers(): Promise { + const resp = await this.request({ type: "list_peers" }); + if (!resp?.peers) return []; + return resp.peers.map((p: any) => { + const name = p.displayName ?? p.pubkey?.slice(0, 12) ?? "?"; + const avatar = p.profile?.avatar; + const info = { name, avatar }; + if (p.pubkey) this.peerInfo.set(p.pubkey, info); + if (p.sessionPubkey) this.peerInfo.set(p.sessionPubkey, info); + return { + displayName: name, + pubkey: p.pubkey ?? "", + status: p.status ?? "unknown", + summary: p.summary, + cwd: p.cwd, + groups: p.groups?.map((g: any) => g.name) ?? [], + avatar, + }; + }); + } + + async findPeersByName(name: string): Promise { + const peers = await this.listPeers(); + return peers.filter( + (p) => p.displayName.toLowerCase() === name.toLowerCase(), + ); + } + + async setSummary(summary: string): Promise { + this.ws?.send(JSON.stringify({ type: "set_summary", summary })); + } + + async uploadFile( + data: Buffer, + fileName: string, + tags?: string[], + ): Promise { + const brokerHttp = this.creds.brokerUrl + .replace("wss://", "https://") + .replace("ws://", "http://") + .replace("/ws", ""); + try { + const res = await fetch(`${brokerHttp}/upload`, { + method: "POST", + headers: { + "Content-Type": "application/octet-stream", + "X-Mesh-Id": this.creds.meshId, + "X-Member-Id": this.creds.memberId, + "X-File-Name": fileName, + "X-Tags": JSON.stringify(tags ?? ["telegram"]), + "X-Persistent": "true", + }, + body: data, + signal: AbortSignal.timeout(30_000), + }); + const body = (await res.json()) as { + ok?: boolean; + fileId?: string; + error?: string; + }; + if (!res.ok || !body.fileId) return null; + return body.fileId; + } catch (e) { + console.error("[tg-bridge] upload failed:", e); + return null; + } + } + + async getFileUrl( + fileId: string, + ): Promise<{ url: string; name: string } | null> { + const resp = await this.request({ type: "get_file", fileId }); + if (!resp?.url) return null; + return { url: resp.url, name: resp.name ?? "file" }; + } + + isConnected(): boolean { + return this.connected; + } + + getMeshId(): string { + return this.creds.meshId; + } + + close(): void { + if (this.peerRefreshInterval) clearInterval(this.peerRefreshInterval); + if (this.reconnectTimer) clearTimeout(this.reconnectTimer); + this.ws?.close(); + } +} + +// --------------------------------------------------------------------------- +// Routing maps +// --------------------------------------------------------------------------- + +/** chatId β†’ meshIds this chat is connected to */ +const chatMeshes = new Map(); + +/** meshId β†’ chatIds that should receive push messages */ +const meshChats = new Map>(); + +/** meshId β†’ shared WS connection */ +const meshConnections = new Map(); + +// Pending DM picker state: chatId β†’ { message, matches, meshId } +const pendingDMs = new Map< + number, + { message: string; matches: PeerInfo[]; meshId: string } +>(); + +/** Invite URL regex: https://claudemesh.com/join/ */ +const INVITE_URL_RE = + /https?:\/\/(?:www\.)?claudemesh\.com\/join\/([A-Za-z0-9_\-\.]+)/; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Add a chat ↔ mesh link to the in-memory maps. */ +function linkChatMesh(chatId: number, meshId: string): void { + const meshes = chatMeshes.get(chatId) ?? []; + if (!meshes.includes(meshId)) { + meshes.push(meshId); + chatMeshes.set(chatId, meshes); + } + const chats = meshChats.get(meshId) ?? new Set(); + chats.add(chatId); + meshChats.set(meshId, chats); +} + +/** Remove a chat ↔ mesh link from in-memory maps. */ +function unlinkChatMesh(chatId: number, meshId: string): void { + const meshes = chatMeshes.get(chatId); + if (meshes) { + const idx = meshes.indexOf(meshId); + if (idx !== -1) meshes.splice(idx, 1); + if (meshes.length === 0) chatMeshes.delete(chatId); + } + const chats = meshChats.get(meshId); + if (chats) { + chats.delete(chatId); + if (chats.size === 0) meshChats.delete(meshId); + } +} + +/** + * Resolve which MeshConnection a chat command should target. + * If the chat is connected to exactly one mesh, return it. + * If connected to multiple and a prefix is given (e.g. "dev-team"), + * match by meshId prefix. Otherwise return null (caller should prompt). + */ +function resolveMesh( + chatId: number, + meshPrefix?: string, +): MeshConnection | null { + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length === 0) return null; + + if (meshIds.length === 1) { + return meshConnections.get(meshIds[0]!) ?? null; + } + + if (meshPrefix) { + const lower = meshPrefix.toLowerCase(); + const match = meshIds.find((id) => id.toLowerCase().startsWith(lower)); + if (match) return meshConnections.get(match) ?? null; + // Also try partial match anywhere in the id + const partial = meshIds.find((id) => id.toLowerCase().includes(lower)); + if (partial) return meshConnections.get(partial) ?? null; + } + + return null; +} + +/** + * Parse an optional mesh prefix from command text. + * Format: "meshSlug:rest" or "meshSlug rest" (for /peers etc.) + * Returns [meshPrefix | undefined, remainingText]. + */ +function parseMeshPrefix( + chatId: number, + text: string, +): [string | undefined, string] { + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length <= 1) return [undefined, text]; + + // Try "slug:rest" format + const colonIdx = text.indexOf(":"); + if (colonIdx > 0 && colonIdx < 40) { + return [text.slice(0, colonIdx), text.slice(colonIdx + 1).trimStart()]; + } + + // Try "slug rest" β€” only if first word matches a known meshId + const spaceIdx = text.indexOf(" "); + const firstWord = spaceIdx === -1 ? text : text.slice(0, spaceIdx); + const lower = firstWord.toLowerCase(); + const isSlug = meshIds.some( + (id) => + id.toLowerCase().startsWith(lower) || id.toLowerCase().includes(lower), + ); + if (isSlug) { + return [ + firstWord, + spaceIdx === -1 ? "" : text.slice(spaceIdx + 1).trimStart(), + ]; + } + + return [undefined, text]; +} + +// --------------------------------------------------------------------------- +// Push handler β€” fan out mesh push to Telegram chats +// --------------------------------------------------------------------------- + +function createPushHandler(bot: Bot) { + return ( + meshId: string, + from: string, + text: string, + _priority: string, + ) => { + const chatIds = meshChats.get(meshId); + if (!chatIds || chatIds.size === 0) return; + + const meshLabel = meshId.slice(0, 12); + const formatted = `πŸ’¬ *[${meshLabel}] ${escapeMarkdown(from)}*\n${escapeMarkdown(text)}`; + + for (const chatId of chatIds) { + bot.api + .sendMessage(chatId, formatted, { parse_mode: "Markdown" }) + .catch((e) => { + console.error(`[tg-bridge] send to chat ${chatId} failed:`, e.message); + }); + } + }; +} + +/** Escape Markdown v1 special chars for Telegram. */ +function escapeMarkdown(s: string): string { + return s.replace(/([_*\[\]()~`>#+\-=|{}.!\\])/g, "\\$1"); +} + +// --------------------------------------------------------------------------- +// Bot command handlers +// --------------------------------------------------------------------------- + +function setupBotCommands( + bot: Bot, + botToken: string, + brokerUrl: string, + saveBridge: ( + row: Omit & { chatId: number }, + ) => Promise, + deactivateBridge: (chatId: number, meshId: string) => Promise, + pushHandler: ( + meshId: string, + from: string, + text: string, + priority: string, + ) => void, +): void { + // --- /start --- + bot.command("start", async (ctx) => { + const token = ctx.match?.trim(); + if (!token) { + await ctx.reply( + "πŸ”— *Claudemesh Telegram Bridge*\n\n" + + "Use a connect link from the dashboard or CLI to get started.\n" + + "Or type /connect to link via email.\n\n" + + "Commands: /help", + { parse_mode: "Markdown" }, + ); + return; + } + + // Decode JWT token (3-part base64url) + let payload: any; + try { + const parts = token.split("."); + if (parts.length !== 3) throw new Error("not a JWT"); + payload = JSON.parse( + Buffer.from(parts[1]!, "base64url").toString("utf-8"), + ); + } catch { + await ctx.reply("❌ Invalid or expired token. Request a new link."); + return; + } + + // Validate required fields + const { meshId, memberId, pubkey, secretKey, meshSlug } = payload; + if (!meshId || !memberId || !pubkey || !secretKey) { + await ctx.reply("❌ Malformed token β€” missing credentials."); + return; + } + + // Check expiry + if (payload.expiresAt && Date.now() > payload.expiresAt) { + await ctx.reply("❌ Token expired. Request a new connect link."); + return; + } + + const chatId = ctx.chat.id; + const chatType = ctx.chat.type; + const chatTitle = + ctx.chat.type === "private" + ? (ctx.from?.first_name ?? "Private") + : ("title" in ctx.chat ? ctx.chat.title : null) ?? "Group"; + const displayName = `tg:${chatTitle}`; + + // Check if already connected + const existing = chatMeshes.get(chatId); + if (existing?.includes(meshId)) { + await ctx.reply(`Already connected to mesh \`${meshSlug ?? meshId.slice(0, 8)}\`.`, { + parse_mode: "Markdown", + }); + return; + } + + try { + // Persist bridge row + await saveBridge({ + chatId, + meshId, + memberId, + pubkey, + secretKey, + displayName, + chatType, + chatTitle, + }); + + // Connect or reuse WS + await ensureMeshConnection( + { meshId, memberId, pubkey, secretKey, displayName, brokerUrl }, + pushHandler, + ); + + linkChatMesh(chatId, meshId); + + await ctx.reply( + `βœ… Connected to mesh *${escapeMarkdown(meshSlug ?? meshId.slice(0, 8))}*\\!`, + { parse_mode: "MarkdownV2" }, + ); + } catch (e) { + console.error("[tg-bridge] /start connect failed:", e); + await ctx.reply("❌ Connection failed. Try again or request a new token."); + } + }); + + // --- /connect (email flow stub) --- + bot.command("connect", async (ctx) => { + console.log("[tg-bridge] /connect requested β€” email flow not implemented yet"); + await ctx.reply( + "πŸ“§ Email verification is not implemented yet.\n\n" + + "Use a connect link from the dashboard or CLI instead:\n" + + "`claudemesh connect telegram`", + { parse_mode: "Markdown" }, + ); + }); + + // --- /disconnect --- + bot.command("disconnect", async (ctx) => { + const chatId = ctx.chat.id; + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length === 0) { + await ctx.reply("Not connected to any mesh."); + return; + } + + const [meshPrefix, _] = parseMeshPrefix(chatId, ctx.match ?? ""); + let targetMeshId: string | undefined; + + if (meshIds.length === 1) { + targetMeshId = meshIds[0]!; + } else if (meshPrefix) { + const lower = meshPrefix.toLowerCase(); + targetMeshId = meshIds.find( + (id) => + id.toLowerCase().startsWith(lower) || + id.toLowerCase().includes(lower), + ); + } + + if (!targetMeshId && meshIds.length > 1) { + const list = meshIds.map((id) => `β€’ \`${id.slice(0, 12)}\``).join("\n"); + await ctx.reply( + `Connected to multiple meshes. Specify which:\n${list}\n\n/disconnect `, + { parse_mode: "Markdown" }, + ); + return; + } + + if (!targetMeshId) { + await ctx.reply("Mesh not found."); + return; + } + + try { + await deactivateBridge(chatId, targetMeshId); + unlinkChatMesh(chatId, targetMeshId); + + // If no more chats reference this mesh, close the WS + const remaining = meshChats.get(targetMeshId); + if (!remaining || remaining.size === 0) { + const conn = meshConnections.get(targetMeshId); + if (conn) { + conn.close(); + meshConnections.delete(targetMeshId); + } + } + + await ctx.reply(`βœ… Disconnected from mesh \`${targetMeshId.slice(0, 12)}\`.`, { + parse_mode: "Markdown", + }); + } catch (e) { + console.error("[tg-bridge] /disconnect failed:", e); + await ctx.reply("❌ Disconnect failed."); + } + }); + + // --- /meshes --- + bot.command("meshes", async (ctx) => { + const meshIds = chatMeshes.get(ctx.chat.id); + if (!meshIds || meshIds.length === 0) { + await ctx.reply("Not connected to any mesh. Use a connect link to join."); + return; + } + const lines = meshIds.map((id) => { + const conn = meshConnections.get(id); + const status = conn?.isConnected() ? "🟒" : "πŸ”΄"; + return `${status} \`${id.slice(0, 16)}\``; + }); + await ctx.reply(`*Connected meshes:*\n${lines.join("\n")}`, { + parse_mode: "Markdown", + }); + }); + + // --- /peers [mesh-slug] --- + bot.command("peers", async (ctx) => { + const chatId = ctx.chat.id; + const [meshPrefix] = parseMeshPrefix(chatId, ctx.match ?? ""); + const conn = resolveMesh(chatId, meshPrefix); + + if (!conn) { + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length === 0) { + await ctx.reply("Not connected to any mesh."); + } else { + await ctx.reply( + "Connected to multiple meshes. Specify which: /peers ", + ); + } + return; + } + + const peers = await conn.listPeers(); + if (peers.length === 0) { + await ctx.reply("No peers online."); + return; + } + const lines = peers.map((p) => { + const icon = + p.status === "idle" ? "🟒" : p.status === "working" ? "🟑" : "πŸ”΄"; + const summary = p.summary ? ` β€” _${escapeMarkdown(p.summary)}_` : ""; + return `${icon} *${escapeMarkdown(p.displayName)}*${summary}`; + }); + await ctx.reply(lines.join("\n"), { parse_mode: "Markdown" }); + }); + + // --- /dm [mesh:] --- + bot.command("dm", async (ctx) => { + const chatId = ctx.chat.id; + const rawText = ctx.match ?? ""; + if (!rawText.trim()) { + await ctx.reply("Usage: /dm "); + return; + } + + const [meshPrefix, text] = parseMeshPrefix(chatId, rawText); + const conn = resolveMesh(chatId, meshPrefix); + if (!conn) { + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length === 0) { + await ctx.reply("Not connected to any mesh."); + } else { + await ctx.reply( + "Connected to multiple meshes. Prefix with mesh slug: /dm mesh-slug:Mou hello", + ); + } + return; + } + + const spaceIdx = text.indexOf(" "); + if (spaceIdx === -1) { + await ctx.reply("Usage: /dm "); + return; + } + + const target = text.slice(0, spaceIdx); + const message = text.slice(spaceIdx + 1); + const matches = await conn.findPeersByName(target); + + if (matches.length === 0) { + await ctx.reply(`❌ No peer named "${target}" found.`); + return; + } + + if (matches.length === 1) { + const ok = await conn.sendMessage( + matches[0]!.pubkey, + `[via Telegram] ${message}`, + "now", + ); + await ctx.reply( + ok + ? `βœ… β†’ ${matches[0]!.avatar ?? "πŸ€–"} ${matches[0]!.displayName}` + : "❌ Not connected", + ); + return; + } + + // Multiple matches β€” show inline keyboard picker + pendingDMs.set(chatId, { + message, + matches, + meshId: conn.getMeshId(), + }); + const buttons = matches.map((p, i) => { + const dir = p.cwd?.split("/").pop() ?? "?"; + const avatar = p.avatar ?? "πŸ€–"; + return [ + { text: `${avatar} ${p.displayName} (${dir})`, callback_data: `dm:${i}` }, + ]; + }); + buttons.push([{ text: "πŸ“¨ Send to ALL", callback_data: "dm:all" }]); + await ctx.reply(`Multiple "${target}" peers online. Pick one or all:`, { + reply_markup: { inline_keyboard: buttons }, + }); + }); + + // --- /broadcast [mesh:] --- + bot.command("broadcast", async (ctx) => { + const chatId = ctx.chat.id; + const [meshPrefix, message] = parseMeshPrefix(chatId, ctx.match ?? ""); + if (!message.trim()) { + await ctx.reply("Usage: /broadcast "); + return; + } + const conn = resolveMesh(chatId, meshPrefix); + if (!conn) { + await ctx.reply("Not connected or specify mesh: /broadcast mesh-slug:hello"); + return; + } + const ok = await conn.sendMessage("*", `[via Telegram] ${message}`, "now"); + await ctx.reply(ok ? "βœ… Broadcast sent" : "❌ Not connected"); + }); + + // --- /group [mesh:]@name --- + bot.command("group", async (ctx) => { + const chatId = ctx.chat.id; + const [meshPrefix, text] = parseMeshPrefix(chatId, ctx.match ?? ""); + if (!text.trim()) { + await ctx.reply("Usage: /group @group-name "); + return; + } + const spaceIdx = text.indexOf(" "); + if (spaceIdx === -1) { + await ctx.reply("Usage: /group @group-name "); + return; + } + const target = text.slice(0, spaceIdx); + const message = text.slice(spaceIdx + 1); + const conn = resolveMesh(chatId, meshPrefix); + if (!conn) { + await ctx.reply("Not connected or specify mesh."); + return; + } + const ok = await conn.sendMessage(target, `[via Telegram] ${message}`, "now"); + await ctx.reply(ok ? `βœ… Sent to ${target}` : "❌ Not connected"); + }); + + // --- /file --- + bot.command("file", async (ctx) => { + const chatId = ctx.chat.id; + const fileId = ctx.match?.trim(); + if (!fileId) { + await ctx.reply("Usage: /file "); + return; + } + + // Try all connected meshes for this chat + const meshIds = chatMeshes.get(chatId) ?? []; + for (const meshId of meshIds) { + const conn = meshConnections.get(meshId); + if (!conn?.isConnected()) continue; + const file = await conn.getFileUrl(fileId); + if (!file) continue; + try { + const resp = await fetch(file.url, { + signal: AbortSignal.timeout(30_000), + }); + if (!resp.ok) continue; + const buf = Buffer.from(await resp.arrayBuffer()); + await ctx.replyWithDocument(new InputFile(buf, file.name)); + return; + } catch { + continue; + } + } + await ctx.reply(`❌ File \`${fileId}\` not found in any connected mesh.`, { + parse_mode: "Markdown", + }); + }); + + // --- /status --- + bot.command("status", async (ctx) => { + const chatId = ctx.chat.id; + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length === 0) { + await ctx.reply("Not connected to any mesh."); + return; + } + const lines = meshIds.map((id) => { + const conn = meshConnections.get(id); + const icon = conn?.isConnected() ? "🟒" : "πŸ”΄"; + return `${icon} \`${id.slice(0, 16)}\``; + }); + await ctx.reply( + `*Claudemesh Telegram Bridge*\n${lines.join("\n")}`, + { parse_mode: "Markdown" }, + ); + }); + + // --- /help --- + bot.command("help", async (ctx) => { + await ctx.reply( + "πŸ”— *Claudemesh Telegram Bridge*\n\n" + + "*Commands:*\n" + + "β€’ /start β€” Connect via deep link\n" + + "β€’ /connect β€” Link via email (coming soon)\n" + + "β€’ /disconnect β€” Disconnect from mesh\n" + + "β€’ /meshes β€” List connected meshes\n" + + "β€’ /peers β€” List online peers\n" + + "β€’ /dm β€” DM a peer\n" + + "β€’ /broadcast β€” Message all peers\n" + + "β€’ /group @name β€” Message a group\n" + + "β€’ /file β€” Download a mesh file\n" + + "β€’ /status β€” Connection status\n\n" + + "_Multi-mesh: prefix commands with mesh slug_\n" + + "`/peers dev-team` or `/dm dev-team:Mou hello`", + { parse_mode: "Markdown" }, + ); + }); + + // --- Callback query handler (peer picker inline keyboard) --- + bot.on("callback_query:data", async (ctx) => { + const data = ctx.callbackQuery.data; + const chatId = ctx.chat?.id; + if (!chatId || !data.startsWith("dm:")) { + await ctx.answerCallbackQuery(); + return; + } + + const pending = pendingDMs.get(chatId); + if (!pending) { + await ctx.answerCallbackQuery({ text: "Session expired. Send /dm again." }); + return; + } + + const conn = meshConnections.get(pending.meshId); + if (!conn?.isConnected()) { + pendingDMs.delete(chatId); + await ctx.answerCallbackQuery({ text: "Not connected." }); + return; + } + + if (data === "dm:all") { + let sent = 0; + for (const p of pending.matches) { + const ok = await conn.sendMessage( + p.pubkey, + `[via Telegram] ${pending.message}`, + "now", + ); + if (ok) sent++; + } + pendingDMs.delete(chatId); + await ctx.answerCallbackQuery({ text: `Sent to ${sent} peers` }); + await ctx.editMessageText( + `βœ… Sent to all ${sent} ${pending.matches[0]?.displayName ?? "?"} peers`, + ); + return; + } + + const idx = parseInt(data.slice(3)); + const peer = pending.matches[idx]; + if (!peer) { + await ctx.answerCallbackQuery({ text: "Invalid selection" }); + return; + } + + const ok = await conn.sendMessage( + peer.pubkey, + `[via Telegram] ${pending.message}`, + "now", + ); + pendingDMs.delete(chatId); + const dir = peer.cwd?.split("/").pop() ?? "?"; + await ctx.answerCallbackQuery({ text: ok ? "Sent!" : "Failed" }); + await ctx.editMessageText( + ok + ? `βœ… β†’ ${peer.avatar ?? "πŸ€–"} ${peer.displayName} (${dir})` + : "❌ Not connected", + ); + }); + + // --- Photo upload β†’ mesh file sharing --- + bot.on("message:photo", async (ctx) => { + const chatId = ctx.chat.id; + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length === 0) return; + + const photo = ctx.message.photo.at(-1); + if (!photo) return; + + try { + const file = await ctx.api.getFile(photo.file_id); + const url = `https://api.telegram.org/file/bot${botToken}/${file.file_path}`; + const resp = await fetch(url); + const buf = Buffer.from(await resp.arrayBuffer()); + const name = `telegram-photo-${Date.now()}.jpg`; + const caption = ctx.message.caption + ? ` β€” "${ctx.message.caption}"` + : ""; + + let shared = 0; + for (const meshId of meshIds) { + const conn = meshConnections.get(meshId); + if (!conn?.isConnected()) continue; + const fileId = await conn.uploadFile(buf, name, [ + "telegram", + "photo", + ]); + if (fileId) { + await conn.sendMessage( + "*", + `[via Telegram] πŸ“· Photo shared${caption} (file: ${fileId})`, + "next", + ); + shared++; + } + } + await ctx.reply( + shared > 0 + ? `βœ… Photo shared to ${shared} mesh${shared > 1 ? "es" : ""}` + : "❌ Upload failed", + ); + } catch (e) { + await ctx.reply( + `❌ ${e instanceof Error ? e.message : String(e)}`, + ); + } + }); + + // --- Document upload β†’ mesh file sharing --- + bot.on("message:document", async (ctx) => { + const chatId = ctx.chat.id; + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length === 0) return; + + const doc = ctx.message.document; + if (!doc) return; + + try { + const file = await ctx.api.getFile(doc.file_id); + const url = `https://api.telegram.org/file/bot${botToken}/${file.file_path}`; + const resp = await fetch(url); + const buf = Buffer.from(await resp.arrayBuffer()); + const name = doc.file_name ?? `telegram-file-${Date.now()}`; + const caption = ctx.message.caption + ? ` β€” "${ctx.message.caption}"` + : ""; + + let shared = 0; + for (const meshId of meshIds) { + const conn = meshConnections.get(meshId); + if (!conn?.isConnected()) continue; + const fileId = await conn.uploadFile(buf, name, [ + "telegram", + "document", + ]); + if (fileId) { + await conn.sendMessage( + "*", + `[via Telegram] πŸ“Ž File shared: ${name}${caption} (file: ${fileId})`, + "next", + ); + shared++; + } + } + await ctx.reply( + shared > 0 + ? `βœ… File shared to ${shared} mesh${shared > 1 ? "es" : ""}: ${name}` + : "❌ Upload failed", + ); + } catch (e) { + await ctx.reply( + `❌ ${e instanceof Error ? e.message : String(e)}`, + ); + } + }); + + // --- Default text handler: invite URL detection, @mentions, broadcast --- + bot.on("message:text", async (ctx) => { + const chatId = ctx.chat.id; + const text = ctx.message.text; + if (text.startsWith("/")) return; // Skip unknown commands + + // --- Invite URL detection --- + const inviteMatch = text.match(INVITE_URL_RE); + if (inviteMatch) { + const inviteToken = inviteMatch[1]!; + await ctx.reply( + `πŸ”— Detected invite link.\n\nTo connect, use the deep link from the dashboard or CLI.\nInvite token: \`${inviteToken}\``, + { parse_mode: "Markdown" }, + ); + return; + } + + const meshIds = chatMeshes.get(chatId); + if (!meshIds || meshIds.length === 0) { + // Not connected β€” ignore non-command messages + return; + } + + // --- @Mention pattern: "@PeerName message" --- + const mentionMatch = text.match(/^@(\S+)\s+([\s\S]+)$/); + if (mentionMatch) { + const target = mentionMatch[1]!; + const message = mentionMatch[2]!; + + // For multi-mesh, try all connections + for (const meshId of meshIds) { + const conn = meshConnections.get(meshId); + if (!conn?.isConnected()) continue; + const matches = await conn.findPeersByName(target); + if (matches.length === 0) continue; + + if (matches.length === 1) { + const ok = await conn.sendMessage( + matches[0]!.pubkey, + `[via Telegram] ${message}`, + "now", + ); + await ctx.reply( + ok + ? `βœ… β†’ ${matches[0]!.avatar ?? "πŸ€–"} ${matches[0]!.displayName}` + : "❌ Not connected", + ); + return; + } + + // Multiple matches β€” picker + pendingDMs.set(chatId, { message, matches, meshId }); + const buttons = matches.map((p, i) => { + const dir = p.cwd?.split("/").pop() ?? "?"; + return [ + { + text: `${p.avatar ?? "πŸ€–"} ${p.displayName} (${dir})`, + callback_data: `dm:${i}`, + }, + ]; + }); + buttons.push([ + { text: "πŸ“¨ Send to ALL", callback_data: "dm:all" }, + ]); + await ctx.reply( + `Multiple "${target}" peers. Pick one or all:`, + { reply_markup: { inline_keyboard: buttons } }, + ); + return; + } + + await ctx.reply(`❌ No peer named "${target}" in any connected mesh.`); + return; + } + + // --- No mention β†’ broadcast to all connected meshes --- + let sent = 0; + for (const meshId of meshIds) { + const conn = meshConnections.get(meshId); + if (!conn?.isConnected()) continue; + const ok = await conn.sendMessage( + "*", + `[via Telegram] ${text}`, + "next", + ); + if (ok) sent++; + } + if (sent === 0) { + await ctx.reply("❌ Not connected to any mesh."); + } + }); +} + +// --------------------------------------------------------------------------- +// Ensure a mesh WS connection exists (create or reuse) +// --------------------------------------------------------------------------- + +async function ensureMeshConnection( + creds: MeshCredentials, + pushHandler: ( + meshId: string, + from: string, + text: string, + priority: string, + ) => void, +): Promise { + const existing = meshConnections.get(creds.meshId); + if (existing?.isConnected()) return existing; + + // Close stale connection if any + if (existing) { + existing.close(); + meshConnections.delete(creds.meshId); + } + + const conn = new MeshConnection(creds, pushHandler); + meshConnections.set(creds.meshId, conn); + await conn.connect(); + await conn.setSummary( + "Telegram bridge β€” relays messages between Telegram chats and mesh peers", + ); + return conn; +} + +// --------------------------------------------------------------------------- +// Boot β€” called by broker on startup +// --------------------------------------------------------------------------- + +export async function bootTelegramBridge( + loadActiveBridges: () => Promise, + saveBridge: ( + row: Omit & { chatId: number }, + ) => Promise, + deactivateBridge: (chatId: number, meshId: string) => Promise, + botToken: string, + brokerUrl: string, +): Promise { + await ensureSodium(); + + const bot = new Bot(botToken); + const pushHandler = createPushHandler(bot); + + // Load all active bridges from DB + const rows = await loadActiveBridges(); + console.log(`[tg-bridge] loaded ${rows.length} active bridge(s) from DB`); + + // Group by meshId to connect WS pool + const byMesh = new Map(); + for (const row of rows) { + const arr = byMesh.get(row.meshId) ?? []; + arr.push(row); + byMesh.set(row.meshId, arr); + } + + // Connect one WS per unique mesh + for (const [meshId, meshRows] of byMesh) { + const first = meshRows[0]!; + try { + await ensureMeshConnection( + { + meshId, + memberId: first.memberId, + pubkey: first.pubkey, + secretKey: first.secretKey, + displayName: first.displayName, + brokerUrl, + }, + pushHandler, + ); + console.log( + `[tg-bridge] connected WS for mesh ${meshId.slice(0, 8)} (${meshRows.length} chat(s))`, + ); + } catch (e) { + console.error( + `[tg-bridge] failed to connect mesh ${meshId.slice(0, 8)}:`, + e, + ); + } + + // Populate routing maps for all chats in this mesh + for (const row of meshRows) { + linkChatMesh(row.chatId, meshId); + } + } + + // Wire up bot commands + setupBotCommands( + bot, + botToken, + brokerUrl, + saveBridge, + deactivateBridge, + pushHandler, + ); + + // Start Grammy long-polling + console.log("[tg-bridge] starting bot..."); + bot.start({ + onStart: () => + console.log( + `[tg-bridge] bot running β€” ${meshConnections.size} mesh(es), ${chatMeshes.size} chat(s)`, + ), + }); +} + +// --------------------------------------------------------------------------- +// Connect a new chat at runtime (called from broker HTTP endpoints) +// --------------------------------------------------------------------------- + +export async function connectChat( + chatId: number, + chatType: string, + chatTitle: string | null, + meshCreds: MeshCredentials, + pushHandler?: ( + meshId: string, + from: string, + text: string, + priority: string, + ) => void, +): Promise { + // Default push handler is a no-op if bot isn't running yet + // (the real one is wired during bootTelegramBridge) + const handler = pushHandler ?? (() => {}); + + await ensureMeshConnection(meshCreds, handler); + linkChatMesh(chatId, meshCreds.meshId); + + console.log( + `[tg-bridge] chat ${chatId} (${chatType}) connected to mesh ${meshCreds.meshId.slice(0, 8)}`, + ); +} diff --git a/apps/broker/src/telegram-token.ts b/apps/broker/src/telegram-token.ts new file mode 100644 index 0000000..14cb9cc --- /dev/null +++ b/apps/broker/src/telegram-token.ts @@ -0,0 +1,143 @@ +/** + * JWT utilities for Telegram bridge connections. + * + * When a user connects their Telegram chat to a mesh, the broker generates + * a short-lived JWT containing mesh credentials. The Telegram bot decodes + * this token to establish the connection. + * + * Pure-crypto implementation β€” no external JWT library. + * Tokens are URL-safe (base64url) for use as Telegram deep link parameters. + * + * IMPORTANT: The JWT payload contains the member's secretKey. + * Never log the token or its decoded payload. + */ + +import { createHmac } from "node:crypto"; + +// --- Types --- + +export interface TelegramConnectPayload { + meshId: string; + meshSlug: string; + memberId: string; + pubkey: string; + secretKey: string; // ed25519 secret key β€” sensitive + createdBy: string; // Dashboard userId or CLI memberId +} + +interface JwtClaims extends TelegramConnectPayload { + iss: string; + sub: string; + iat: number; + exp: number; +} + +// --- Helpers --- + +function base64url(data: string): string { + return Buffer.from(data).toString("base64url"); +} + +function base64urlDecode(str: string): string { + return Buffer.from(str, "base64url").toString("utf-8"); +} + +function sign(input: string, secret: string): string { + return createHmac("sha256", secret).update(input).digest("base64url"); +} + +// --- Public API --- + +const JWT_HEADER = base64url(JSON.stringify({ alg: "HS256", typ: "JWT" })); +const TOKEN_TTL_SECONDS = 15 * 60; // 15 minutes + +/** + * Create a signed JWT containing Telegram connect credentials. + * Expires in 15 minutes. + */ +export function generateTelegramConnectToken( + payload: TelegramConnectPayload, + secret: string, +): string { + const now = Math.floor(Date.now() / 1000); + + const claims: JwtClaims = { + ...payload, + iss: "claudemesh-broker", + sub: "telegram-connect", + iat: now, + exp: now + TOKEN_TTL_SECONDS, + }; + + const encodedPayload = base64url(JSON.stringify(claims)); + const signingInput = `${JWT_HEADER}.${encodedPayload}`; + const signature = sign(signingInput, secret); + + return `${signingInput}.${signature}`; +} + +/** + * Validate and decode a Telegram connect JWT. + * Returns the payload on success, or null on any failure + * (bad signature, expired, wrong subject). + */ +export function validateTelegramConnectToken( + token: string, + secret: string, +): TelegramConnectPayload | null { + try { + const parts = token.split("."); + if (parts.length !== 3) return null; + + const [headerB64, payloadB64, signatureB64] = parts as [string, string, string]; + + // Verify signature + const signingInput = `${headerB64}.${payloadB64}`; + const expectedSignature = sign(signingInput, secret); + if (signatureB64 !== expectedSignature) return null; + + // Verify header algorithm + const header = JSON.parse(base64urlDecode(headerB64)); + if (header.alg !== "HS256") return null; + + // Decode and validate claims + const claims: JwtClaims = JSON.parse(base64urlDecode(payloadB64)); + + // Check subject + if (claims.sub !== "telegram-connect") return null; + + // Check expiry + const now = Math.floor(Date.now() / 1000); + if (claims.exp < now) return null; + + // Check iat not in the future (30s tolerance) + if (claims.iat > now + 30) return null; + + // Extract payload fields (strip JWT claims) + const { + meshId, + meshSlug, + memberId, + pubkey, + secretKey, + createdBy, + } = claims; + + // Basic presence check + if (!meshId || !meshSlug || !memberId || !pubkey || !secretKey || !createdBy) { + return null; + } + + return { meshId, meshSlug, memberId, pubkey, secretKey, createdBy }; + } catch { + return null; + } +} + +/** + * Generate a Telegram deep link that passes the JWT as start parameter. + * Format: https://t.me/{botUsername}?start={token} + */ +export function generateDeepLink(token: string, botUsername: string): string { + return `https://t.me/${botUsername}?start=${token}`; +} diff --git a/apps/cli/src/commands/connect-telegram.ts b/apps/cli/src/commands/connect-telegram.ts new file mode 100644 index 0000000..d01a62e --- /dev/null +++ b/apps/cli/src/commands/connect-telegram.ts @@ -0,0 +1,65 @@ +import { loadConfig } from "../state/config"; + +export async function connectTelegram(args: string[]): Promise { + const config = loadConfig(); + if (config.meshes.length === 0) { + console.error("No meshes joined. Run 'claudemesh join' first."); + process.exit(1); + } + + const mesh = config.meshes[0]!; + const linkOnly = args.includes("--link"); + + // Convert WS broker URL to HTTP + const brokerHttp = mesh.brokerUrl + .replace("wss://", "https://") + .replace("ws://", "http://") + .replace("/ws", ""); + + console.log("Requesting Telegram connect token..."); + + const res = await fetch(`${brokerHttp}/tg/token`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + meshId: mesh.meshId, + memberId: mesh.memberId, + pubkey: mesh.pubkey, + secretKey: mesh.secretKey, + }), + signal: AbortSignal.timeout(10_000), + }); + + if (!res.ok) { + const err = await res.json().catch(() => ({})); + console.error(`Failed: ${(err as any).error ?? res.statusText}`); + process.exit(1); + } + + const { token, deepLink } = (await res.json()) as { + token: string; + deepLink: string; + }; + + if (linkOnly) { + console.log(deepLink); + return; + } + + // Print QR code using simple block characters + console.log("\n Connect Telegram to your mesh:\n"); + console.log(` ${deepLink}\n`); + console.log(" Open this link on your phone, or scan the QR code"); + console.log(" with your Telegram camera.\n"); + + // Try to generate QR with qrcode-terminal if available + try { + const QRCode = require("qrcode-terminal"); + QRCode.generate(deepLink, { small: true }, (code: string) => { + console.log(code); + }); + } catch { + // qrcode-terminal not available, link is enough + console.log(" (Install qrcode-terminal for QR code display)"); + } +} diff --git a/apps/cli/src/commands/disconnect-telegram.ts b/apps/cli/src/commands/disconnect-telegram.ts new file mode 100644 index 0000000..a5e3bfc --- /dev/null +++ b/apps/cli/src/commands/disconnect-telegram.ts @@ -0,0 +1,3 @@ +export async function disconnectTelegram(): Promise { + console.log("To disconnect Telegram, send /disconnect in the bot chat."); +} diff --git a/apps/cli/src/index.ts b/apps/cli/src/index.ts index 1021072..9bd2f7f 100644 --- a/apps/cli/src/index.ts +++ b/apps/cli/src/index.ts @@ -29,6 +29,8 @@ import { runRemember, runRecall } from "./commands/memory"; import { runInfo } from "./commands/info"; import { runRemind } from "./commands/remind"; import { runCreate } from "./commands/create"; +import { runSync } from "./commands/sync"; +import { runProfile, type ProfileFlags } from "./commands/profile"; import { VERSION } from "./version"; const launch = defineCommand({ @@ -270,6 +272,26 @@ const main = defineCommand({ await runRemind(args, positionals); }, }), + sync: defineCommand({ + meta: { name: "sync", description: "Sync meshes from your dashboard account" }, + args: { + force: { type: "boolean", description: "Re-link account even if already linked", default: false }, + }, + async run({ args }) { await runSync(args); }, + }), + profile: defineCommand({ + meta: { name: "profile", description: "View or edit your member profile" }, + args: { + mesh: { type: "string", description: "Mesh slug (auto-selected if only one joined)" }, + "role-tag": { type: "string", description: "Set role tag (e.g. 'backend-dev', 'lead')" }, + groups: { type: "string", description: "Set groups as 'group:role,...' (e.g. 'eng:lead,review')" }, + "message-mode": { type: "string", description: "'push' | 'inbox' | 'off'" }, + name: { type: "string", description: "Set display name" }, + member: { type: "string", description: "Edit another member (admin only)" }, + json: { type: "boolean", description: "Output as JSON", default: false }, + }, + async run({ args }) { await runProfile(args as ProfileFlags); }, + }), status: defineCommand({ meta: { name: "status", description: "Check broker connectivity for each joined mesh" }, async run() { await runStatus(); }, diff --git a/apps/telegram/Dockerfile b/apps/telegram/Dockerfile new file mode 100644 index 0000000..f3b105c --- /dev/null +++ b/apps/telegram/Dockerfile @@ -0,0 +1,15 @@ +# Telegram bridge for claudemesh +# Node 22 runtime with tsx for TypeScript execution + +FROM node:22-slim + +WORKDIR /app + +COPY package.json ./ +RUN npm install --omit=dev + +COPY src/ ./src/ + +ENV NODE_ENV=production + +CMD ["npx", "tsx", "src/index.ts"] diff --git a/apps/telegram/package.json b/apps/telegram/package.json index 3af626b..7a17a73 100644 --- a/apps/telegram/package.json +++ b/apps/telegram/package.json @@ -2,7 +2,6 @@ "name": "@claudemesh/telegram", "version": "0.1.0", "private": true, - "type": "module", "scripts": { "start": "bun src/index.ts", "dev": "bun --hot src/index.ts" @@ -10,7 +9,9 @@ "dependencies": { "grammy": "^1.35.0", "ws": "^8.18.0", - "libsodium-wrappers": "^0.7.15" + "libsodium": "^0.7.15", + "libsodium-wrappers": "^0.7.15", + "tsx": "^4.19.0" }, "devDependencies": { "@types/ws": "^8.5.13", diff --git a/apps/telegram/src/index.ts b/apps/telegram/src/index.ts index afebdc1..439e354 100644 --- a/apps/telegram/src/index.ts +++ b/apps/telegram/src/index.ts @@ -44,9 +44,22 @@ interface JoinedMesh { } function loadMeshConfig(): JoinedMesh[] { + // Support env-based config for Docker/VPS deployment + if (process.env.MESH_ID && process.env.MESH_MEMBER_ID && process.env.MESH_PUBKEY && process.env.MESH_SECRET_KEY) { + return [{ + meshId: process.env.MESH_ID, + memberId: process.env.MESH_MEMBER_ID, + slug: process.env.MESH_SLUG ?? "mesh", + name: process.env.MESH_NAME ?? "mesh", + pubkey: process.env.MESH_PUBKEY, + secretKey: process.env.MESH_SECRET_KEY, + brokerUrl: process.env.MESH_BROKER_URL ?? "wss://ic.claudemesh.com/ws", + }]; + } + // Fall back to config file const path = join(CONFIG_DIR, "config.json"); if (!existsSync(path)) { - console.error(`No config at ${path} β€” run 'claudemesh join' first`); + console.error(`No config at ${path} β€” set MESH_ID/MESH_MEMBER_ID/MESH_PUBKEY/MESH_SECRET_KEY env vars or run 'claudemesh join' first`); process.exit(1); } const config = JSON.parse(readFileSync(path, "utf-8")); diff --git a/docs/telegram-bridge-spec.md b/docs/telegram-bridge-spec.md new file mode 100644 index 0000000..927d727 --- /dev/null +++ b/docs/telegram-bridge-spec.md @@ -0,0 +1,347 @@ +# Telegram Bridge β€” Multi-Tenant Spec + +**Status:** Draft +**Date:** 2026-04-09 +**Author:** Mou (Claude Opus 4.6) + +--- + +## Overview + +One Telegram bot (`@claudemesh_bot`), many users, many meshes. Users connect their Telegram chat to their mesh through any of four entry points. The bridge runs as a single service inside the broker process β€” no separate containers. + +## Architecture + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Broker process β”‚ +β”‚ β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ HTTP/WS server β”‚ β”‚ Telegram Bridge Module β”‚ β”‚ +β”‚ β”‚ (existing) β”‚ β”‚ β”‚ β”‚ +β”‚ β”‚ β”‚ β”‚ Grammy bot (long-polling) β”‚ β”‚ +β”‚ β”‚ POST /tg/connect │──▢│ WS pool (1 per mesh) β”‚ β”‚ +β”‚ β”‚ POST /tg/disconnectβ”‚ β”‚ Routes: chatId β†’ meshId β”‚ β”‚ +β”‚ β”‚ GET /tg/status β”‚ β”‚ β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”‚ β”‚ +β”‚ DB: mesh.telegram_bridge β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ id β”‚ chat_id β”‚ mesh_id β”‚ member_id β”‚ pubkey β”‚ .. β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +## DB Schema + +```sql +CREATE TABLE mesh.telegram_bridge ( + id TEXT PRIMARY KEY DEFAULT gen_random_uuid(), + chat_id BIGINT NOT NULL, -- Telegram chat ID + chat_type TEXT DEFAULT 'private', -- private | group | supergroup | channel + chat_title TEXT, -- Group name or user's first name + mesh_id TEXT NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE, + member_id TEXT NOT NULL REFERENCES mesh.member(id), + pubkey TEXT NOT NULL, -- ed25519 hex (member pubkey) + secret_key TEXT NOT NULL, -- ed25519 hex (encrypted at rest) + display_name TEXT DEFAULT 'telegram', -- Peer name in mesh + active BOOLEAN DEFAULT true, + created_at TIMESTAMP DEFAULT NOW() NOT NULL, + disconnected_at TIMESTAMP, + UNIQUE(chat_id, mesh_id) -- One connection per chat per mesh +); +CREATE INDEX tg_bridge_mesh_idx ON mesh.telegram_bridge(mesh_id) WHERE active = true; +CREATE INDEX tg_bridge_chat_idx ON mesh.telegram_bridge(chat_id) WHERE active = true; +``` + +## Connection Token + +A short-lived token that authorizes a Telegram chat to join a specific mesh. + +```typescript +interface TelegramConnectToken { + meshId: string; + meshSlug: string; + memberId: string; // Pre-created member for this bridge + pubkey: string; + secretKey: string; // Encrypted with BROKER_ENCRYPTION_KEY + expiresAt: number; // Unix ms, 15 min TTL + createdBy: string; // Dashboard userId or CLI memberId +} +``` + +**Token flow:** +1. Dashboard/CLI requests token β†’ broker creates member + generates token +2. Token is JWT signed with `BROKER_ENCRYPTION_KEY`, contains mesh credentials +3. Bot receives token β†’ decodes β†’ stores in `telegram_bridge` table β†’ connects WS + +**Endpoint:** +``` +POST /tg/token +Body: { meshId, createdBy } +Auth: Dashboard session cookie or CLI sync JWT +Response: { token, deepLink: "https://t.me/claudemesh_bot?start=" } +``` + +--- + +## Entry Points + +### A. Dashboard Deep Link (1 click) + +**Flow:** +``` +Dashboard β†’ Integrations β†’ Telegram + ↓ +"Connect Telegram" button + ↓ +POST /tg/token { meshId, createdBy: dashboardUserId } + ↓ +Returns deep link: https://t.me/claudemesh_bot?start= + ↓ +Browser opens Telegram β†’ bot receives /start + ↓ +Bot validates token β†’ creates bridge row β†’ connects to mesh + ↓ +"βœ… Connected to mesh 'alexis-team'!" +``` + +**Dashboard UI:** +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Integrations β”‚ +β”‚ β”‚ +β”‚ πŸ€– Telegram β”‚ +β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ +β”‚ β”‚ Connect your Telegram to β”‚ β”‚ +β”‚ β”‚ receive mesh messages on β”‚ β”‚ +β”‚ β”‚ your phone. β”‚ β”‚ +β”‚ β”‚ β”‚ β”‚ +β”‚ β”‚ [Connect Telegram] β”‚ β”‚ +β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ +β”‚ β”‚ +β”‚ Connected chats: β”‚ +β”‚ β€’ Alejandro (private) βœ… β”‚ +β”‚ β€’ Dev Team (group) βœ… β”‚ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### B. CLI QR Code + +**Flow:** +``` +$ claudemesh connect telegram + ↓ +CLI calls POST /tg/token { meshId, createdBy: memberId } + ↓ +Receives deep link + ↓ +Renders QR code in terminal (qrcode-terminal) + ↓ +β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ +β–ˆβ–ˆ β–„β–„β–„β–„β–„ β–ˆβ–€β–ˆ β–ˆβ–„β–ˆβ–ˆ β–ˆ +β–ˆβ–ˆ β–ˆ β–ˆ β–ˆβ–€β–€β–€β–ˆβ–€β–€β–ˆ β–ˆ +β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ + +Scan with your phone to connect Telegram + ↓ +User scans β†’ opens Telegram β†’ bot connects +``` + +**CLI command:** +```typescript +// apps/cli/src/commands/connect.ts +claudemesh connect telegram // QR code +claudemesh connect telegram --link // Print URL instead +claudemesh disconnect telegram // Remove bridge +``` + +### C. Email Verification (zero-knowledge) + +**Flow:** +``` +User opens @claudemesh_bot β†’ /connect + ↓ +Bot: "Enter your claudemesh email:" + ↓ +User: "alex@example.com" + ↓ +Bot β†’ POST /tg/email-verify { email, chatId } + ↓ +Broker looks up dashboard user β†’ sends 6-digit code via email + ↓ +Bot: "Enter the 6-digit code sent to alex@example.com:" + ↓ +User: "482910" + ↓ +Bot β†’ POST /tg/email-confirm { chatId, code } + ↓ +Broker validates β†’ returns token β†’ bot connects + ↓ +"βœ… Connected to 2 meshes: alexis-team, dev-ops" +``` + +**Notes:** +- Auto-connects to ALL meshes the email is a member of +- Or shows picker if multiple meshes: "Which mesh? [1] alexis-team [2] dev-ops" +- Requires email sending (use existing Gmail MCP or Resend/Postmark) + +### D. Invite URL Detection + +**Flow:** +``` +User pastes in bot chat: +https://claudemesh.com/join/abc123 + ↓ +Bot detects URL pattern β†’ extracts invite token + ↓ +Bot: "Connect this chat to mesh 'alexis-team'? [Yes] [No]" + ↓ +User taps [Yes] + ↓ +Bot β†’ POST /tg/join-invite { chatId, inviteToken } + ↓ +Broker: validates invite β†’ creates member β†’ returns connect token + ↓ +Bot connects β†’ "βœ… Joined and connected!" +``` + +**Also handles:** +- `claudemesh join` URLs: `https://claudemesh.com/join/` +- Direct invite tokens pasted as text + +--- + +## Bot Commands (full list) + +| Command | Description | +|---|---| +| `/start ` | Connect via deep link token | +| `/connect` | Start email verification flow | +| `/disconnect` | Disconnect this chat from mesh | +| `/meshes` | List connected meshes | +| `/peers` | List online peers in connected mesh | +| `/dm ` | DM a specific peer (shows picker if ambiguous) | +| `/broadcast ` | Message all peers | +| `/group @name ` | Message a group | +| `/file ` | Download a mesh file | +| `/status` | Bridge connection status | +| `/help` | Show all commands | + +For chats connected to multiple meshes, prefix with mesh slug: +``` +/dm alexis-team:Mou hello +/peers dev-ops +``` + +--- + +## WS Pool + +The bridge maintains a pool of WS connections, one per unique mesh: + +```typescript +class BridgePool { + // meshId β†’ single WS connection shared by all chats in that mesh + private connections: Map; + + // chatId β†’ list of meshIds this chat is connected to + private chatMeshes: Map; + + // meshId β†’ list of chatIds to forward pushes to + private meshChats: Map; + + async addBridge(chatId: number, meshCreds: MeshCredentials): Promise; + async removeBridge(chatId: number, meshId: string): Promise; + + // On broker startup: load all active bridges from DB, connect WS pool + async boot(): Promise; +} +``` + +**Connection sharing:** If 5 Telegram chats are connected to the same mesh, they share ONE WS connection. Push messages from that mesh are fanned out to all 5 chats. + +**Scaling:** At 100 meshes Γ— 1 WS each = 100 connections. At 1000 meshes = 1000 connections. Bun handles this easily. If needed, shard by mesh ID across multiple bridge processes. + +--- + +## Security + +1. **Token expiry:** Connect tokens expire in 15 minutes +2. **Encryption at rest:** Member secret keys stored encrypted with `BROKER_ENCRYPTION_KEY` +3. **Chat authorization:** Only the chat that connected can disconnect +4. **Rate limiting:** Token generation limited to 10/hour per user +5. **Revocation:** Dashboard shows connected chats with "Disconnect" button +6. **No secret keys in transit:** Tokens contain encrypted keys, only the broker can decrypt + +--- + +## Message Routing + +**Telegram β†’ Mesh:** +``` +User sends text in Telegram chat + ↓ +Bot receives message + ↓ +Look up chatId β†’ meshId(s) in chatMeshes map + ↓ +For each mesh: + - Resolve @mention or /dm target β†’ pubkey + - Encrypt if direct, base64 if broadcast + - Send via mesh's WS connection +``` + +**Mesh β†’ Telegram:** +``` +WS push received on mesh connection + ↓ +Look up meshId β†’ chatId(s) in meshChats map + ↓ +For each chat: + - Decrypt message (session key) + - Resolve sender pubkey β†’ display name + avatar + - Format: "🧠 Mou: message text" + - bot.api.sendMessage(chatId, formatted) +``` + +**Files:** +- Telegram photo/document β†’ upload to MinIO β†’ broadcast file ID +- Mesh file ID mentioned β†’ `/file ` downloads via broker proxy + +--- + +## Implementation Order + +1. **DB migration** β€” `mesh.telegram_bridge` table +2. **Token endpoint** β€” `POST /tg/token` (JWT generation) +3. **Bridge module in broker** β€” Grammy bot + WS pool + routing +4. **Entry point D** β€” Invite URL detection (simplest, no dashboard needed) +5. **Entry point A** β€” Dashboard deep link (needs dashboard page) +6. **Entry point B** β€” CLI `claudemesh connect telegram` command +7. **Entry point C** β€” Email verification (needs email sending infra) + +Steps 1-4 are a single PR. Steps 5-7 are incremental. + +--- + +## Environment Variables + +``` +TELEGRAM_BOT_TOKEN= # Single bot for all users +TELEGRAM_ENABLED=true # Feature flag +``` + +No per-user env vars. Everything is in the DB. + +--- + +## Metrics + +``` +telegram_bridges_active gauge Active chat-mesh connections +telegram_messages_in_total counter Telegram β†’ mesh messages +telegram_messages_out_total counter Mesh β†’ Telegram messages +telegram_files_shared_total counter Files uploaded via Telegram +telegram_connect_total counter New connections by entry point (A/B/C/D) +``` diff --git a/packages/db/migrations/0016_telegram-bridge.sql b/packages/db/migrations/0016_telegram-bridge.sql new file mode 100644 index 0000000..3ab015c --- /dev/null +++ b/packages/db/migrations/0016_telegram-bridge.sql @@ -0,0 +1,16 @@ +CREATE TABLE IF NOT EXISTS mesh.telegram_bridge ( + id text PRIMARY KEY NOT NULL, + chat_id bigint NOT NULL, + chat_type text DEFAULT 'private', + chat_title text, + mesh_id text NOT NULL REFERENCES mesh.mesh(id) ON DELETE CASCADE ON UPDATE CASCADE, + member_id text REFERENCES mesh.member(id), + pubkey text NOT NULL, + secret_key text NOT NULL, + display_name text DEFAULT 'telegram', + active boolean DEFAULT true, + created_at timestamp DEFAULT now() NOT NULL, + disconnected_at timestamp +); + +CREATE UNIQUE INDEX IF NOT EXISTS telegram_bridge_chat_mesh_idx ON mesh.telegram_bridge (chat_id, mesh_id); diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 8cdb9f3..ccb6828 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -1,5 +1,6 @@ import { relations } from "drizzle-orm"; import { + bigint, boolean, index, integer, @@ -909,3 +910,51 @@ export const selectMeshVaultEntrySchema = createSelectSchema(meshVaultEntry); export const insertMeshVaultEntrySchema = createInsertSchema(meshVaultEntry); export type SelectMeshVaultEntry = typeof meshVaultEntry.$inferSelect; export type InsertMeshVaultEntry = typeof meshVaultEntry.$inferInsert; + +/** + * Telegram bridge connections. Each row represents a Telegram chat linked + * to a mesh via a bot-managed keypair. The bot authenticates to the broker + * as a virtual peer using the ed25519 keypair stored here, relaying + * messages bidirectionally between Telegram and the mesh. + */ +export const telegramBridge = meshSchema.table( + "telegram_bridge", + { + id: text().primaryKey().notNull().$defaultFn(generateId), + /** Telegram chat ID (can be negative for groups). */ + chatId: bigint({ mode: "bigint" }).notNull(), + chatType: text().default("private"), + chatTitle: text(), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + memberId: text().references(() => meshMember.id), + /** ed25519 public key (hex) β€” the virtual peer identity on the mesh. */ + pubkey: text().notNull(), + /** ed25519 secret key (hex) β€” encrypted at rest. */ + secretKey: text().notNull(), + displayName: text().default("telegram"), + active: boolean().default(true), + createdAt: timestamp().defaultNow().notNull(), + disconnectedAt: timestamp(), + }, + (table) => [ + uniqueIndex("telegram_bridge_chat_mesh_idx").on(table.chatId, table.meshId), + ], +); + +export const telegramBridgeRelations = relations(telegramBridge, ({ one }) => ({ + mesh: one(mesh, { + fields: [telegramBridge.meshId], + references: [mesh.id], + }), + member: one(meshMember, { + fields: [telegramBridge.memberId], + references: [meshMember.id], + }), +})); + +export const selectTelegramBridgeSchema = createSelectSchema(telegramBridge); +export const insertTelegramBridgeSchema = createInsertSchema(telegramBridge); +export type SelectTelegramBridge = typeof telegramBridge.$inferSelect; +export type InsertTelegramBridge = typeof telegramBridge.$inferInsert;