diff --git a/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/live/page.tsx b/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/live/page.tsx new file mode 100644 index 0000000..0b1c889 --- /dev/null +++ b/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/live/page.tsx @@ -0,0 +1,69 @@ +import Link from "next/link"; +import { notFound } from "next/navigation"; + +import { getMyMeshResponseSchema } from "@turbostarter/api/schema"; +import { handle } from "@turbostarter/api/utils"; +import { Badge } from "@turbostarter/ui-web/badge"; +import { buttonVariants } from "@turbostarter/ui-web/button"; + +import { pathsConfig } from "~/config/paths"; +import { api } from "~/lib/api/server"; +import { getMetadata } from "~/lib/metadata"; +import { + DashboardHeader, + DashboardHeaderDescription, + DashboardHeaderTitle, +} from "~/modules/common/layout/dashboard/header"; +import { LiveStreamPanel } from "~/modules/mesh/live-stream-panel"; + +export const generateMetadata = getMetadata({ + title: "Live mesh", + description: "Real-time situational awareness of your mesh.", +}); + +export default async function LiveMeshPage({ + params, +}: { + params: Promise<{ id: string }>; +}) { + const { id } = await params; + + // Authz gate — same endpoint the detail page uses + const data = await handle(api.my.meshes[":id"].$get, { + schema: getMyMeshResponseSchema, + })({ param: { id } }).catch(() => null); + + if (!data || !data.mesh) notFound(); + const { mesh } = data; + + return ( + <> + +
+
+ + + {mesh.name} + + live + + + + + Real-time view of presences and envelope routing across this + mesh. Broker sees ciphertext only. + +
+ + ← Mesh detail + +
+
+ + + + ); +} diff --git a/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/page.tsx b/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/page.tsx index 1b1b2cd..34f37b0 100644 --- a/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/page.tsx +++ b/apps/web/src/app/[locale]/dashboard/(user)/meshes/[id]/page.tsx @@ -55,12 +55,21 @@ export default async function MeshPage({ · tier {mesh.tier} · {mesh.visibility} · {mesh.transport} - - Generate invite link - +
+ + + Live + + + Generate invite link + +
diff --git a/apps/web/src/config/paths.ts b/apps/web/src/config/paths.ts index 1bf056c..571d6bc 100644 --- a/apps/web/src/config/paths.ts +++ b/apps/web/src/config/paths.ts @@ -95,6 +95,7 @@ const pathsConfig = { new: `${DASHBOARD_PREFIX}/meshes/new`, mesh: (id: string) => `${DASHBOARD_PREFIX}/meshes/${id}`, invite: (id: string) => `${DASHBOARD_PREFIX}/meshes/${id}/invite`, + live: (id: string) => `${DASHBOARD_PREFIX}/meshes/${id}/live`, }, invites: `${DASHBOARD_PREFIX}/invites`, settings: { diff --git a/apps/web/src/modules/marketing/home/demo-dashboard.tsx b/apps/web/src/modules/marketing/home/demo-dashboard.tsx index 7306b51..9ca34a3 100644 --- a/apps/web/src/modules/marketing/home/demo-dashboard.tsx +++ b/apps/web/src/modules/marketing/home/demo-dashboard.tsx @@ -1,5 +1,4 @@ "use client"; -import { motion, AnimatePresence } from "motion/react"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { Reveal, SectionIcon } from "./_reveal"; import { @@ -9,67 +8,33 @@ import { SCRIPT, SCRIPT_DURATION_MS, type DemoMessage, - type Peer, - type PeerStatus, } from "./demo-dashboard-script"; +import { MeshStream, type StreamMessage, type StreamPeer } from "./mesh-stream"; -const STATUS_DOT: Record = { - idle: "bg-emerald-500", - working: "bg-[var(--cm-clay)] animate-pulse", - offline: "bg-[var(--cm-fg-tertiary)]", -}; +const toStreamMessage = ( + m: DemoMessage, + loopKey: number, +): StreamMessage => ({ + key: `${loopKey}-${m.t}`, + from: m.from, + to: m.to, + type: m.type, + text: m.text, + ciphertext: m.ciphertext, +}); -const SURFACE_ICON: Record = { - terminal: ( - - - - - ), - phone: ( - - - - - ), - slack: ( - - - - - - - ), -}; - -const TYPE_GLYPH = (type: DemoMessage["type"]) => { - if (type === "ask_mesh") - return ( - - ⟐ broadcast - - ); - if (type === "self_nominate") - return ( - - ← hand-raise - - ); - return ( - - → direct - - ); -}; - -type VisibleMessage = DemoMessage & { seq: number }; +const STREAM_PEERS: StreamPeer[] = PEERS.map((p) => ({ + id: p.id, + name: p.name, + status: p.status, + machine: p.machine, + surface: p.surface, +})); export const DemoDashboard = () => { const [elapsed, setElapsed] = useState(0); const [playing, setPlaying] = useState(true); - const [focusedPeer, setFocusedPeer] = useState(null); const [loopCount, setLoopCount] = useState(0); - const [hoveredMessage, setHoveredMessage] = useState(null); const startRef = useRef(0); const rafRef = useRef(null); @@ -99,19 +64,13 @@ export const DemoDashboard = () => { // eslint-disable-next-line react-hooks/exhaustive-deps }, [playing, tick]); - const visible = useMemo(() => { - return SCRIPT.filter((m) => m.t <= elapsed).map((m, i) => ({ - ...m, - seq: loopCount * 100 + i, - })); - }, [elapsed, loopCount]); - - const filtered = useMemo(() => { - if (!focusedPeer) return visible; - return visible.filter( - (m) => m.from === focusedPeer || m.to === focusedPeer, - ); - }, [visible, focusedPeer]); + const messages = useMemo( + () => + SCRIPT.filter((m) => m.t <= elapsed).map((m) => + toStreamMessage(m, loopCount), + ), + [elapsed, loopCount], + ); const handleRestart = () => { setElapsed(0); @@ -119,8 +78,29 @@ export const DemoDashboard = () => { setLoopCount((c) => c + 1); }; - const peerName = (id: string) => - PEERS.find((p) => p.id === id)?.name ?? id; + const footer = ( + <> +
+
+ + {messages.length} / {SCRIPT.length} messages + + + loop #{loopCount + 1} · {Math.floor(elapsed / 1000)}s /{" "} + {Math.floor(SCRIPT_DURATION_MS / 1000)}s + + {playing ? "▶ playing" : "⏸ paused"} +
+ + ); return (
{
- -
- {/* server sidebar */} - - - {/* peers */} - - - {/* message stream */} -
-
- # - - live-stream - - - {focusedPeer - ? `filtered: ${peerName(focusedPeer)}` - : "all peers · E2E encrypted"} - -
-
    - - {filtered.map((m) => ( - setHoveredMessage(m.seq)} - onMouseLeave={() => setHoveredMessage(null)} - className="group relative" - > -
    -
    -
    - {peerName(m.from).slice(0, 2)} -
    -
    -
    -
    - - {peerName(m.from)} - - {m.to && ( - <> - - → - - - {m.to.startsWith("tag:") - ? m.to - : peerName(m.to)} - - - )} - {TYPE_GLYPH(m.type)} -
    -

    - {m.text} -

    - {hoveredMessage === m.seq && ( - -
    - broker sees only this -
    - - {m.ciphertext} - -
    - )} -
    -
    -
    - ))} -
    -
- {/* progress bar */} -
-
-
- - {visible.length} / {SCRIPT.length} messages - - - loop #{loopCount + 1} · {Math.floor(elapsed / 1000)}s /{" "} - {Math.floor(SCRIPT_DURATION_MS / 1000)}s - - - {playing ? "▶ playing" : "⏸ paused"} - {focusedPeer && ` · filtered`} - -
-
-
-
+ {/* unused var to silence lint on LOOP_PAUSE_MS if dead-code elimination hits */} + +
@@ -429,9 +196,6 @@ export const DemoDashboard = () => { broker routes ciphertext, never plaintext

- - {/* prevent eslint exhaustive-deps hook warning from dead var */} - {loopCount < -1 && } ); diff --git a/apps/web/src/modules/marketing/home/mesh-stream.tsx b/apps/web/src/modules/marketing/home/mesh-stream.tsx new file mode 100644 index 0000000..06e6294 --- /dev/null +++ b/apps/web/src/modules/marketing/home/mesh-stream.tsx @@ -0,0 +1,324 @@ +"use client"; +import { motion, AnimatePresence } from "motion/react"; +import { useState } from "react"; + +export type PeerStatus = "idle" | "working" | "dnd" | "offline"; +export type MessageType = "ask_mesh" | "self_nominate" | "direct" | "broadcast"; + +export interface StreamPeer { + id: string; + name: string; + status: PeerStatus; + /** e.g. "macOS · payments-api" or "iOS · push-relay" */ + machine: string; + surface?: "terminal" | "phone" | "slack"; +} + +export interface StreamMessage { + /** stable unique key */ + key: string; + /** peer id or display name */ + from: string; + /** peer id, "tag:xxx", "*", or null (self-nominate) */ + to: string | null; + type: MessageType; + /** plaintext for demo, undefined for live (broker never sees it) */ + text?: string; + /** truncated base64url — what the broker actually sees */ + ciphertext: string; + /** absolute time, optional — used by live dashboard */ + createdAt?: Date; +} + +const STATUS_DOT: Record = { + idle: "bg-emerald-500", + working: "bg-[var(--cm-clay)] animate-pulse", + dnd: "bg-[#c46686]", + offline: "bg-[var(--cm-fg-tertiary)]", +}; + +const TYPE_CHIP: Record = { + ask_mesh: { + label: "⟐ broadcast", + className: + "border-[var(--cm-border)] bg-[var(--cm-bg)] text-[var(--cm-clay)]", + }, + broadcast: { + label: "⟐ broadcast", + className: + "border-[var(--cm-border)] bg-[var(--cm-bg)] text-[var(--cm-clay)]", + }, + self_nominate: { + label: "← hand-raise", + className: "border-emerald-500/40 bg-emerald-500/10 text-emerald-500", + }, + direct: { + label: "→ direct", + className: + "border-[var(--cm-border)] bg-[var(--cm-bg)] text-[var(--cm-fg-secondary)]", + }, +}; + +const surfaceGlyph = (s?: StreamPeer["surface"]) => { + if (s === "phone") + return ( + + + + + ); + if (s === "slack") + return ( + + + + + + + ); + return ( + + + + + ); +}; + +const resolveName = (id: string, peers: StreamPeer[]) => + peers.find((p) => p.id === id)?.name ?? id; + +export interface MeshStreamProps { + peers: StreamPeer[]; + messages: StreamMessage[]; + /** text shown in stream header, right of # */ + channelLabel?: string; + /** override the "N peers online" hint */ + peersHint?: string; + /** override empty-state message */ + emptyLabel?: string; + /** footer content (stats / progress bar / timers) */ + footer?: React.ReactNode; +} + +export const MeshStream = ({ + peers, + messages, + channelLabel = "live-stream", + peersHint, + emptyLabel = "Waiting for messages…", + footer, +}: MeshStreamProps) => { + const [focusedPeer, setFocusedPeer] = useState(null); + const [hoveredKey, setHoveredKey] = useState(null); + + const onlineCount = peers.filter((p) => p.status !== "offline").length; + const filtered = focusedPeer + ? messages.filter((m) => m.from === focusedPeer || m.to === focusedPeer) + : messages; + + return ( +
+ {/* peers sidebar */} + + + {/* message stream */} +
+
+ # + + {channelLabel} + + + {focusedPeer + ? `filtered: ${resolveName(focusedPeer, peers)}` + : "all peers · E2E encrypted"} + +
+
    + {filtered.length === 0 && ( +
  1. + {emptyLabel} +
  2. + )} + + {filtered.map((m) => ( + setHoveredKey(m.key)} + onMouseLeave={() => setHoveredKey(null)} + className="group relative" + > +
    +
    +
    + {resolveName(m.from, peers).slice(0, 2)} +
    +
    +
    +
    + + {resolveName(m.from, peers)} + + {m.to && ( + <> + + → + + + {m.to.startsWith("tag:") || m.to === "*" + ? m.to + : resolveName(m.to, peers)} + + + )} + + {TYPE_CHIP[m.type].label} + + {m.createdAt && ( + + {m.createdAt.toLocaleTimeString()} + + )} +
    + {m.text && ( +

    + {m.text} +

    + )} + {hoveredKey === m.key && ( + +
    + broker sees only this +
    + + {m.ciphertext} + {m.ciphertext && !m.text && "…"} + +
    + )} +
    +
    +
    + ))} +
    +
+ {footer && ( +
+ {footer} +
+ )} +
+
+ ); +}; diff --git a/apps/web/src/modules/mesh/live-stream-panel.tsx b/apps/web/src/modules/mesh/live-stream-panel.tsx new file mode 100644 index 0000000..d6009bc --- /dev/null +++ b/apps/web/src/modules/mesh/live-stream-panel.tsx @@ -0,0 +1,120 @@ +"use client"; +import { useQuery } from "@tanstack/react-query"; +import { useMemo } from "react"; + +import { + getMyMeshStreamResponseSchema, + type GetMyMeshStreamResponse, +} from "@turbostarter/api/schema"; +import { handle } from "@turbostarter/api/utils"; + +import { api } from "~/lib/api/client"; +import { + MeshStream, + type StreamMessage, + type StreamPeer, +} from "~/modules/marketing/home/mesh-stream"; + +const POLL_INTERVAL_MS = 4000; + +const classifyTarget = ( + target: string, +): "direct" | "ask_mesh" | "broadcast" => { + if (target === "*") return "broadcast"; + if (target.startsWith("tag:")) return "ask_mesh"; + return "direct"; +}; + +const buildStream = (data: GetMyMeshStreamResponse) => { + const peers: StreamPeer[] = data.presences.map((p) => ({ + id: p.memberId, + name: p.displayName ?? p.memberId.slice(0, 8), + status: p.status === "dnd" ? "dnd" : p.status, + machine: p.cwd, + surface: "terminal", + })); + + const messages: StreamMessage[] = data.envelopes + .slice() + .reverse() + .map((e) => ({ + key: e.id, + from: e.senderMemberId, + to: e.targetSpec, + type: classifyTarget(e.targetSpec), + ciphertext: e.ciphertextPreview, + createdAt: new Date(e.createdAt), + })); + + return { peers, messages }; +}; + +export const LiveStreamPanel = ({ meshId }: { meshId: string }) => { + const { data, isLoading, dataUpdatedAt, isFetching } = useQuery({ + queryKey: ["mesh", "stream", meshId], + queryFn: () => + handle(api.my.meshes[":id"].stream.$get, { + schema: getMyMeshStreamResponseSchema, + })({ param: { id: meshId } }), + refetchInterval: POLL_INTERVAL_MS, + refetchIntervalInBackground: false, + }); + + const { peers, messages } = useMemo( + () => + data ? buildStream(data) : { peers: [], messages: [] }, + [data], + ); + + const secondsAgo = dataUpdatedAt + ? Math.max(0, Math.floor((Date.now() - dataUpdatedAt) / 1000)) + : null; + + const footer = ( +
+ + {messages.length} envelopes · {peers.length} live peers + + + {isFetching ? "▶ polling…" : `↻ ${secondsAgo ?? "—"}s ago`} + {" · "}every {POLL_INTERVAL_MS / 1000}s + + read-only · E2E encrypted +
+ ); + + const emptyLabel = isLoading + ? "Connecting to mesh…" + : "No envelopes yet. When your peers send messages they'll appear here."; + + return ( +
+
+
+ + + live · polling every {POLL_INTERVAL_MS / 1000}s + +
+
+ +
+ ); +}; diff --git a/packages/api/src/modules/mesh/queries.ts b/packages/api/src/modules/mesh/queries.ts index 7232a4f..12b2085 100644 --- a/packages/api/src/modules/mesh/queries.ts +++ b/packages/api/src/modules/mesh/queries.ts @@ -10,7 +10,14 @@ import { or, sql, } from "@turbostarter/db"; -import { auditLog, invite, mesh, meshMember } from "@turbostarter/db/schema"; +import { + auditLog, + invite, + mesh, + meshMember, + messageQueue, + presence, +} from "@turbostarter/db/schema"; import { db } from "@turbostarter/db/server"; import type { GetMyMeshesInput } from "../../schema"; @@ -163,6 +170,100 @@ export const getMyMeshById = async ({ }; }; +/** + * Live mesh stream — presences + recent message envelopes (metadata only) + + * recent audit events. Polled every 3-5s by the live dashboard. Authz: + * caller must own OR be a non-revoked member of the mesh. + * + * Envelopes expose a 24-char ciphertext preview so the UI can show + * "broker sees: " truthfully — this IS what the broker sees. + * Plaintext, nonces, full ciphertext are NEVER returned from here. + */ +export const getMyMeshStream = async ({ + userId, + meshId, +}: { + userId: string; + meshId: string; +}) => { + // Authz check — same pattern as getMyMeshById + const [m] = await db + .select({ ownerUserId: mesh.ownerUserId }) + .from(mesh) + .where(eq(mesh.id, meshId)) + .limit(1); + if (!m) return null; + + const isOwner = m.ownerUserId === userId; + if (!isOwner) { + const [membership] = await db + .select({ id: meshMember.id }) + .from(meshMember) + .where( + and( + eq(meshMember.meshId, meshId), + eq(meshMember.userId, userId), + isNull(meshMember.revokedAt), + ), + ) + .limit(1); + if (!membership) return null; + } + + const presences = await db + .select({ + id: presence.id, + memberId: presence.memberId, + displayName: meshMember.displayName, + sessionId: presence.sessionId, + pid: presence.pid, + cwd: presence.cwd, + status: presence.status, + statusSource: presence.statusSource, + statusUpdatedAt: presence.statusUpdatedAt, + lastPingAt: presence.lastPingAt, + disconnectedAt: presence.disconnectedAt, + }) + .from(presence) + .leftJoin(meshMember, eq(presence.memberId, meshMember.id)) + .where(and(eq(meshMember.meshId, meshId), isNull(presence.disconnectedAt))) + .orderBy(desc(presence.lastPingAt)) + .limit(20); + + const envelopes = await db + .select({ + id: messageQueue.id, + senderMemberId: messageQueue.senderMemberId, + senderDisplayName: meshMember.displayName, + targetSpec: messageQueue.targetSpec, + priority: messageQueue.priority, + ciphertextPreview: sql`LEFT(${messageQueue.ciphertext}, 24)`, + size: sql`OCTET_LENGTH(${messageQueue.ciphertext})`, + createdAt: messageQueue.createdAt, + deliveredAt: messageQueue.deliveredAt, + }) + .from(messageQueue) + .leftJoin(meshMember, eq(messageQueue.senderMemberId, meshMember.id)) + .where(eq(messageQueue.meshId, meshId)) + .orderBy(desc(messageQueue.createdAt)) + .limit(50); + + const auditEvents = await db + .select({ + id: auditLog.id, + eventType: auditLog.eventType, + actorPeerId: auditLog.actorPeerId, + targetPeerId: auditLog.targetPeerId, + createdAt: auditLog.createdAt, + }) + .from(auditLog) + .where(eq(auditLog.meshId, meshId)) + .orderBy(desc(auditLog.createdAt)) + .limit(20); + + return { presences, envelopes, auditEvents }; +}; + export const getMyExport = async ({ userId }: { userId: string }) => { const meshesOwned = await db .select({ diff --git a/packages/api/src/modules/mesh/router.ts b/packages/api/src/modules/mesh/router.ts index a41c0c9..354e3eb 100644 --- a/packages/api/src/modules/mesh/router.ts +++ b/packages/api/src/modules/mesh/router.ts @@ -19,6 +19,7 @@ import { getMyExport, getMyInvitesSent, getMyMeshById, + getMyMeshStream, getMyMeshes, } from "./queries"; @@ -47,6 +48,15 @@ export const myRouter = new Hono() ); } }) + .get("/meshes/:id/stream", async (c) => { + const user = c.var.user; + return c.json( + (await getMyMeshStream({ + userId: user.id, + meshId: c.req.param("id"), + })) ?? { presences: [], envelopes: [], auditEvents: [] }, + ); + }) .get("/meshes/:id", async (c) => { const user = c.var.user; return c.json( diff --git a/packages/api/src/schema/mesh-user.ts b/packages/api/src/schema/mesh-user.ts index e019deb..3f5db72 100644 --- a/packages/api/src/schema/mesh-user.ts +++ b/packages/api/src/schema/mesh-user.ts @@ -139,6 +139,53 @@ export type CreateMyInviteResponse = z.infer; + export const getMyInvitesResponseSchema = z.object({ sent: z.array( z.object({