feat(broker+api+cli): topic message reply-to threading (v0.3.1)
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled

Adds a reply_to_id column (self-FK on topic_message) plus end-to-end
plumbing so a message can mark itself as a reply to a previous one in
the same topic.

- Schema: 0027_topic_message_reply_to.sql adds reply_to_id with
  ON DELETE SET NULL + index for backlink lookup.
- Broker: appendTopicMessage validates parent shares the topic, writes
  reply_to_id; topicHistory + topic_history_response surface it; WS
  push envelope now carries senderMemberId, senderName, topic name,
  reply_to_id, and message_id so recipients have everything they need
  to reply without a follow-up query.
- REST: POST /v1/messages accepts replyToId (validated server-side);
  GET /messages and SSE /stream emit it per row.
- CLI: \`topic post --reply-to <id|prefix>\` resolves prefixes against
  recent history; \`topic tail\` renders an "↳ in reply to <name>:
  <snippet>" line above replies and shows a copyable #shortid tag on
  every row.
- MCP push pipe: channel attributes now include from_pubkey,
  from_member_id, message_id, topic, reply_to_id — the recipient can
  thread a reply directly from the inbound notification.
- Skill + identity prompt updated to teach Claude how to use the new
  attributes for replies.

Bumped CLI to 1.9.0.
This commit is contained in:
Alejandro Gutiérrez
2026-05-02 21:58:21 +01:00
parent d871988084
commit 038a5b5bf7
13 changed files with 273 additions and 19 deletions

View File

@@ -838,6 +838,13 @@ export async function appendTopicMessage(args: {
senderSessionPubkey?: string;
nonce: string;
ciphertext: string;
bodyVersion?: number;
/**
* Optional id of the parent topic message this one replies to. Server
* verifies the parent exists and lives in the same topic; otherwise
* silently drops the reference (treated as a top-level post).
*/
replyToId?: string;
/**
* Optional client-extracted mention list (lowercased display names
* without the leading @). Required once per-topic encryption lands —
@@ -846,6 +853,17 @@ export async function appendTopicMessage(args: {
*/
mentions?: string[];
}): Promise<string> {
let validatedReplyTo: string | null = null;
if (args.replyToId) {
const [parent] = await db
.select({ id: meshTopicMessage.id, topicId: meshTopicMessage.topicId })
.from(meshTopicMessage)
.where(eq(meshTopicMessage.id, args.replyToId));
if (parent && parent.topicId === args.topicId) {
validatedReplyTo = parent.id;
}
}
const [row] = await db
.insert(meshTopicMessage)
.values({
@@ -854,6 +872,8 @@ export async function appendTopicMessage(args: {
senderSessionPubkey: args.senderSessionPubkey ?? null,
nonce: args.nonce,
ciphertext: args.ciphertext,
bodyVersion: args.bodyVersion ?? 1,
replyToId: validatedReplyTo,
})
.returning({ id: meshTopicMessage.id });
if (!row) throw new Error("failed to append topic message");
@@ -958,8 +978,11 @@ export async function topicHistory(args: {
id: string;
senderMemberId: string;
senderPubkey: string;
senderName: string;
nonce: string;
ciphertext: string;
bodyVersion: number;
replyToId: string | null;
createdAt: Date;
}>
> {
@@ -971,13 +994,18 @@ export async function topicHistory(args: {
id: string;
sender_member_id: string;
sender_pubkey: string;
sender_name: string;
nonce: string;
ciphertext: string;
body_version: number;
reply_to_id: string | null;
created_at: Date;
}>(sql`
SELECT tm.id, tm.sender_member_id,
COALESCE(tm.sender_session_pubkey, m.peer_pubkey) AS sender_pubkey,
tm.nonce, tm.ciphertext, tm.created_at
m.display_name AS sender_name,
tm.nonce, tm.ciphertext, tm.body_version, tm.reply_to_id,
tm.created_at
FROM mesh.topic_message tm
JOIN mesh.member m ON m.id = tm.sender_member_id
WHERE tm.topic_id = ${args.topicId}
@@ -989,16 +1017,22 @@ export async function topicHistory(args: {
id: string;
sender_member_id: string;
sender_pubkey: string;
sender_name: string;
nonce: string;
ciphertext: string;
body_version: number;
reply_to_id: string | null;
created_at: Date;
}>;
return rows.map((r) => ({
id: r.id,
senderMemberId: r.sender_member_id,
senderPubkey: r.sender_pubkey,
senderName: r.sender_name,
nonce: r.nonce,
ciphertext: r.ciphertext,
bodyVersion: r.body_version ?? 1,
replyToId: r.reply_to_id,
createdAt: r.created_at instanceof Date ? r.created_at : new Date(r.created_at),
}));
}

View File

@@ -1944,18 +1944,23 @@ async function handleSend(
// persisted to topic_message in addition to the ephemeral queue, so
// humans (and opting-in agents) can fetch history on reconnect.
// Spec: .artifacts/specs/2026-05-02-v0.2.0-scope.md
let persistedTopicMessageId: string | null = null;
if (msg.targetSpec.startsWith("#")) {
const topicId = msg.targetSpec.slice(1);
void appendTopicMessage({
topicId,
senderMemberId: conn.memberId,
senderSessionPubkey: conn.sessionPubkey ?? undefined,
nonce: msg.nonce,
ciphertext: msg.ciphertext,
mentions: msg.mentions,
}).catch((e) =>
log.warn("appendTopicMessage failed", { topic_id: topicId, err: String(e) }),
);
try {
persistedTopicMessageId = await appendTopicMessage({
topicId,
senderMemberId: conn.memberId,
senderSessionPubkey: conn.sessionPubkey ?? undefined,
nonce: msg.nonce,
ciphertext: msg.ciphertext,
bodyVersion: msg.bodyVersion ?? 1,
replyToId: msg.replyToId,
mentions: msg.mentions,
});
} catch (e) {
log.warn("appendTopicMessage failed", { topic_id: topicId, err: String(e) });
}
}
void audit(conn.meshId, "message_sent", conn.memberId, conn.displayName, {
@@ -1987,15 +1992,29 @@ async function handleSend(
const isMulticast = isBroadcast || !!groupName;
// Build the push envelope once (reused for all recipients).
const isTopicTarget = msg.targetSpec.startsWith("#");
let topicName: string | undefined;
if (isTopicTarget) {
const topicId = msg.targetSpec.slice(1);
const [topicRow] = await db
.select({ name: meshTopic.name })
.from(meshTopic)
.where(eq(meshTopic.id, topicId));
if (topicRow) topicName = topicRow.name;
}
const pushEnvelope: WSPushMessage = {
type: "push",
messageId,
messageId: persistedTopicMessageId ?? messageId,
meshId: conn.meshId,
senderPubkey: conn.sessionPubkey ?? conn.memberPubkey,
senderMemberId: conn.memberId,
senderName: conn.displayName,
priority: msg.priority,
nonce: msg.nonce,
ciphertext: msg.ciphertext,
createdAt: new Date().toISOString(),
...(topicName ? { topic: topicName } : {}),
...(msg.replyToId ? { replyToId: msg.replyToId } : {}),
...(subtype ? { subtype } : {}),
};
@@ -2449,8 +2468,12 @@ function handleConnection(ws: WebSocket): void {
messages: history.map((h) => ({
id: h.id,
senderPubkey: h.senderPubkey,
senderMemberId: h.senderMemberId,
senderName: h.senderName,
nonce: h.nonce,
ciphertext: h.ciphertext,
bodyVersion: h.bodyVersion,
...(h.replyToId ? { replyToId: h.replyToId } : {}),
createdAt: h.createdAt.toISOString(),
})),
...(_reqId ? { _reqId } : {}),

View File

@@ -106,6 +106,10 @@ export interface WSSendMessage {
* the body when this is absent.
*/
mentions?: string[];
/** Optional id of a previous topic message this one replies to.
* Server validates same-topic membership; FK is set null if parent
* later disappears. Ignored for non-topic targets. */
replyToId?: string;
}
/** Broker → client: an envelope addressed to this peer. */
@@ -114,6 +118,17 @@ export interface WSPushMessage {
messageId: string;
meshId: string;
senderPubkey: string;
/** Stable mesh.member id of the sender — survives display-name changes,
* use this as the canonical reply target when set. Optional for
* legacy/non-topic broker paths that haven't been wired yet. */
senderMemberId?: string;
/** Sender's current display name as a convenience for renderers. */
senderName?: string;
/** Topic name when the push originates from a topic post (vs DM). */
topic?: string;
/** Server-side message id of the parent message when this push is a
* reply, so the recipient can render thread context and re-thread. */
replyToId?: string;
priority: Priority;
nonce: string;
ciphertext: string;
@@ -345,8 +360,12 @@ export interface WSTopicHistoryResponseMessage {
messages: Array<{
id: string;
senderPubkey: string;
senderMemberId?: string;
senderName?: string;
nonce: string;
ciphertext: string;
bodyVersion?: number;
replyToId?: string | null;
createdAt: string;
}>;
_reqId?: string;