diff --git a/apps/broker/src/index.ts b/apps/broker/src/index.ts index f6d2508..292bf85 100644 --- a/apps/broker/src/index.ts +++ b/apps/broker/src/index.ts @@ -18,7 +18,8 @@ import { WebSocketServer, type WebSocket } from "ws"; import { and, eq, isNull, sql } from "drizzle-orm"; import { env } from "./env"; import { db } from "./db"; -import { mesh, messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh"; +import { mesh, meshMember, messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh"; +import { user } from "@turbostarter/db/schema/auth"; import { handleCliSync, type CliSyncRequest } from "./cli-sync"; import { updateMemberProfile, listMeshMembers, updateMeshSettings } from "./member-api"; import { @@ -4153,6 +4154,73 @@ function main(): void { }, tgBotToken, "wss://ic.claudemesh.com/ws", + // lookupMeshesByEmail: find all meshes a user belongs to by their email + async (email) => { + const users = await db.select({ id: user.id }).from(user).where(eq(user.email, email)).limit(1); + if (users.length === 0) return []; + const userId = users[0]!.id; + const members = await db.select({ + memberId: meshMember.id, + meshId: meshMember.meshId, + pubkey: meshMember.pubkey, + secretKey: meshMember.secretKey, + }).from(meshMember).where(and(eq(meshMember.dashboardUserId, userId), isNull(meshMember.revokedAt))); + const results = []; + for (const m of members) { + if (!m.pubkey || !m.secretKey) continue; + const meshRows = await db.select({ slug: mesh.slug }).from(mesh).where(eq(mesh.id, m.meshId)).limit(1); + results.push({ + userId, + meshId: m.meshId, + meshSlug: meshRows[0]?.slug ?? m.meshId.slice(0, 8), + memberId: m.memberId, + pubkey: m.pubkey, + secretKey: m.secretKey, + }); + } + return results; + }, + // sendVerificationEmail: send 6-digit code via Resend/Postmark + async (email, code) => { + const apiKey = process.env.RESEND_API_KEY ?? process.env.POSTMARK_API_KEY; + const fromAddr = process.env.EMAIL_FROM ?? "noreply@claudemesh.com"; + if (!apiKey) { + log.warn("no email API key configured (RESEND_API_KEY or POSTMARK_API_KEY)"); + return false; + } + try { + if (process.env.RESEND_API_KEY) { + const res = await fetch("https://api.resend.com/emails", { + method: "POST", + headers: { "Content-Type": "application/json", Authorization: `Bearer ${apiKey}` }, + body: JSON.stringify({ + from: fromAddr, + to: email, + subject: `${code} — Claudemesh Telegram verification`, + text: `Your verification code is: ${code}\n\nThis code expires in 10 minutes.\n\nIf you didn't request this, ignore this email.`, + }), + signal: AbortSignal.timeout(10_000), + }); + return res.ok; + } else { + const res = await fetch("https://api.postmarkapp.com/email", { + method: "POST", + headers: { "Content-Type": "application/json", "X-Postmark-Server-Token": apiKey }, + body: JSON.stringify({ + From: fromAddr, + To: email, + Subject: `${code} — Claudemesh Telegram verification`, + TextBody: `Your verification code is: ${code}\n\nThis code expires in 10 minutes.\n\nIf you didn't request this, ignore this email.`, + }), + signal: AbortSignal.timeout(10_000), + }); + return res.ok; + } + } catch (e) { + log.error("email send failed", { error: e instanceof Error ? e.message : String(e) }); + return false; + } + }, ).then(() => log.info("telegram bridge started")) .catch(e => log.error("telegram bridge failed", { error: e instanceof Error ? e.message : String(e) })); } diff --git a/apps/broker/src/telegram-bridge.ts b/apps/broker/src/telegram-bridge.ts index 0a6dcc3..25f17f0 100644 --- a/apps/broker/src/telegram-bridge.ts +++ b/apps/broker/src/telegram-bridge.ts @@ -522,6 +522,15 @@ const pendingFiles = new Map< { fileId: string; fileName: string; meshId: string; caption: string } >(); +// Pending email verification state: chatId → { email, code, expiresAt, attempts } +const pendingVerifications = new Map< + number, + { email: string; code: string; expiresAt: number; attempts: number } +>(); + +// Conversation state: chatId → which input the bot is waiting for +const conversationState = new Map(); + /** Invite URL regex: https://claudemesh.com/join/ */ const INVITE_URL_RE = /https?:\/\/(?:www\.)?claudemesh\.com\/join\/([A-Za-z0-9_\-\.]+)/; @@ -658,6 +667,68 @@ function escapeMarkdown(s: string): string { // Bot command handlers // --------------------------------------------------------------------------- +// --------------------------------------------------------------------------- +// Email verification helpers +// --------------------------------------------------------------------------- + +function generateCode(): string { + return Math.floor(100000 + Math.random() * 900000).toString(); +} + +async function startEmailVerification( + ctx: any, + chatId: number, + email: string, + lookupMeshesByEmail: (email: string) => Promise, + sendVerificationEmail: (email: string, code: string) => Promise, +): Promise { + // Check if email exists in our system + const meshes = await lookupMeshesByEmail(email); + if (meshes.length === 0) { + conversationState.delete(chatId); + pendingVerifications.delete(chatId); + await ctx.reply( + "❌ No claudemesh account found for that email.\n\n" + + "Sign up at claudemesh.com first, or use a connect link:\n" + + "`claudemesh connect telegram`", + { parse_mode: "Markdown" }, + ); + return; + } + + const code = generateCode(); + const sent = await sendVerificationEmail(email, code); + if (!sent) { + conversationState.delete(chatId); + await ctx.reply("❌ Failed to send verification email. Try again later."); + return; + } + + pendingVerifications.set(chatId, { + email, + code, + expiresAt: Date.now() + 10 * 60_000, // 10 min + attempts: 0, + }); + conversationState.set(chatId, "awaiting_code"); + + const masked = email.replace(/(.{2})(.*)(@.*)/, "$1***$3"); + await ctx.reply( + `📬 Verification code sent to *${escapeMarkdown(masked)}*\n\nEnter the 6-digit code:`, + { parse_mode: "Markdown" }, + ); +} + +/** Result from looking up a user's meshes by email */ +export interface UserMeshInfo { + userId: string; + meshId: string; + meshSlug: string; + memberId: string; + pubkey: string; + secretKey: string; +} + function setupBotCommands( bot: Bot, botToken: string, @@ -672,6 +743,8 @@ function setupBotCommands( text: string, priority: string, ) => void, + lookupMeshesByEmail: (email: string) => Promise, + sendVerificationEmail: (email: string, code: string) => Promise, ): void { // --- /start --- bot.command("start", async (ctx) => { @@ -750,13 +823,21 @@ function setupBotCommands( } }); - // --- /connect (email flow stub) --- + // --- /connect (email verification flow) --- bot.command("connect", async (ctx) => { - console.log("[tg-bridge] /connect requested — email flow not implemented yet"); + const chatId = ctx.chat.id; + + // If they passed an email inline: /connect user@example.com + const emailArg = ctx.match?.trim(); + if (emailArg && emailArg.includes("@")) { + conversationState.set(chatId, "awaiting_code"); + await startEmailVerification(ctx, chatId, emailArg, lookupMeshesByEmail, sendVerificationEmail); + return; + } + + conversationState.set(chatId, "awaiting_email"); await ctx.reply( - "📧 Email verification is not implemented yet.\n\n" + - "Use a connect link from the dashboard or CLI instead:\n" + - "`claudemesh connect telegram`", + "📧 *Connect via email*\n\nEnter the email address you used to sign up on claudemesh.com:", { parse_mode: "Markdown" }, ); }); @@ -1261,12 +1342,116 @@ function setupBotCommands( await handleFileUpload(ctx, doc.file_id, doc.file_name ?? `telegram-file-${Date.now()}`, false); }); - // --- Default text handler: invite URL detection, @mentions, broadcast --- + // --- Default text handler: conversation state, invite URLs, @mentions, broadcast --- bot.on("message:text", async (ctx) => { const chatId = ctx.chat.id; const text = ctx.message.text; if (text.startsWith("/")) return; // Skip unknown commands + // --- Email verification conversation state --- + const state = conversationState.get(chatId); + + if (state === "awaiting_email") { + const email = text.trim().toLowerCase(); + if (!email.includes("@") || !email.includes(".")) { + await ctx.reply("That doesn't look like an email. Try again:"); + return; + } + await startEmailVerification(ctx, chatId, email, lookupMeshesByEmail, sendVerificationEmail); + return; + } + + if (state === "awaiting_code") { + const pending = pendingVerifications.get(chatId); + if (!pending) { + conversationState.delete(chatId); + await ctx.reply("Session expired. Type /connect to start again."); + return; + } + + // Check expiry + if (Date.now() > pending.expiresAt) { + pendingVerifications.delete(chatId); + conversationState.delete(chatId); + await ctx.reply("⏰ Code expired. Type /connect to get a new one."); + return; + } + + const inputCode = text.trim().replace(/\s/g, ""); + + // Check attempts + pending.attempts++; + if (pending.attempts > 5) { + pendingVerifications.delete(chatId); + conversationState.delete(chatId); + await ctx.reply("❌ Too many attempts. Type /connect to start again."); + return; + } + + if (inputCode !== pending.code) { + await ctx.reply(`❌ Wrong code. ${5 - pending.attempts} attempts left.`); + return; + } + + // Code correct — connect to all meshes for this email + pendingVerifications.delete(chatId); + conversationState.delete(chatId); + + const meshes = await lookupMeshesByEmail(pending.email); + if (meshes.length === 0) { + await ctx.reply("❌ No meshes found. The account may have been removed."); + return; + } + + const chatType = ctx.chat.type; + const chatTitle = + ctx.chat.type === "private" + ? (ctx.from?.first_name ?? "Private") + : ("title" in ctx.chat ? ctx.chat.title : null) ?? "Group"; + const displayName = `tg:${chatTitle}`; + + let connected = 0; + for (const m of meshes) { + try { + // Skip if already connected + const existing = chatMeshes.get(chatId); + if (existing?.includes(m.meshId)) { connected++; continue; } + + await saveBridge({ + chatId, + meshId: m.meshId, + memberId: m.memberId, + pubkey: m.pubkey, + secretKey: m.secretKey, + displayName, + chatType, + chatTitle, + }); + await ensureMeshConnection( + { meshId: m.meshId, memberId: m.memberId, pubkey: m.pubkey, secretKey: m.secretKey, displayName, brokerUrl }, + pushHandler, + ); + linkChatMesh(chatId, m.meshId); + connected++; + } catch (e) { + console.error(`[tg-bridge] /connect failed for mesh ${m.meshId.slice(0, 8)}:`, e); + } + } + + if (connected === 0) { + await ctx.reply("❌ Connection failed for all meshes."); + } else if (meshes.length === 1) { + await ctx.reply( + `✅ Connected to mesh *${escapeMarkdown(meshes[0]!.meshSlug)}*\\!`, + { parse_mode: "MarkdownV2" }, + ); + } else { + const names = meshes.map(m => m.meshSlug).join(", "); + await ctx.reply(`✅ Connected to ${connected} mesh(es): ${names}`); + } + return; + } + // --- Invite URL detection --- const inviteMatch = text.match(INVITE_URL_RE); if (inviteMatch) { @@ -1397,6 +1582,8 @@ export async function bootTelegramBridge( deactivateBridge: (chatId: number, meshId: string) => Promise, botToken: string, brokerUrl: string, + lookupMeshesByEmail?: (email: string) => Promise, + sendVerificationEmail?: (email: string, code: string) => Promise, ): Promise { await ensureSodium(); @@ -1464,6 +1651,10 @@ export async function bootTelegramBridge( } }, 5 * 60_000).unref(); + // Default stubs if email callbacks not provided + const emailLookup = lookupMeshesByEmail ?? (async () => []); + const emailSend = sendVerificationEmail ?? (async () => false); + // Wire up bot commands setupBotCommands( bot, @@ -1472,6 +1663,8 @@ export async function bootTelegramBridge( saveBridge, deactivateBridge, pushHandler, + emailLookup, + emailSend, ); // Start Grammy long-polling (fire-and-forget, must not crash broker)