diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index 05534ac..e8ff312 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -85,6 +85,9 @@ import { } from "./broker"; import * as serviceManager from "./service-manager"; import { ensureBucket, meshBucketName, minioClient } from "./minio"; +import { bootTelegramBridge } from "./telegram-bridge"; +import { generateTelegramConnectToken, generateDeepLink } from "./telegram-token"; +import { telegramBridge } from "@turbostarter/db/schema/mesh"; import { qdrant, meshCollectionName, ensureCollection } from "./qdrant"; import { neo4jDriver, meshDbName, ensureDatabase } from "./neo4j-client"; import type { @@ -627,6 +630,36 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void { return; } + // Telegram connect token + if (req.method === "POST" && req.url === "/tg/token") { + const chunks: Buffer[] = []; + req.on("data", (c: Buffer) => chunks.push(c)); + req.on("end", () => { + try { + const body = JSON.parse(Buffer.concat(chunks).toString()); + const { meshId: tgMeshId, memberId: tgMemberId, pubkey: tgPubkey, secretKey: tgSecretKey } = body; + if (!tgMeshId || !tgMemberId || !tgPubkey || !tgSecretKey) { + writeJson(res, 400, { error: "meshId, memberId, pubkey, secretKey required" }); + return; + } + const encKey = process.env.BROKER_ENCRYPTION_KEY ?? env.BROKER_ENCRYPTION_KEY; + if (!encKey) { writeJson(res, 500, { error: "broker not configured" }); return; } + db.select({ slug: mesh.slug }).from(mesh).where(eq(mesh.id, tgMeshId)).limit(1).then(rows => { + const meshSlug = rows[0]?.slug ?? tgMeshId; + const token = generateTelegramConnectToken( + { meshId: tgMeshId, meshSlug, memberId: tgMemberId, pubkey: tgPubkey, secretKey: tgSecretKey, createdBy: tgMemberId }, + encKey, + ); + const botUsername = process.env.TELEGRAM_BOT_USERNAME ?? "claudemesh_bot"; + const deepLink = generateDeepLink(token, botUsername); + writeJson(res, 200, { token, deepLink }); + log.info("tg/token", { route: "POST /tg/token", mesh_id: tgMeshId, latency_ms: Date.now() - started }); + }).catch(() => writeJson(res, 500, { error: "token generation failed" })); + } catch { writeJson(res, 400, { error: "invalid JSON" }); } + }); + return; + } + // Member profile API const memberPatchMatch = req.method === "PATCH" && req.url?.match(/^\/mesh\/([^/]+)\/member\/([^/]+)$/); if (memberPatchMatch) { @@ -4070,6 +4103,45 @@ function main(): void { }), ); + // Boot Telegram bridge if token configured + const tgBotToken = process.env.TELEGRAM_BOT_TOKEN; + if (tgBotToken) { + bootTelegramBridge( + async () => { + const rows = await db.select({ + chatId: telegramBridge.chatId, + meshId: telegramBridge.meshId, + memberId: telegramBridge.memberId, + pubkey: telegramBridge.pubkey, + secretKey: telegramBridge.secretKey, + displayName: telegramBridge.displayName, + chatType: telegramBridge.chatType, + chatTitle: telegramBridge.chatTitle, + }).from(telegramBridge).where(eq(telegramBridge.active, true)); + return rows.map(r => ({ ...r, chatId: Number(r.chatId) })); + }, + async (row) => { + await db.insert(telegramBridge).values({ + chatId: BigInt(row.chatId) as any, + meshId: row.meshId, + memberId: row.memberId, + pubkey: row.pubkey, + secretKey: row.secretKey, + displayName: row.displayName, + chatType: row.chatType, + chatTitle: row.chatTitle ?? null, + }); + }, + async (chatId, meshId) => { + await db.update(telegramBridge).set({ active: false, disconnectedAt: new Date() }) + .where(and(eq(telegramBridge.chatId, BigInt(chatId) as any), eq(telegramBridge.meshId, meshId))); + }, + tgBotToken, + "wss://ic.claudemesh.com/ws", + ).then(() => log.info("telegram bridge started")) + .catch(e => log.error("telegram bridge failed", { error: e instanceof Error ? e.message : String(e) })); + } + const shutdown = async (signal: string): Promise => { log.info("shutdown signal", { signal }); clearInterval(pingInterval);