feat(cli): v1.7.0 — terminal parity for SSE + members + mentions
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Three new verbs that wrap the v1.6.x REST surface:

  claudemesh topic tail <name>  → live SSE consumer with N-message backfill
  claudemesh member list        → mesh roster decorated with online state
  claudemesh notification list  → recent @-mentions of you across topics

Each command auto-mints a 5-minute read-only apikey via the WS
broker and revokes on exit, so users don't manage tokens. SSE
client uses fetch + ReadableStream so the bearer stays in the
Authorization header.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-02 20:02:29 +01:00
parent c31a591681
commit dd80d4e946
8 changed files with 494 additions and 3 deletions

View File

@@ -26,4 +26,7 @@ export { runWelcome } from "./welcome.js";
export { runHook } from "./hook.js";
export { runMcp } from "./mcp.js";
export { runSeedTestMesh } from "./seed-test-mesh.js";
export { runNotificationList } from "./notification.js";
export { runMemberList } from "./member.js";
export { runTopicTail } from "./topic-tail.js";
export { withMesh } from "./connect.js";

View File

@@ -0,0 +1,78 @@
/**
* `claudemesh member list` — every (non-revoked) member of the chosen
* mesh, decorated with online state. Distinct from `peer list`: peers
* shows live WS sessions, members shows roster.
*/
import { withRestKey } from "~/services/api/with-rest-key.js";
import { request } from "~/services/api/client.js";
import { render } from "~/ui/render.js";
import { bold, clay, dim, green, red, yellow } from "~/ui/styles.js";
import { EXIT } from "~/constants/exit-codes.js";
export interface MemberFlags {
mesh?: string;
json?: boolean;
/** Show only online members. */
online?: boolean;
}
interface MemberRow {
memberId: string;
pubkey: string;
displayName: string;
role: string;
isHuman: boolean;
joinedAt: string;
online: boolean;
status: string;
summary: string | null;
}
function statusGlyph(m: MemberRow): string {
if (!m.online) return dim("○");
if (m.status === "dnd") return red("●");
if (m.status === "working") return yellow("●");
return green("●");
}
export async function runMemberList(flags: MemberFlags): Promise<number> {
return withRestKey(
{ meshSlug: flags.mesh ?? null, purpose: "members" },
async ({ secret, meshSlug }) => {
const result = await request<{ members: MemberRow[] }>({
path: "/api/v1/members",
token: secret,
});
const filtered = flags.online
? result.members.filter((m) => m.online)
: result.members;
if (flags.json) {
console.log(JSON.stringify({ members: filtered }, null, 2));
return EXIT.SUCCESS;
}
if (filtered.length === 0) {
render.info(
dim(flags.online ? `no online members in ${meshSlug}.` : `no members in ${meshSlug}.`),
);
return EXIT.SUCCESS;
}
const onlineCount = result.members.filter((m) => m.online).length;
render.section(
`${clay(meshSlug)} members (${onlineCount}/${result.members.length} online)`,
);
for (const m of filtered) {
const tag = m.isHuman ? dim("human") : dim("bot");
const summary = m.summary ? `${dim(m.summary)}` : "";
process.stdout.write(
` ${statusGlyph(m)} ${bold(m.displayName)} ${tag} ${dim(m.role)} ${dim(m.pubkey.slice(0, 8))}${summary}\n`,
);
}
return EXIT.SUCCESS;
},
);
}

View File

@@ -0,0 +1,93 @@
/**
* `claudemesh notification list` — recent @-mentions of the viewer
* across topics in the chosen mesh. Server-side regex match over the
* v0.2.0 plaintext-base64 ciphertext; the v0.3.0 per-topic encryption
* cut will move this to a notification table populated at write time.
*/
import { withRestKey } from "~/services/api/with-rest-key.js";
import { request } from "~/services/api/client.js";
import { render } from "~/ui/render.js";
import { bold, clay, dim } from "~/ui/styles.js";
import { EXIT } from "~/constants/exit-codes.js";
export interface NotificationFlags {
mesh?: string;
json?: boolean;
since?: string;
}
interface NotificationRow {
id: string;
topicId: string;
topicName: string;
senderName: string;
senderPubkey: string;
ciphertext: string;
createdAt: string;
}
interface NotificationsResponse {
notifications: NotificationRow[];
since: string;
mentionedAs: string;
}
function decodeCiphertext(b64: string): string {
try {
return Buffer.from(b64, "base64").toString("utf-8");
} catch {
return "[decode failed]";
}
}
function fmtRelative(iso: string): string {
const ms = Date.now() - new Date(iso).getTime();
if (ms < 60_000) return "now";
if (ms < 3_600_000) return `${Math.floor(ms / 60_000)}m`;
if (ms < 86_400_000) return `${Math.floor(ms / 3_600_000)}h`;
return `${Math.floor(ms / 86_400_000)}d`;
}
export async function runNotificationList(flags: NotificationFlags): Promise<number> {
return withRestKey(
{ meshSlug: flags.mesh ?? null, purpose: "notifications" },
async ({ secret }) => {
const qs = flags.since ? `?since=${encodeURIComponent(flags.since)}` : "";
const result = await request<NotificationsResponse>({
path: `/api/v1/notifications${qs}`,
token: secret,
});
if (flags.json) {
const decoded = result.notifications.map((n) => ({
...n,
message: decodeCiphertext(n.ciphertext),
}));
console.log(JSON.stringify({ ...result, notifications: decoded }, null, 2));
return EXIT.SUCCESS;
}
if (result.notifications.length === 0) {
render.info(
dim(`no mentions of @${result.mentionedAs} since ${result.since}.`),
);
return EXIT.SUCCESS;
}
render.section(
`mentions of @${bold(result.mentionedAs)} (${result.notifications.length})`,
);
for (const n of result.notifications) {
const when = fmtRelative(n.createdAt);
const msg = decodeCiphertext(n.ciphertext).replace(/\s+/g, " ").trim();
const snippet = msg.length > 100 ? msg.slice(0, 97) + "…" : msg;
process.stdout.write(
` ${clay("#" + n.topicName)} ${dim(when)} ${bold(n.senderName)}\n`,
);
process.stdout.write(` ${snippet}\n`);
}
return EXIT.SUCCESS;
},
);
}

View File

@@ -0,0 +1,196 @@
/**
* `claudemesh topic tail <name>` — live SSE consumer of a topic stream.
* Prints the last N messages from /v1/topics/:name/messages, then opens
* the SSE firehose at /v1/topics/:name/stream and prints new messages
* as they arrive. Ctrl-C to exit.
*/
import { URLS } from "~/constants/urls.js";
import { withRestKey } from "~/services/api/with-rest-key.js";
import { request } from "~/services/api/client.js";
import { render } from "~/ui/render.js";
import { bold, clay, dim } from "~/ui/styles.js";
import { EXIT } from "~/constants/exit-codes.js";
export interface TopicTailFlags {
mesh?: string;
json?: boolean;
limit?: number | string;
/** Skip the initial backfill — only show forward messages. */
forwardOnly?: boolean;
}
interface TopicMessage {
id: string;
senderPubkey: string;
senderName: string;
nonce: string;
ciphertext: string;
createdAt: string;
}
interface HistoryResponse {
topic: string;
topicId: string;
messages: TopicMessage[];
}
function decodeCiphertext(b64: string): string {
try {
return Buffer.from(b64, "base64").toString("utf-8");
} catch {
return "[decode failed]";
}
}
function fmtTime(iso: string): string {
try {
return new Date(iso).toLocaleTimeString([], {
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
});
} catch {
return iso;
}
}
function printMessage(m: TopicMessage, json: boolean): void {
const text = decodeCiphertext(m.ciphertext);
if (json) {
console.log(JSON.stringify({ ...m, message: text }));
return;
}
process.stdout.write(
` ${dim(fmtTime(m.createdAt))} ${bold(m.senderName || m.senderPubkey.slice(0, 8))} ${text}\n`,
);
}
interface SseEvent {
event: string;
id?: string;
data: string;
}
async function* readSseStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
): AsyncGenerator<SseEvent> {
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { value, done } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buffer.indexOf("\n\n")) !== -1) {
const block = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
const ev: SseEvent = { event: "message", data: "" };
const dataLines: string[] = [];
for (const line of block.split("\n")) {
if (!line || line.startsWith(":")) continue;
const colon = line.indexOf(":");
if (colon < 0) continue;
const field = line.slice(0, colon);
const val = line.slice(colon + 1).replace(/^ /, "");
if (field === "event") ev.event = val;
else if (field === "id") ev.id = val;
else if (field === "data") dataLines.push(val);
}
ev.data = dataLines.join("\n");
yield ev;
}
}
}
export async function runTopicTail(name: string, flags: TopicTailFlags): Promise<number> {
if (!name) {
render.err("Usage: claudemesh topic tail <topic> [--limit N]");
return EXIT.INVALID_ARGS;
}
const cleanName = name.replace(/^#/, "");
const limit = flags.limit ? Number(flags.limit) : 20;
return withRestKey(
{
meshSlug: flags.mesh ?? null,
purpose: `tail-${cleanName}`,
capabilities: ["read"],
topicScopes: [cleanName],
},
async ({ secret, meshSlug }) => {
// 1. Backfill the most recent N messages so the user sees context
// when they tail an active topic.
if (!flags.forwardOnly && limit > 0) {
try {
const history = await request<HistoryResponse>({
path: `/api/v1/topics/${encodeURIComponent(cleanName)}/messages?limit=${limit}`,
token: secret,
});
if (!flags.json) {
render.section(
`${clay("#" + cleanName)} on ${dim(meshSlug)} — backfill ${history.messages.length}, then live`,
);
}
// History is newest-first; reverse for chronological display.
for (const m of history.messages.slice().reverse()) {
printMessage(m, flags.json ?? false);
}
} catch (err) {
render.warn(`backfill failed: ${(err as Error).message}`);
}
}
// 2. Open the SSE firehose. fetch + ReadableStream so the bearer
// stays in the Authorization header (no token-in-URL leak).
const url = `${URLS.API_BASE}/api/v1/topics/${encodeURIComponent(cleanName)}/stream`;
const ctl = new AbortController();
const onSig = () => ctl.abort();
process.once("SIGINT", onSig);
process.once("SIGTERM", onSig);
try {
const res = await fetch(url, {
headers: { Authorization: `Bearer ${secret}` },
signal: ctl.signal,
});
if (!res.ok || !res.body) {
render.err(`stream open failed: ${res.status}`);
return EXIT.INTERNAL_ERROR;
}
if (!flags.json) {
render.info(dim("tailing — Ctrl-C to exit"));
}
const reader = res.body.getReader() as ReadableStreamDefaultReader<Uint8Array>;
for await (const ev of readSseStream(reader)) {
if (ev.event === "ready" || ev.event === "heartbeat") continue;
if (ev.event === "error") {
try {
const parsed = JSON.parse(ev.data) as { error?: string };
render.err(`stream error: ${parsed.error ?? "unknown"}`);
} catch {
render.err("stream error");
}
continue;
}
if (ev.event === "message") {
try {
const m = JSON.parse(ev.data) as TopicMessage;
printMessage(m, flags.json ?? false);
} catch {
// skip malformed
}
}
}
return EXIT.SUCCESS;
} catch (err) {
if (ctl.signal.aborted) return EXIT.SUCCESS; // user Ctrl-C'd
render.err(`tail aborted: ${(err as Error).message}`);
return EXIT.INTERNAL_ERROR;
} finally {
process.removeListener("SIGINT", onSig);
process.removeListener("SIGTERM", onSig);
}
},
);
}