From e758205eb85c7d8a80d3e61c871d8d9cbb5be0a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20Guti=C3=A9rrez?= <35082514+alezmad@users.noreply.github.com> Date: Sat, 4 Apr 2026 21:28:24 +0100 Subject: [PATCH] feat(db): runtime presence + message queue + pending_status tables for broker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - mesh.presence: live WS connection tracking (memberId, sessionId, pid, cwd, status, statusSource, statusUpdatedAt, connectedAt, lastPingAt, disconnectedAt). Persisted so broker can resume after restart. - mesh.message_queue: E2E-encrypted envelopes awaiting delivery (meshId, senderMemberId, targetSpec, priority, nonce, ciphertext, expiresAt). Broker routes ciphertext only — crypto happens client-side. - mesh.pending_status: first-turn race catcher keyed by (pid, cwd). No FK to member (member doesn't exist yet when hook fires pre-register). - Enums: presence_status, presence_status_source, message_priority. - Relations: mesh→messageQueue, member→presences+sentMessages, presence→member, messageQueue→mesh+sender. - Cascade chain: mesh → member → presence + messageQueue (no orphans). Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/db/src/schema/mesh.ts | 119 ++++++++++++++++++++++++++++++++- 1 file changed, 118 insertions(+), 1 deletion(-) diff --git a/packages/db/src/schema/mesh.ts b/packages/db/src/schema/mesh.ts index 2a4214f..1898872 100644 --- a/packages/db/src/schema/mesh.ts +++ b/packages/db/src/schema/mesh.ts @@ -35,6 +35,24 @@ export const meshTierEnum = schema.enum("tier", [ export const meshRoleEnum = schema.enum("role", ["admin", "member"]); +export const presenceStatusEnum = schema.enum("presence_status", [ + "idle", + "working", + "dnd", +]); + +export const presenceStatusSourceEnum = schema.enum("presence_status_source", [ + "hook", + "manual", + "jsonl", +]); + +export const messagePriorityEnum = schema.enum("message_priority", [ + "now", + "next", + "low", +]); + /** * A mesh is a peer group of Claude Code sessions that can talk to each * other via the broker. Ownership is tied to a user; transport/tier @@ -113,6 +131,72 @@ export const auditLog = schema.table("audit_log", { createdAt: timestamp().defaultNow().notNull(), }); +/** + * Live WebSocket connection tracking for a member. One presence row per + * active Claude Code session: created on connect, updated on every + * heartbeat/hook signal, closed out (disconnectedAt set) on disconnect. + * Persisted so the broker can resume state after a restart. + */ +export const presence = schema.table("presence", { + id: text().primaryKey().notNull().$defaultFn(generateId), + memberId: text() + .references(() => member.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + sessionId: text().notNull(), + pid: integer().notNull(), + cwd: text().notNull(), + status: presenceStatusEnum().notNull().default("idle"), + statusSource: presenceStatusSourceEnum().notNull().default("jsonl"), + statusUpdatedAt: timestamp().defaultNow().notNull(), + connectedAt: timestamp().defaultNow().notNull(), + lastPingAt: timestamp().defaultNow().notNull(), + disconnectedAt: timestamp(), +}); + +/** + * In-flight E2E-encrypted message envelopes awaiting delivery. + * The broker only ever sees ciphertext + routing metadata — the + * nonce+ciphertext pair is sealed with libsodium client-side. + * + * `targetSpec` is free-form text and can address: a specific member + * pubkey (direct message), a channel (`#general`), a tag (`tag:admins`), + * or a broadcast (`*`). Resolution happens in broker logic, not SQL. + */ +export const messageQueue = schema.table("message_queue", { + id: text().primaryKey().notNull().$defaultFn(generateId), + meshId: text() + .references(() => mesh.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + senderMemberId: text() + .references(() => member.id, { onDelete: "cascade", onUpdate: "cascade" }) + .notNull(), + targetSpec: text().notNull(), + priority: messagePriorityEnum().notNull().default("next"), + nonce: text().notNull(), + ciphertext: text().notNull(), + createdAt: timestamp().defaultNow().notNull(), + deliveredAt: timestamp(), + expiresAt: timestamp(), +}); + +/** + * First-turn race handler: hook signals that fire BEFORE the peer has + * a registered mesh.member row get stashed here keyed by (pid, cwd), + * then applied to the member's presence on register. Swept after TTL. + * + * Intentionally NOT linked to member/mesh via FK — the whole point is + * that no member row exists yet when the hook fires. + */ +export const pendingStatus = schema.table("pending_status", { + id: text().primaryKey().notNull().$defaultFn(generateId), + pid: integer().notNull(), + cwd: text().notNull(), + status: text().notNull(), + statusSource: text().notNull(), + createdAt: timestamp().defaultNow().notNull(), + appliedAt: timestamp(), +}); + export const meshRelations = relations(mesh, ({ one, many }) => ({ owner: one(user, { fields: [mesh.ownerUserId], @@ -121,9 +205,10 @@ export const meshRelations = relations(mesh, ({ one, many }) => ({ members: many(member), invites: many(invite), auditLogs: many(auditLog), + messageQueue: many(messageQueue), })); -export const memberRelations = relations(member, ({ one }) => ({ +export const memberRelations = relations(member, ({ one, many }) => ({ mesh: one(mesh, { fields: [member.meshId], references: [mesh.id], @@ -132,6 +217,26 @@ export const memberRelations = relations(member, ({ one }) => ({ fields: [member.userId], references: [user.id], }), + presences: many(presence), + sentMessages: many(messageQueue), +})); + +export const presenceRelations = relations(presence, ({ one }) => ({ + member: one(member, { + fields: [presence.memberId], + references: [member.id], + }), +})); + +export const messageQueueRelations = relations(messageQueue, ({ one }) => ({ + mesh: one(mesh, { + fields: [messageQueue.meshId], + references: [mesh.id], + }), + sender: one(member, { + fields: [messageQueue.senderMemberId], + references: [member.id], + }), })); export const inviteRelations = relations(invite, ({ one }) => ({ @@ -160,6 +265,12 @@ export const selectInviteSchema = createSelectSchema(invite); export const insertInviteSchema = createInsertSchema(invite); export const selectAuditLogSchema = createSelectSchema(auditLog); export const insertAuditLogSchema = createInsertSchema(auditLog); +export const selectPresenceSchema = createSelectSchema(presence); +export const insertPresenceSchema = createInsertSchema(presence); +export const selectMessageQueueSchema = createSelectSchema(messageQueue); +export const insertMessageQueueSchema = createInsertSchema(messageQueue); +export const selectPendingStatusSchema = createSelectSchema(pendingStatus); +export const insertPendingStatusSchema = createInsertSchema(pendingStatus); export type SelectMesh = typeof mesh.$inferSelect; export type InsertMesh = typeof mesh.$inferInsert; @@ -169,3 +280,9 @@ export type SelectInvite = typeof invite.$inferSelect; export type InsertInvite = typeof invite.$inferInsert; export type SelectAuditLog = typeof auditLog.$inferSelect; export type InsertAuditLog = typeof auditLog.$inferInsert; +export type SelectPresence = typeof presence.$inferSelect; +export type InsertPresence = typeof presence.$inferInsert; +export type SelectMessageQueue = typeof messageQueue.$inferSelect; +export type InsertMessageQueue = typeof messageQueue.$inferInsert; +export type SelectPendingStatus = typeof pendingStatus.$inferSelect; +export type InsertPendingStatus = typeof pendingStatus.$inferInsert;