Files
claudemesh/.artifacts/specs/2026-04-10-cli-v2-pass2-local-first-storage.md
Alejandro Gutiérrez ee12510ef1
Some checks failed
CI / Lint (push) Has been cancelled
CI / Typecheck (push) Has been cancelled
CI / Broker tests (Postgres) (push) Has been cancelled
CI / Docker build (linux/amd64) (push) Has been cancelled
refactor: rename cli-v2 → cli, archive legacy cli, plus broker-side grants + auto-migrate
- apps/cli/ is now the canonical CLI (was apps/cli-v2/).
- apps/cli/ legacy v0 archived as branch 'legacy-cli-archive' and tag
  'cli-v0-legacy-final' before deletion; git history preserves it too.
- .github/workflows/release-cli.yml paths updated.
- pnpm-lock.yaml regenerated.

Broker-side peer-grant enforcement (spec: 2026-04-15-per-peer-capabilities):
- 0020_peer-grants.sql adds peer_grants jsonb + GIN index on mesh.member.
- handleSend in broker fetches recipient grant maps once per send, drops
  messages silently when sender lacks the required capability.
- POST /cli/mesh/:slug/grants to update from CLI; broker_messages_dropped_by_grant_total metric.
- CLI grant/revoke/block now mirror to broker via syncToBroker.

Auto-migrate on broker startup:
- apps/broker/src/migrate.ts runs drizzle migrate with pg_advisory_lock
  before the HTTP server binds. Exits non-zero on failure so Coolify
  healthcheck fails closed.
- Dockerfile copies packages/db/migrations into /app/migrations.
- postgres 3.4.5 added as direct broker dep.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 08:44:52 +01:00

75 KiB
Raw Blame History

claudemesh-cli v2 Pass 2 — Local-first storage

⚠️ This document describes v2 Pass 2 work entirely — NOT the Pass 1 scope.

For the v2 Pass 1 implementation target, see 2026-04-11-cli-v2-pass1.md.

Pass 1 has NO local SQLite source of truth, NO Lamport clock, NO sync daemon, NO write queue, NO conflict resolution, NO publish transaction. v2 Pass 1 uses the broker as the authority for all mesh data (same as v1). Local caching, if any, is ephemeral and read-only.

This entire document describes Pass 2 work that ships later — when the local-first architectural improvement is prioritized over other backlog items. Until then, do not reference this spec for Pass 1 implementation decisions.

Status: Pass 2 future reference — NOT the Pass 1 implementation target Created: 2026-04-10 Consolidated: 2026-04-10 (post-reviews, critical bugs fixed inline) Companion to: 2026-04-10-cli-v2-final-vision.md (§7 defers to this document for all storage details) Purpose: Complete specification of the local SQLite store, sync protocol, conflict resolution, and broker integration. Every distributed-systems correctness concern lives here.

This document has been reviewed twice (generic architecture review + GPT-5.3-Codex distributed systems review) and all critical bugs are fixed inline below. When the architecture spec body conflicts with this document, this document wins for storage concerns.


Table of contents

  1. Design principles
  2. Runtime and dependencies
  3. File layout and permissions
  4. Lamport clock algorithm (atomic, race-free)
  5. Schema (complete, with all constraints)
  6. Vector storage with model fingerprinting
  7. Memory recall semantics
  8. File blob storage and garbage collection
  9. Personal → shared publish upgrade protocol
  10. Task claim semantics and audit events
  11. Single-writer concurrency model
  12. Sync protocol (outbox, inbox, broker epoch, ordering)
  13. Conflict resolution per tool family
  14. Offline behavior
  15. Error recovery
  16. Migration between schema versions
  17. Bundle size accounting (honest)
  18. Shutdown and drain protocol
  19. Testing strategy
  20. Operational concerns
  21. Open questions deferred to v1.1+

1. Design principles

P1 — SQLite is the source of truth for mesh data

Every stateful operation writes locally first. The broker is a sync channel. When the broker is unreachable, the CLI is fully functional for data the user already has.

P2 — Single writer, many readers

SQLite WAL mode + a single-writer queue. No "database is locked" errors. No nested transactions across daemon and tool handlers.

P3 — Last-writer-wins with total order via (lamport, peer_id_bytes)

Cross-peer conflicts resolved by comparing (lamport, peer_id_bytes) tuples. peer_id is compared byte-wise on canonical UTF-8 (not localeCompare) to guarantee deterministic ordering across hosts with different locales or ICU versions.

P4 — Idempotency at every boundary

Inbox operations are deduplicated by (broker_epoch, broker_seq). Outbox operations carry a stable client_op_id (UUIDv7) that the broker honors for dedupe. Retry is always safe.

P5 — Append-only where possible

Vectors, audit events, and message history are append-only. Deletes are tombstones, not row removal.

P6 — Content-addressed blobs

Files over 64 KB live outside SQLite, addressed by SHA256. Refcounted for GC.

P7 — Explicit over implicit

Every query that could cross peers has an explicit scope (self, peer:<id>, all). No magic global queries.

P8 — Fail-safe to offline

Tool handlers always succeed locally. If the sync daemon dies, the tool surface still works. If SQLite dies, the CLI surfaces a clear error and refuses to proceed (no corrupted-state operations).

P9 — Every write is inside a transaction, through the queue

No "loose" writes. Every state-changing SQL statement runs inside a transaction enqueued on the single-writer queue. The lamport tick is part of the same transaction.

P10 — Sync durability via outbox, not "fire and forget"

An operation is not "done" until its outbox row is synced_at != null. Broker acks include a stable server identifier that the outbox records. Crash-after-send-before-ack replays are idempotent on the broker side via client_op_id.


2. Runtime and dependencies

2.1 SQLite engine

better-sqlite3 for the tool handler path. Synchronous API, WAL-friendly, no native async overhead per call.

Rejected alternatives:

  • node:sqlite — experimental in Node 22, release cadence unclear
  • bun:sqlite — Bun-only, but distribution target is Node
  • libsql — larger, more deps

2.2 Vector extension

sqlite-vec (not sqlite-vss):

  • Actively maintained (vss is stale)
  • Smaller binary (~200KB vs ~2MB)
  • Pure C, no FAISS dependency
  • Simpler vec0 virtual table API

Loaded at runtime via db.loadExtension('sqlite-vec'). The extension binary is bundled per-platform in the npm package under node_modules/claudemesh-cli/vendor/sqlite-vec-<platform>.<ext>.

2.3 Schema migration runner

Custom, not drizzle-kit. The migration surface is tiny (~5 migrations for v1.0.0), deterministic at startup, and we already write types by hand.

2.4 No ORM

Hand-written SQL with parameterized placeholders. Typed query wrappers live in services/store/query.ts.

2.5 UUID generation

uuidv7 from a small pure-JS lib (≈ 2KB). UUIDv7 gives temporal ordering in IDs, which helps index locality and debugging.


3. File layout and permissions

~/.claudemesh/
├── data.db                    # 0600 — main SQLite database
├── data.db-wal                # 0600 — write-ahead log (created by SQLite)
├── data.db-shm                # 0600 — shared memory file (created by SQLite)
├── blobs/                     # 0700 — content-addressed blob store
│   ├── a1/
│   │   └── a1b2c3...sha256    # 0600
│   └── f5/
│       └── f5e4d3...sha256    # 0600
└── ...

Permission enforcement:

  • At startup, services/store/db.ts verifies file modes match baseline; fixes drift with a logged warning
  • New files created with umask 077 (0600 files, 0700 dirs)
  • blobs/ subdirectory naming uses first two hex chars of SHA256 to keep per-directory file counts manageable

4. Lamport clock algorithm

This is the part the original spec had wrong. The canonical rules here are load-bearing for correctness. Every storage write MUST follow them.

4.1 The invariant

Every peer maintains a per-mesh Lamport counter in lamport_clocks(mesh_slug, value). The counter MUST satisfy:

∀ local_event: counter_after = counter_before + 1
∀ merged_event (from remote peer with lamport L):
    counter_after = max(counter_before, L) + 1

4.2 Atomic tick implementation

The original SELECT then INSERT OR REPLACE pattern races between concurrent writers. The correct implementation uses a single atomic UPDATE ... RETURNING:

// services/store/lamport.ts

export class LamportRaceError extends Error {
  readonly code = 'LAMPORT_RACE';
  constructor(meshSlug: string) {
    super(`tickLamport: mesh ${meshSlug} row disappeared between INSERT and UPDATE`);
  }
}

export class LamportUnknownMeshError extends Error {
  readonly code = 'LAMPORT_UNKNOWN_MESH';
  constructor(meshSlug: string) {
    super(`tickLamport: mesh ${meshSlug} does not exist in mesh table`);
  }
}

/**
 * Atomically tick the lamport clock for a mesh. MUST be called inside the
 * transaction that writes the domain row it's stamping, AND that transaction
 * MUST be enqueued on the single-writer queue.
 *
 * Defensive: validates the mesh exists, validates the UPDATE affected exactly
 * one row, and throws clearly-typed errors on any anomaly.
 *
 * @param db  The writer connection (use write queue)
 * @param meshSlug  The mesh whose clock to tick
 * @param incomingLamport  The remote event's lamport (for merge) or undefined for local
 * @returns The new lamport value to stamp on the row
 * @throws LamportUnknownMeshError  if the mesh slug does not exist
 * @throws LamportRaceError         if the UPDATE matched zero rows (should never happen)
 */
export function tickLamport(
  db: Database,
  meshSlug: string,
  incomingLamport?: number,
): number {
  // Validate the mesh exists before touching the clock
  const meshExists = db.prepare('SELECT 1 FROM mesh WHERE slug = ?').get(meshSlug);
  if (!meshExists) {
    throw new LamportUnknownMeshError(meshSlug);
  }

  // Ensure the lamport_clocks row exists for this mesh
  db.prepare(`
    INSERT INTO lamport_clocks (mesh_slug, value)
    VALUES (?, 0)
    ON CONFLICT(mesh_slug) DO NOTHING
  `).run(meshSlug);

  // Atomic UPDATE ... RETURNING: compute max(current, incoming) + 1 in SQL
  const base = incomingLamport ?? 0;
  const result = db.prepare(`
    UPDATE lamport_clocks
    SET value = MAX(value, ?) + 1
    WHERE mesh_slug = ?
    RETURNING value
  `).get(base, meshSlug) as { value: number } | undefined;

  // Defensive: RETURNING may yield nothing if the row was deleted between
  // the INSERT and UPDATE (e.g. concurrent mesh deletion outside the queue).
  // This should be impossible under the single-writer contract, but we check
  // anyway and throw a clear error rather than crashing on .value of undefined.
  if (!result) {
    throw new LamportRaceError(meshSlug);
  }
  return result.value;
}

4.3 Caller contract

Every caller of tickLamport MUST:

  1. Be inside a db.transaction(() => { ... }) block
  2. Enqueue the transaction through the single-writer queue
  3. Stamp the returned value on the domain row in the same transaction
  4. Never call tickLamport twice in the same transaction (one tick per logical event)

Failure mode if rule 1 is violated: the counter updates but the domain row write races separately, breaking the invariant. CI tests enforce this by mocking the write queue and asserting tickLamport is always called inside queue.enqueue(...).

4.4 Rollback semantics

If the enclosing transaction rolls back, the lamport update rolls back with it. The counter goes back to its previous value, and the logical event is treated as if it never happened. This is correct only if no external effect escaped the transaction — e.g. no network call was made, no file was written outside the DB. The sync daemon guarantees this by enqueueing outbox rows inside the same transaction as the domain write.

4.5 Tiebreaker: bytewise peer_id comparison on NFC-normalized UTF-8

When two operations have the same lamport value, the tiebreaker is byte-wise comparison of the NFC-normalized UTF-8 representation of peer_id.

Normalization is mandatory. Without NFC normalization, two peers with visually-identical display names encoded differently (NFC vs NFD — e.g. "café" as café vs cafe\u0301) produce different byte sequences and thus different conflict winners. NFC is enforced at peer registration and before every comparison.

// services/store/conflict.ts

/** Normalize a peer_id to NFC before any comparison or storage. */
export function normalizePeerId(peerId: string): string {
  return peerId.normalize('NFC');
}

export function compareOps(
  a: { lamport: number; peer_id: string },
  b: { lamport: number; peer_id: string },
): number {
  if (a.lamport !== b.lamport) return a.lamport - b.lamport;
  // Both peer_ids MUST be NFC-normalized; this is enforced at write time.
  // Buffer.compare is stable across Node/Bun, little-endian/big-endian, and
  // platform-independent because UTF-8 byte sequence is canonical.
  return Buffer.compare(
    Buffer.from(a.peer_id, 'utf8'),
    Buffer.from(b.peer_id, 'utf8'),
  );
}

/** Returns true if A wins over B (A is "more recent"). */
export function aWins(
  a: { lamport: number; peer_id: string },
  b: { lamport: number; peer_id: string },
): boolean {
  return compareOps(a, b) > 0;
}

Enforcement at write time: every code path that inserts a peer_id into the database calls normalizePeerId() first. This includes:

  • Mesh join (new peer registration)
  • Outbox ops being enqueued with peer_id
  • Inbox ops being applied
  • Profile updates
  • Any schema that has a peer_id column (memory, state_kv, vectors, files, tasks, peers)

A database trigger enforces this at the SQL layer as a backup:

-- On every INSERT/UPDATE of peer_id columns, reject if not NFC-normalized
-- (actual NFC check must be done in application code; SQLite has no NFC function)
-- Instead, we validate at the single-writer queue's entry point via a helper.

Since SQLite doesn't have a native NFC function, the check is in services/store/normalize.ts which wraps every writer with an NFC assertion. The application-level enforcement is the primary defense.

Never use localeCompare for conflict resolution — it depends on the host's ICU version and locale, which differs across peers and causes divergent winners for the same conflict.

4.6 Hybrid logical clocks (NOT in v1.0.0)

HLC combines physical time with a logical counter for better causality approximation. Rejected for v1.0.0:

  • Physical clock skew introduces new failure modes
  • Debugging HLC behavior requires deep familiarity
  • Plain Lamport + bytewise tiebreaker is sufficient for LWW
  • HLC can be added later as an additive migration (new column, not a replacement)

4.7 Vector clocks (NOT shipped)

Storage cost (one int per peer per row) and complexity cost. Permanently rejected. If causal consistency becomes a hard requirement for some feature, that feature uses server-side ordering via the broker.


5. Schema

5.1 Meshes

CREATE TABLE IF NOT EXISTS mesh (
  slug TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  kind TEXT NOT NULL CHECK (kind IN ('personal', 'shared_owner', 'shared_guest')),
  broker_url TEXT,
  server_id TEXT,
  broker_epoch INTEGER NOT NULL DEFAULT 0,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL,
  last_sync_at INTEGER,
  schema_version INTEGER NOT NULL DEFAULT 1,
  sync_paused INTEGER NOT NULL DEFAULT 0,
  CHECK (
    (kind = 'personal' AND broker_url IS NULL AND server_id IS NULL) OR
    (kind IN ('shared_owner', 'shared_guest') AND broker_url IS NOT NULL AND server_id IS NOT NULL)
  )
);

CREATE TABLE IF NOT EXISTS lamport_clocks (
  mesh_slug TEXT PRIMARY KEY REFERENCES mesh(slug) ON DELETE CASCADE,
  value INTEGER NOT NULL DEFAULT 0
);

broker_epoch: monotonically increasing, managed by the broker. When the broker restarts and reassigns sequence numbers, it increments its epoch. The inbox unique constraint uses (mesh_slug, broker_epoch, broker_seq) so a new epoch cannot collide with prior deliveries.

Broker epoch ack protocol: every broker ack message includes the broker's current epoch (not the epoch the op was processed under). The CLI updates mesh.broker_epoch from the current epoch on every ack. This handles the restart race:

  • CLI sends op under epoch N
  • Broker restarts mid-op, becomes epoch N+1
  • Broker replays the op (or the CLI retries) under epoch N+1
  • Ack comes back with current_epoch: N+1
  • CLI updates mesh.broker_epoch = N+1
  • Next send uses epoch N+1

If an ack arrives with an epoch LOWER than the CLI's current recorded epoch (shouldn't happen, but defensive), the CLI logs a warning and ignores the epoch update but still accepts the ack (the server-seq is valid).

If the CLI tries to send an op tagged with an old epoch and the broker has moved on, the broker responds with epoch_mismatch + current epoch, and the CLI re-tags the outbox op with the new epoch before retrying (no data loss, just a retry delay).

sync_paused: set to 1 when the outbox has accumulated too many failed ops for a mesh. Cleared by claudemesh doctor --resume-sync.

5.2 Memory

CREATE TABLE IF NOT EXISTS memory (
  id TEXT PRIMARY KEY,
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  peer_id TEXT NOT NULL,
  key TEXT NOT NULL,
  value TEXT NOT NULL,
  tags TEXT,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL,
  lamport INTEGER NOT NULL,
  tombstone INTEGER NOT NULL DEFAULT 0,
  UNIQUE(mesh_slug, peer_id, key)
);

CREATE INDEX memory_mesh_key_live ON memory(mesh_slug, key) WHERE tombstone = 0;
CREATE INDEX memory_mesh_peer_live ON memory(mesh_slug, peer_id) WHERE tombstone = 0;

Upsert logic (NOT INSERT OR REPLACE): INSERT ... ON CONFLICT(mesh_slug, peer_id, key) DO UPDATE SET with an explicit WHERE clause comparing (lamport, peer_id) tuples. This preserves LWW semantics and avoids losing concurrent writes.

export function upsertMemory(db: Database, row: MemoryRow): void {
  db.prepare(`
    INSERT INTO memory (id, mesh_slug, peer_id, key, value, tags, created_at, updated_at, lamport, tombstone)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(mesh_slug, peer_id, key) DO UPDATE SET
      value = excluded.value,
      tags = excluded.tags,
      updated_at = excluded.updated_at,
      lamport = excluded.lamport,
      tombstone = excluded.tombstone,
      id = excluded.id
    WHERE excluded.lamport > memory.lamport
       OR (excluded.lamport = memory.lamport AND excluded.peer_id > memory.peer_id)
  `).run(row.id, row.mesh_slug, row.peer_id, row.key, row.value,
         row.tags ?? null, row.created_at, row.updated_at, row.lamport, row.tombstone);
}

Note: excluded.peer_id > memory.peer_id uses SQLite's default binary comparison, which is byte-wise for BLOB and TEXT. That matches the application-level bytewise rule.

5.3 State KV

CREATE TABLE IF NOT EXISTS state_kv (
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  key TEXT NOT NULL,
  value TEXT NOT NULL,
  updated_by TEXT NOT NULL,
  updated_at INTEGER NOT NULL,
  lamport INTEGER NOT NULL,
  tombstone INTEGER NOT NULL DEFAULT 0,
  PRIMARY KEY (mesh_slug, key)
);

CREATE INDEX state_kv_lamport ON state_kv(mesh_slug, lamport);

Upsert with LWW predicate:

export function upsertStateKv(db: Database, row: StateKvRow): void {
  db.prepare(`
    INSERT INTO state_kv (mesh_slug, key, value, updated_by, updated_at, lamport, tombstone)
    VALUES (?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(mesh_slug, key) DO UPDATE SET
      value = excluded.value,
      updated_by = excluded.updated_by,
      updated_at = excluded.updated_at,
      lamport = excluded.lamport,
      tombstone = excluded.tombstone
    WHERE excluded.lamport > state_kv.lamport
       OR (excluded.lamport = state_kv.lamport AND excluded.updated_by > state_kv.updated_by)
  `).run(row.mesh_slug, row.key, row.value, row.updated_by, row.updated_at, row.lamport, row.tombstone);
}

5.4 Vectors

CREATE TABLE IF NOT EXISTS vector_models (
  id TEXT PRIMARY KEY,                    -- fingerprint: sha256(provider:model:version:dim:quant)
  provider TEXT NOT NULL,                 -- e.g. 'voyage-ai', 'openai', 'sentence-transformers'
  model TEXT NOT NULL,                    -- e.g. 'voyage-3-large'
  model_version TEXT NOT NULL,            -- e.g. '1.0' or 'unknown'
  dim INTEGER NOT NULL,
  quantization TEXT NOT NULL DEFAULT 'float32',
  vec_table TEXT NOT NULL,                -- e.g. 'vectors_a1b2c3'
  created_at INTEGER NOT NULL,
  UNIQUE(provider, model, model_version, dim, quantization)
);

CREATE TABLE IF NOT EXISTS vector_metadata (
  rowid INTEGER PRIMARY KEY AUTOINCREMENT,
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  peer_id TEXT NOT NULL,
  key TEXT NOT NULL,
  content TEXT NOT NULL,
  metadata TEXT,
  model_id TEXT NOT NULL REFERENCES vector_models(id),
  vec_rowid INTEGER NOT NULL,
  lamport INTEGER NOT NULL,
  created_at INTEGER NOT NULL,
  tombstone INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX vector_metadata_mesh_model ON vector_metadata(mesh_slug, model_id) WHERE tombstone = 0;
CREATE INDEX vector_metadata_peer ON vector_metadata(mesh_slug, peer_id) WHERE tombstone = 0;

-- vec tables are created dynamically, one per model fingerprint:
-- CREATE VIRTUAL TABLE vectors_<hash> USING vec0(embedding FLOAT[<dim>]);

Model fingerprint: sha256(provider + ':' + model + ':' + model_version + ':' + dim + ':' + quantization). This catches provider-specific model revisions, tokenizer changes, and quantization differences that would silently corrupt cross-machine semantic compatibility.

5.5 Files

CREATE TABLE IF NOT EXISTS files (
  id TEXT PRIMARY KEY,
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  peer_id TEXT NOT NULL,
  path TEXT NOT NULL,
  sha256 TEXT NOT NULL,
  size INTEGER NOT NULL,
  storage_kind TEXT NOT NULL CHECK (storage_kind IN ('inline', 'blob')),
  inline_content BLOB,
  blob_path TEXT,
  shared_with TEXT NOT NULL DEFAULT '[]',
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL,
  lamport INTEGER NOT NULL,
  tombstone INTEGER NOT NULL DEFAULT 0,
  CHECK (
    (storage_kind = 'inline' AND inline_content IS NOT NULL AND blob_path IS NULL) OR
    (storage_kind = 'blob' AND inline_content IS NULL AND blob_path IS NOT NULL)
  )
);

CREATE INDEX files_mesh_peer_live ON files(mesh_slug, peer_id) WHERE tombstone = 0;
CREATE INDEX files_sha256 ON files(sha256);

CREATE TABLE IF NOT EXISTS blob_refs (
  sha256 TEXT PRIMARY KEY,
  ref_count INTEGER NOT NULL DEFAULT 0,
  bytes INTEGER NOT NULL,
  created_at INTEGER NOT NULL,
  last_accessed INTEGER NOT NULL,
  pending_unlink INTEGER NOT NULL DEFAULT 0
);

storage_kind is explicit instead of inferring from nullable fields. Eliminates the earlier (inline != null) XOR (blob != null) check condition.

pending_unlink marks blobs whose refcount has dropped to 0 but whose filesystem unlink has not yet completed. A GC sweep retries any rows still pending_unlink = 1.

5.6 Tasks

CREATE TABLE IF NOT EXISTS tasks (
  id TEXT PRIMARY KEY,
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  title TEXT NOT NULL,
  description TEXT,
  status TEXT NOT NULL CHECK (status IN ('open', 'claimed', 'completed', 'cancelled')),
  claimed_by TEXT,
  claimed_at INTEGER,
  completed_at INTEGER,
  created_by TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  updated_at INTEGER NOT NULL,
  lamport INTEGER NOT NULL,
  tombstone INTEGER NOT NULL DEFAULT 0
);

CREATE INDEX tasks_mesh_status ON tasks(mesh_slug, status) WHERE tombstone = 0;

CREATE TABLE IF NOT EXISTS task_claim_events (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  task_id TEXT NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
  peer_id TEXT NOT NULL,
  event_type TEXT NOT NULL CHECK (event_type IN (
    'claimed',            -- peer successfully claimed an open task
    'superseded',         -- peer's claim lost to another peer's concurrent claim
    'rejected_terminal',  -- late claim for a task already completed/cancelled
    'released',           -- peer voluntarily released their claim
    'completed',          -- peer marked task complete
    'cancelled'           -- task cancelled
  )),
  lamport INTEGER NOT NULL,
  event_time INTEGER NOT NULL,           -- sender-provided, not receiver wall time
  applied_at INTEGER NOT NULL,           -- receiver wall time for debug only
  conflict_peer_id TEXT,
  conflict_lamport INTEGER
);

CREATE INDEX task_claim_events_task ON task_claim_events(task_id, lamport);

event_time vs applied_at: event_time is the sender-provided timestamp, used for replication equality. applied_at is the receiver wall time, used only for logs and debugging, never for conflict resolution.

5.7 Peers (cache)

CREATE TABLE IF NOT EXISTS peers (
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  peer_id TEXT NOT NULL,
  display_name TEXT,
  status TEXT,
  summary TEXT,
  last_seen_at INTEGER,
  PRIMARY KEY (mesh_slug, peer_id)
);

5.8 Outbox (local → broker)

CREATE TABLE IF NOT EXISTS outbox (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  op_type TEXT NOT NULL,
  payload TEXT NOT NULL,
  client_op_id TEXT NOT NULL UNIQUE,     -- UUIDv7, broker dedupes on this
  server_ack_id TEXT,
  broker_epoch INTEGER,                  -- recorded from the ack
  broker_seq INTEGER,                    -- recorded from the ack
  created_at INTEGER NOT NULL,
  attempts INTEGER NOT NULL DEFAULT 0,
  last_error TEXT,
  last_attempt_at INTEGER,
  synced_at INTEGER
);

CREATE INDEX outbox_pending ON outbox(mesh_slug, id) WHERE synced_at IS NULL;

The broker MUST honor client_op_id for dedupe. If the CLI sends the same client_op_id twice (crash-between-send-and-ack), the broker returns the original server_ack_id, epoch, and seq without applying the op a second time. This is the exactly-once delivery contract.

5.9 Inbox (broker → local)

CREATE TABLE IF NOT EXISTS inbox (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  mesh_slug TEXT NOT NULL REFERENCES mesh(slug) ON DELETE CASCADE,
  broker_epoch INTEGER NOT NULL,
  broker_seq INTEGER NOT NULL,
  op_type TEXT NOT NULL,
  payload TEXT NOT NULL,
  received_at INTEGER NOT NULL,
  applied_at INTEGER
);

CREATE UNIQUE INDEX inbox_epoch_seq ON inbox(mesh_slug, broker_epoch, broker_seq);
CREATE INDEX inbox_pending ON inbox(mesh_slug, id) WHERE applied_at IS NULL;

Composite uniqueness (mesh_slug, broker_epoch, broker_seq) guards against broker restarts that reset sequence numbers. When a new epoch begins, seq starts at 1 again but collides with nothing because the epoch differs.

5.10 Migrations tracking

CREATE TABLE IF NOT EXISTS _migrations (
  version INTEGER PRIMARY KEY,
  applied_at INTEGER NOT NULL
);

6. Vector storage with model fingerprinting

6.1 Model fingerprint

// services/store/vector-fingerprint.ts

export interface ModelIdentity {
  provider: string;        // 'voyage-ai' | 'openai' | 'sentence-transformers' | 'custom'
  model: string;           // 'voyage-3-large'
  modelVersion: string;    // '1.0' or 'unknown' if unversioned
  dim: number;             // 1024
  quantization: string;    // 'float32' | 'int8' | 'binary'
}

export function modelFingerprint(m: ModelIdentity): string {
  const canonical = `${m.provider}:${m.model}:${m.modelVersion}:${m.dim}:${m.quantization}`;
  return sha256Hex(canonical).slice(0, 16);
}

Each unique ModelIdentity gets its own vec table. Mismatched dimensions are impossible because the fingerprint diverges before the caller can insert into the wrong table.

6.2 Table creation with race-safe registration

The TOCTOU race in the original spec (SELECT then CREATE VIRTUAL TABLE then INSERT) is fixed by using INSERT ... ON CONFLICT DO NOTHING and re-reading:

export function ensureVecTable(db: Database, model: ModelIdentity): string {
  const id = modelFingerprint(model);
  const tableName = `vectors_${id}`;

  // Try to register the model. If it already exists, this is a no-op.
  db.prepare(`
    INSERT INTO vector_models (id, provider, model, model_version, dim, quantization, vec_table, created_at)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    ON CONFLICT(id) DO NOTHING
  `).run(id, model.provider, model.model, model.modelVersion, model.dim, model.quantization, tableName, Date.now());

  // Ensure the virtual table exists. CREATE VIRTUAL TABLE IF NOT EXISTS is safe.
  // Validate the table name is pure alphanumeric/underscore before interpolating.
  if (!/^vectors_[a-f0-9]{16}$/.test(tableName)) {
    throw new Error(`invalid vec table name: ${tableName}`);
  }
  db.prepare(`CREATE VIRTUAL TABLE IF NOT EXISTS ${tableName} USING vec0(embedding FLOAT[${model.dim}])`).run();

  return tableName;
}

The table name is validated against a strict regex before interpolation to prevent any SQL injection from a corrupted fingerprint.

6.3 Insert

export function vectorStore(
  db: Database,
  queue: WriteQueue,
  input: {
    mesh: string;
    peer: string;
    key: string;
    content: string;
    embedding: number[];
    model: ModelIdentity;
    metadata?: unknown;
  },
): Promise<void> {
  return queue.enqueue(() => {
    db.transaction(() => {
      if (input.embedding.length !== input.model.dim) {
        throw new Error(`embedding length ${input.embedding.length} does not match model dim ${input.model.dim}`);
      }
      const vecTable = ensureVecTable(db, input.model);
      const modelId = modelFingerprint(input.model);

      const buf = Buffer.from(new Float32Array(input.embedding).buffer);
      const vecResult = db.prepare(`INSERT INTO ${vecTable}(embedding) VALUES (?)`).run(buf);
      const vecRowid = Number(vecResult.lastInsertRowid);

      const lamport = tickLamport(db, input.mesh);

      db.prepare(`
        INSERT INTO vector_metadata
          (mesh_slug, peer_id, key, content, metadata, model_id, vec_rowid, lamport, created_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
      `).run(
        input.mesh,
        input.peer,
        input.key,
        input.content,
        JSON.stringify(input.metadata ?? null),
        modelId,
        vecRowid,
        lamport,
        Date.now(),
      );
    })();
  });
}

Cross-model queries are forbidden. The caller specifies the model; if that model doesn't exist in the store, the result is empty (not an error).

Read-time integrity validation: the vec_table column in vector_models is trusted input. If the database is corrupted or manually edited, a malicious vec_table value could inject arbitrary SQL into the CREATE VIRTUAL TABLE / SELECT FROM statements. The query path re-derives the expected table name from the stored identity columns and verifies it matches BEFORE using it.

export function vectorSearch(
  db: Database,
  input: { mesh: string; query: number[]; model: ModelIdentity; limit?: number },
): VectorSearchResult[] {
  const id = modelFingerprint(input.model);
  const row = db.prepare(`
    SELECT vec_table, dim, provider, model, model_version, quantization
    FROM vector_models WHERE id = ?
  `).get(id) as {
    vec_table: string;
    dim: number;
    provider: string;
    model: string;
    model_version: string;
    quantization: string;
  } | undefined;

  if (!row) return [];

  // Integrity check: re-derive the fingerprint from stored identity columns
  // and verify vec_table matches. Prevents trusting a corrupted registry row.
  const derivedFingerprint = modelFingerprint({
    provider: row.provider,
    model: row.model,
    modelVersion: row.model_version,
    dim: row.dim,
    quantization: row.quantization,
  });
  const expectedTableName = `vectors_${derivedFingerprint}`;
  if (row.vec_table !== expectedTableName) {
    throw new Error(
      `vector_models integrity failure: id ${id} has vec_table="${row.vec_table}" ` +
      `but derived ${expectedTableName}. Database may be corrupted — run claudemesh doctor.`
    );
  }

  // Defense in depth: regex-validate the format
  if (!/^vectors_[a-f0-9]{16}$/.test(row.vec_table)) {
    throw new Error(`invalid vec table name from registry: ${row.vec_table}`);
  }

  if (row.dim !== input.query.length) {
    throw new Error(`dimension mismatch: expected ${row.dim}, got ${input.query.length}`);
  }

  const buf = Buffer.from(new Float32Array(input.query).buffer);
  return db.prepare(`
    SELECT vm.key, vm.content, vm.peer_id, vm.metadata, t.distance
    FROM ${row.vec_table} t
    JOIN vector_metadata vm ON vm.vec_rowid = t.rowid
    WHERE t.embedding MATCH ?
      AND vm.mesh_slug = ?
      AND vm.tombstone = 0
      AND vm.model_id = ?
    ORDER BY t.distance
    LIMIT ?
  `).all(buf, input.mesh, id, input.limit ?? 10) as VectorSearchResult[];
}

ensureVecTable runs the same integrity check before CREATE VIRTUAL TABLE IF NOT EXISTS — if the stored vec_table doesn't match the derived name, the function throws instead of creating a table with the wrong name.

6.5 Model migration protocol

Changing embedding models is an explicit, expensive operation via claudemesh advanced re-embed:

  1. Begin: mark old model as deprecated in vector_models
  2. For each row in vector_metadata under old model (with progress output):
    • Re-embed content with new model (requires network to the embedding provider or a local model)
    • Insert into new vec table under new model fingerprint
    • Tombstone the old row
  3. After completion with zero reads of old model for 30 days, GC the old vec table via DROP TABLE vectors_<old_id>

During the migration, reads against the old model still work (the vec table is not dropped until the grace period ends). New inserts go to the new model.


7. Memory recall semantics

7.1 API

type RecallInput = {
  mesh: string;
  key: string;
  scope?:
    | { kind: 'self' }                    // default
    | { kind: 'peer'; peer_id: string }
    | { kind: 'all' };
};

type RecallResult =
  | { kind: 'single'; peer_id: string; value: string; lamport: number; updated_at: number }
  | { kind: 'multi'; results: Array<{ peer_id: string; value: string; lamport: number; updated_at: number }> }
  | { kind: 'not_found' };

7.2 Resolution

scope Behavior
{ kind: 'self' } (default) Returns the current peer's value for key. not_found if absent.
{ kind: 'peer', peer_id } Returns that peer's value. not_found if absent.
{ kind: 'all' } Returns array sorted by (lamport DESC, peer_id bytewise ASC). Empty array if none.

7.3 Tool surface

// mcp/tools/memory.ts
{
  name: 'recall',
  description: 'Retrieve a remembered value by key.',
  inputSchema: {
    key: z.string(),
    peer: z.enum(['self', 'all']).or(z.string()).default('self'),
  },
  handler: async ({ key, peer }) => memoryService.recall({
    mesh: currentMesh,
    key,
    scope: peer === 'self'
      ? { kind: 'self' }
      : peer === 'all'
        ? { kind: 'all' }
        : { kind: 'peer', peer_id: peer },
  }),
}

7.4 Namespaced keys (convention)

For shared team memories, the convention is to namespace the key:

remember('team.api_key', '...')
recall('team.api_key')

This avoids per-peer collision entirely. The tool documentation recommends this pattern.


8. File blob storage and garbage collection

8.1 Path validation

export function validatePath(p: string): void {
  if (p.length === 0) throw new Error('empty path');
  if (p.length > 1024) throw new Error('path too long');
  if (p.includes('\0')) throw new Error('null byte in path');
  if (p.startsWith('/')) throw new Error('absolute path forbidden');
  if (p.includes('\\')) throw new Error('backslash forbidden');
  if (/(^|\/)\.\.($|\/)/.test(p)) throw new Error('parent reference forbidden');
  if (/(^|\/)\.($|\/)/.test(p)) throw new Error('self reference forbidden');
  if (!/^[\w. \-/+()]+$/.test(p)) throw new Error('invalid characters');
}

8.2 Insert

export function fileShare(
  db: Database,
  queue: WriteQueue,
  blobsDir: string,
  input: { mesh: string; peer: string; path: string; content: Buffer },
): Promise<{ id: string }> {
  validatePath(input.path);
  const sha = sha256Hex(input.content);
  const size = input.content.length;

  return queue.enqueue(() => {
    // Write filesystem blob BEFORE the transaction so a rolled-back transaction
    // doesn't leave a blob without a reference. If the transaction fails,
    // we rely on GC sweep to clean up orphan files with pending_unlink = 1.
    let blobPath: string | null = null;
    let inlineContent: Buffer | null = null;

    if (size < 64 * 1024) {
      inlineContent = input.content;
    } else {
      blobPath = `blobs/${sha.slice(0, 2)}/${sha}`;
      const fullPath = path.join(blobsDir, '..', blobPath);
      fs.mkdirSync(path.dirname(fullPath), { recursive: true, mode: 0o700 });
      // Use O_EXCL to avoid overwriting concurrent write
      try {
        fs.writeFileSync(fullPath, input.content, { mode: 0o600, flag: 'wx' });
      } catch (err: any) {
        if (err.code !== 'EEXIST') throw err;
        // Already exists (deduped) — verify content matches
        const existing = fs.readFileSync(fullPath);
        if (!existing.equals(input.content)) {
          throw new Error(`sha256 collision or corrupted blob: ${sha}`);
        }
      }
    }

    const id = uuidv7();
    db.transaction(() => {
      if (blobPath !== null) {
        db.prepare(`
          INSERT INTO blob_refs (sha256, ref_count, bytes, created_at, last_accessed)
          VALUES (?, 1, ?, ?, ?)
          ON CONFLICT(sha256) DO UPDATE SET
            ref_count = ref_count + 1,
            last_accessed = excluded.last_accessed
        `).run(sha, size, Date.now(), Date.now());
      }

      const lamport = tickLamport(db, input.mesh);

      db.prepare(`
        INSERT INTO files
          (id, mesh_slug, peer_id, path, sha256, size, storage_kind, inline_content, blob_path, shared_with, created_at, updated_at, lamport)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
      `).run(
        id, input.mesh, input.peer, input.path, sha, size,
        blobPath !== null ? 'blob' : 'inline',
        inlineContent,
        blobPath,
        '[]',
        Date.now(), Date.now(), lamport,
      );
    })();
    return { id };
  });
}

Why write blob before transaction: if the filesystem write succeeds and the transaction fails, the blob is orphaned but the GC sweep finds it via pending_unlink = 1. If we write after the transaction commits, a crash between commit and write would leave the DB referencing a missing blob. The first failure is recoverable via GC; the second is data loss.

export function fileDelete(
  db: Database,
  queue: WriteQueue,
  blobsDir: string,
  input: { mesh: string; file_id: string },
): Promise<void> {
  return queue.enqueue(() => {
    let blobToUnlink: string | null = null;

    db.transaction(() => {
      const file = db.prepare('SELECT sha256, blob_path, storage_kind FROM files WHERE id = ? AND mesh_slug = ?').get(input.file_id, input.mesh) as any;
      if (!file) return;

      const lamport = tickLamport(db, input.mesh);
      db.prepare('UPDATE files SET tombstone = 1, updated_at = ?, lamport = ? WHERE id = ?').run(Date.now(), lamport, input.file_id);

      if (file.storage_kind === 'blob') {
        db.prepare('UPDATE blob_refs SET ref_count = ref_count - 1 WHERE sha256 = ?').run(file.sha256);
        const ref = db.prepare('SELECT ref_count FROM blob_refs WHERE sha256 = ?').get(file.sha256) as { ref_count: number };
        if (ref.ref_count <= 0) {
          db.prepare('UPDATE blob_refs SET pending_unlink = 1 WHERE sha256 = ?').run(file.sha256);
          blobToUnlink = file.blob_path;
        }
      }
    })();

    // Unlink happens AFTER the transaction commits. If it fails, GC sweep
    // retries via pending_unlink = 1.
    if (blobToUnlink !== null) {
      const fullPath = path.join(blobsDir, '..', blobToUnlink);
      try {
        fs.unlinkSync(fullPath);
        db.prepare('DELETE FROM blob_refs WHERE sha256 = ? AND pending_unlink = 1').run(blobToUnlink);
      } catch (err: any) {
        if (err.code !== 'ENOENT') {
          // leave pending_unlink = 1 for GC sweep to retry
        }
      }
    }
  });
}

8.4 GC sweep

Runs every 24 hours and on shutdown:

export function gcBlobs(db: Database, queue: WriteQueue, blobsDir: string): Promise<void> {
  return queue.enqueue(() => {
    // Pending unlinks from earlier failures
    const pending = db.prepare('SELECT sha256 FROM blob_refs WHERE pending_unlink = 1').all() as { sha256: string }[];
    for (const { sha256 } of pending) {
      const blobPath = path.join(blobsDir, sha256.slice(0, 2), sha256);
      try {
        fs.unlinkSync(blobPath);
        db.prepare('DELETE FROM blob_refs WHERE sha256 = ?').run(sha256);
      } catch (err: any) {
        if (err.code === 'ENOENT') {
          db.prepare('DELETE FROM blob_refs WHERE sha256 = ?').run(sha256);
        }
      }
    }
    // Old tombstones
    const cutoff = Date.now() - 90 * 24 * 60 * 60 * 1000;
    db.prepare('DELETE FROM files WHERE tombstone = 1 AND updated_at < ?').run(cutoff);
  });
}

9. Personal → shared publish upgrade protocol

9.1 Phases

The protocol is split into phases, each of which is individually committable so crashes between phases are recoverable.

export async function meshPublish(
  services: { auth: AuthService; api: ApiClient; mesh: MeshService; broker: BrokerClient; queue: WriteQueue; db: Database },
  input: { mesh_slug: string; display_name?: string },
): Promise<{ invite_url: string }> {

  // --- Phase 1: authentication ---
  const token = await services.auth.ensureAuthenticated();

  // --- Phase 2: server registration ---
  // API is idempotent on (user, slug); calling twice returns the same server_id.
  const response = await services.api.post('/api/my/meshes', {
    name: input.display_name ?? (await services.mesh.getLocal(input.mesh_slug)).name,
    slug: input.mesh_slug,
    kind: 'shared_owner',
  });
  // response: { server_id, broker_url, broker_epoch, slug }

  // --- Phase 3: local transition ---
  await services.queue.enqueue(() => {
    services.db.transaction(() => {
      services.db.prepare(`
        UPDATE mesh
        SET kind = 'shared_owner',
            broker_url = ?,
            server_id = ?,
            broker_epoch = ?,
            updated_at = ?
        WHERE slug = ? AND kind = 'personal'
      `).run(response.broker_url, response.server_id, response.broker_epoch, Date.now(), input.mesh_slug);

      // Enqueue a mesh.publish marker op (first sync op).
      services.db.prepare(`
        INSERT INTO outbox (mesh_slug, op_type, payload, client_op_id, created_at)
        VALUES (?, 'mesh.publish', ?, ?, ?)
      `).run(
        input.mesh_slug,
        JSON.stringify({ snapshot_version: 1, schema_version: 1 }),
        uuidv7(),
        Date.now(),
      );
    })();
  });

  // --- Phase 4: backfill ---
  // For small meshes (< 10k rows), enqueue all rows as backfill ops in chunks.
  // For large meshes, use snapshot + cursor protocol (§9.3).
  await backfillOutbox(services, input.mesh_slug);

  // --- Phase 5: sync wait ---
  // Wait for the outbox to drain (with timeout). Publish is considered "done"
  // when the sync daemon has acknowledged the mesh.publish marker.
  await waitForPublishAck(services, input.mesh_slug, { timeoutMs: 30_000 });

  // --- Phase 6: first invite ---
  const invite = await services.api.post(`/api/my/meshes/${input.mesh_slug}/invites`, {
    expires_in: '7d',
  });

  return { invite_url: invite.url };
}

9.2 Backfill with chunking

To satisfy the < 100ms per transaction rule, backfill happens in small chunks:

async function backfillOutbox(services: Services, meshSlug: string): Promise<void> {
  const CHUNK_SIZE = 200;

  for (const table of ['memory', 'state_kv', 'vector_metadata', 'files', 'tasks']) {
    let cursor = 0;
    while (true) {
      const done = await services.queue.enqueue(() => {
        const rows = services.db.prepare(`
          SELECT rowid, * FROM ${table}
          WHERE mesh_slug = ? AND tombstone = 0 AND rowid > ?
          ORDER BY rowid LIMIT ?
        `).all(meshSlug, cursor, CHUNK_SIZE) as any[];

        if (rows.length === 0) return true;

        services.db.transaction(() => {
          for (const row of rows) {
            services.db.prepare(`
              INSERT INTO outbox (mesh_slug, op_type, payload, client_op_id, created_at)
              VALUES (?, ?, ?, ?, ?)
            `).run(
              meshSlug,
              `${table}.backfill`,
              JSON.stringify(row),
              uuidv7(),
              Date.now(),
            );
          }
        })();

        cursor = rows[rows.length - 1].rowid;
        return false;
      });

      if (done) break;
    }
  }
}

Each chunk is a separate transaction (typically 200 rows × ~5 inserts = 1000 statements, well under 100ms). Between chunks, other writes can interleave via the queue.

9.3 Large mesh snapshot protocol

For meshes with >10k rows, use a server-side snapshot:

POST /api/my/meshes/:slug/snapshot/begin         → { snapshot_id }
POST /api/my/meshes/:slug/snapshot/:id/chunk     → { next_cursor }
POST /api/my/meshes/:slug/snapshot/:id/commit    → { broker_epoch, broker_seq_start }

The CLI uploads rows in chunks keyed by snapshot_id. If the upload is interrupted, the next attempt reads the last cursor and resumes. The server commits atomically; partial uploads never become visible.

9.4 Failure modes

Phase Failure Recovery
1 (auth) User denies in browser Abort publish, local mesh unchanged
2 (register) API 409 (slug collision) CLI suggests a suffix, retries with new slug
3 (local transition) Crash Restart detects kind = shared_owner with empty outbox → resumes phase 4
4 (backfill) Crash mid-chunk Chunk transactions are atomic; resume from last committed rowid
5 (wait) Timeout Publish is logically complete; user sees "Published, sync catching up"
6 (invite) API error Mesh is published; user runs claudemesh invite explicitly

All phases are resumable because each phase's state is durable before the next phase begins.


10. Task claim semantics and audit events

10.1 Local claim

export function taskClaim(
  db: Database,
  queue: WriteQueue,
  input: { mesh: string; task_id: string; peer: string },
): Promise<void> {
  return queue.enqueue(() => {
    db.transaction(() => {
      const task = db.prepare('SELECT * FROM tasks WHERE id = ? AND mesh_slug = ?').get(input.task_id, input.mesh) as any;
      if (!task || task.tombstone) throw new Error('task not found');
      if (task.status === 'completed') throw new Error('task already completed');
      if (task.status === 'cancelled') throw new Error('task cancelled');
      if (task.status === 'claimed' && task.claimed_by !== input.peer) {
        throw new Error(`task already claimed by ${task.claimed_by}`);
      }

      const lamport = tickLamport(db, input.mesh);
      const now = Date.now();

      db.prepare(`
        UPDATE tasks
        SET status = 'claimed', claimed_by = ?, claimed_at = ?, updated_at = ?, lamport = ?
        WHERE id = ?
      `).run(input.peer, now, now, lamport, input.task_id);

      db.prepare(`
        INSERT INTO task_claim_events
          (mesh_slug, task_id, peer_id, event_type, lamport, event_time, applied_at)
        VALUES (?, ?, ?, 'claimed', ?, ?, ?)
      `).run(input.mesh, input.task_id, input.peer, lamport, now, now);

      db.prepare(`
        INSERT INTO outbox (mesh_slug, op_type, payload, client_op_id, created_at)
        VALUES (?, 'task.claim', ?, ?, ?)
      `).run(
        input.mesh,
        JSON.stringify({ task_id: input.task_id, peer_id: input.peer, lamport, event_time: now }),
        uuidv7(),
        now,
      );
    })();
  });
}

10.2 Inbound claim reconciliation — all branches covered

export function applyInboxClaim(
  db: Database,
  op: {
    mesh_slug: string;
    task_id: string;
    peer_id: string;
    lamport: number;
    event_time: number;
  },
): void {
  db.transaction(() => {
    const local = db.prepare('SELECT * FROM tasks WHERE id = ?').get(op.task_id) as any;
    if (!local || local.tombstone) return;

    // Advance the lamport clock per the invariant
    const newLamport = tickLamport(db, op.mesh_slug, op.lamport);

    // Branch on local status
    if (local.status === 'completed' || local.status === 'cancelled') {
      // Terminal states — audit the late claim as rejected, do not mutate the task.
      // Event type is 'rejected_terminal' (not 'superseded') because the incoming
      // claim wasn't beaten by another concurrent claim — it arrived after the
      // task was already done.
      db.prepare(`
        INSERT INTO task_claim_events
          (mesh_slug, task_id, peer_id, event_type, lamport, event_time, applied_at)
        VALUES (?, ?, ?, 'rejected_terminal', ?, ?, ?)
      `).run(op.mesh_slug, op.task_id, op.peer_id, newLamport, op.event_time, Date.now());
      return;
    }

    if (local.status === 'open') {
      // No conflict, apply the claim
      db.prepare(`
        UPDATE tasks
        SET status = 'claimed', claimed_by = ?, claimed_at = ?, updated_at = ?, lamport = ?
        WHERE id = ?
      `).run(op.peer_id, op.event_time, Date.now(), newLamport, op.task_id);

      db.prepare(`
        INSERT INTO task_claim_events
          (mesh_slug, task_id, peer_id, event_type, lamport, event_time, applied_at)
        VALUES (?, ?, ?, 'claimed', ?, ?, ?)
      `).run(op.mesh_slug, op.task_id, op.peer_id, newLamport, op.event_time, Date.now());
      return;
    }

    // Claimed locally by someone — possibly a conflict
    if (local.claimed_by === op.peer_id) {
      // Same peer re-claiming (idempotent) — bump lamport only if higher
      if (op.lamport > local.lamport) {
        db.prepare('UPDATE tasks SET lamport = ? WHERE id = ?').run(newLamport, op.task_id);
      }
      return;
    }

    // Different peer trying to claim
    const localWinsTuple = aWins(
      { lamport: local.lamport, peer_id: local.claimed_by },
      { lamport: op.lamport, peer_id: op.peer_id },
    );

    if (localWinsTuple) {
      // Our claim wins — log the incoming as superseded
      db.prepare(`
        INSERT INTO task_claim_events
          (mesh_slug, task_id, peer_id, event_type, lamport, event_time, applied_at, conflict_peer_id, conflict_lamport)
        VALUES (?, ?, ?, 'superseded', ?, ?, ?, ?, ?)
      `).run(op.mesh_slug, op.task_id, op.peer_id, newLamport, op.event_time, Date.now(), local.claimed_by, local.lamport);
    } else {
      // Incoming wins — supersede our claim
      db.prepare(`
        UPDATE tasks
        SET claimed_by = ?, claimed_at = ?, updated_at = ?, lamport = ?
        WHERE id = ?
      `).run(op.peer_id, op.event_time, Date.now(), newLamport, op.task_id);

      db.prepare(`
        INSERT INTO task_claim_events
          (mesh_slug, task_id, peer_id, event_type, lamport, event_time, applied_at, conflict_peer_id, conflict_lamport)
        VALUES (?, ?, ?, 'superseded', ?, ?, ?, ?, ?)
      `).run(op.mesh_slug, op.task_id, local.claimed_by, newLamport, op.event_time, Date.now(), op.peer_id, op.lamport);

      // Push notification to the local peer whose claim was superseded
      if (local.claimed_by === currentPeerId(op.mesh_slug)) {
        pushNotification({
          type: 'task_claim_superseded',
          task_id: op.task_id,
          by_peer: op.peer_id,
        });
      }
    }
  })();
}

Note the four branches: completed/cancelled (terminal, log only), open (apply), same-peer reclaim (idempotent), different-peer conflict (resolve via tuple comparison). The original spec missed the terminal-state branch.

10.3 MCP notification

When a local claim is superseded, subsequent tool calls by the affected agent include a warnings field:

{
  "ok": true,
  "data": { /* tool result */ },
  "warnings": [
    {
      "type": "task_claim_superseded",
      "task_id": "abc123",
      "by_peer": "bob",
      "at_lamport": 42
    }
  ]
}

Claude Code renders the warning in the TUI so agents don't silently redo work.


11. Single-writer concurrency model

11.1 The rule

All writes go through one queue. Reads can use separate connections. No "database is locked" errors because only one writer holds the write lock at any time.

11.2 Queue implementation with async awareness

The original implementation didn't await the op result, meaning a Promise could slip through and the queue would mark "done" before the operation completed. Fixed version:

// services/store/write-queue.ts

type WriteOp<T> = () => T | Promise<T>;

interface QueueItem<T> {
  op: WriteOp<T>;
  resolve: (v: T) => void;
  reject: (e: Error) => void;
  signal?: AbortSignal;
}

export class WriteQueue {
  private queue: QueueItem<any>[] = [];
  private running = false;
  // State machine: 'open' → 'stopping' → 'stopped'
  // All transitions are guarded by the single JS event loop (no actual mutex
  // needed because Node/Bun are single-threaded for user code), but we use
  // this state field as the source of truth and check it atomically in each
  // method relative to when control returns to user code.
  private state: 'open' | 'stopping' | 'stopped' = 'open';

  constructor(private db: Database) {}

  async enqueue<T>(op: WriteOp<T>, signal?: AbortSignal): Promise<T> {
    // Read-and-act must happen in a single synchronous block — no awaits
    // between the check and the push. JS single-threading guarantees this:
    // no other code can run between these two statements.
    if (this.state !== 'open') {
      throw new Error(`write queue is ${this.state}`);
    }
    if (signal?.aborted) throw new Error('aborted');
    return new Promise<T>((resolve, reject) => {
      this.queue.push({ op, resolve, reject, signal });
      void this.drain();
    });
  }

  private async drain(): Promise<void> {
    if (this.running) return;
    this.running = true;
    try {
      while (this.queue.length > 0) {
        const item = this.queue.shift()!;
        if (item.signal?.aborted) {
          item.reject(new Error('aborted'));
          continue;
        }
        try {
          // await handles both sync and async ops correctly: sync returns
          // resolve immediately through the microtask queue, async returns
          // wait for the Promise to settle before proceeding.
          const result = await item.op();
          item.resolve(result);
        } catch (err) {
          item.reject(err as Error);
        }
      }
    } finally {
      this.running = false;
    }
  }

  /**
   * Begin shutdown. New enqueues are rejected immediately. Existing items
   * drain to completion. Returns when all queued items have been processed.
   *
   * Race-free: setting state='stopping' is atomic relative to enqueue()'s
   * state check because JS is single-threaded. No enqueue can sneak an item
   * past the check after stop() sets state='stopping'.
   */
  async stop(): Promise<void> {
    if (this.state !== 'open') return;
    this.state = 'stopping';
    // Wait for the drain to finish processing all queued items
    while (this.running || this.queue.length > 0) {
      await new Promise(r => setTimeout(r, 10));
    }
    this.state = 'stopped';
  }

  /**
   * Cancel all pending items immediately. Used on SIGKILL-style shutdown.
   */
  abort(): void {
    if (this.state === 'stopped') return;
    this.state = 'stopped';
    const pending = this.queue.splice(0);
    for (const item of pending) {
      item.reject(new Error('aborted'));
    }
  }
}

Race-freedom rationale: The JS event loop guarantees that enqueue()'s state check (if (this.state !== 'open')) and the subsequent this.queue.push() execute atomically — no other code can run between them. When stop() sets this.state = 'stopping', any subsequent enqueue() call sees the updated state synchronously and rejects. There is no TOCTOU window because JS does not preempt synchronous code.

The one subtlety: if enqueue() is called from an async function and has already passed its state check before stop() is called, the item is in the queue and will be drained. That's correct behavior — the caller's await enqueue(...) will resolve normally. If stop() wants to drop in-flight items, it uses abort() instead.

Critical fix: await item.op() instead of const result = item.op(). If op returns a Promise, the queue now waits for it to settle before starting the next item. Ops that return synchronous values (via better-sqlite3) resolve immediately through the Promise machinery.

Event loop impact: the while loop yields between items only if the op returns a Promise. Synchronous ops block the event loop briefly (typically <5ms per op). For large batches this is acceptable because backfill is split into chunks (§9.2).

11.3 PRAGMA settings

db.pragma('journal_mode = WAL');
db.pragma('synchronous = NORMAL');
db.pragma('busy_timeout = 5000');
db.pragma('foreign_keys = ON');
db.pragma('temp_store = MEMORY');
db.pragma('mmap_size = 30000000');
db.pragma('cache_size = -8000');  // 8MB page cache

Durability tradeoff: synchronous = NORMAL means the last committed transaction can be lost on power failure (not crash — SQLite WAL protects against process crash). This is acceptable for claudemesh because the broker replay can recover any lost ops from the server side. For users who want higher durability, synchronous = FULL is available via CLAUDEMESH_STORE_SYNC=full env var at the cost of ~2x write latency.

11.4 Transaction length

Every write transaction MUST complete in < 100ms. Long operations (backfill, GC sweep, re-embedding) are split into many small transactions (§9.2).


12. Sync protocol

12.1 Overview

┌──────────┐       ┌──────────┐       ┌────────────┐
│  Tool    │ write │  SQLite  │ read  │    Sync    │
│  Handler ├──────►│  (source ├──────►│   Daemon   │
└──────────┘       │ of truth)│       └──────┬─────┘
                   └────▲─────┘              │
                        │                    │ outbox →
                        │ apply              ▼ broker ws
                   ┌────┴─────┐       ┌────────────┐
                   │  inbox   │◄──────┤   Broker   │
                   └──────────┘  ← ws └────────────┘

12.2 Outbox drain with abort semantics and head-of-line protection

The original drain blocked the whole batch on a single flaky op. Fixed version aborts the batch on network-level errors and retries only op-specific errors:

// services/broker/sync-daemon.ts

async function drainOutbox(services: Services, meshSlug: string): Promise<DrainResult> {
  const MAX_BATCH = 10;
  const MAX_ATTEMPTS_PER_OP = 10;

  // Read pending ops outside the write queue (read-only)
  const pending = services.db.prepare(`
    SELECT id, op_type, payload, client_op_id, attempts
    FROM outbox
    WHERE mesh_slug = ? AND synced_at IS NULL
    ORDER BY id
    LIMIT ?
  `).all(meshSlug, MAX_BATCH) as OutboxRow[];

  if (pending.length === 0) return { sent: 0, exhausted: false };

  let sent = 0;
  for (const op of pending) {
    // Re-read attempts to avoid stale in-memory value
    const current = services.db.prepare('SELECT attempts FROM outbox WHERE id = ?').get(op.id) as { attempts: number };
    if (current.attempts >= MAX_ATTEMPTS_PER_OP) {
      // Mark mesh as sync_paused and surface to user
      await services.queue.enqueue(() => {
        services.db.prepare('UPDATE mesh SET sync_paused = 1 WHERE slug = ?').run(meshSlug);
      });
      return { sent, exhausted: true };
    }

    try {
      const ack = await services.broker.send({
        mesh_slug: meshSlug,
        client_op_id: op.client_op_id,
        op_type: op.op_type,
        payload: JSON.parse(op.payload),
      });

      await services.queue.enqueue(() => {
        services.db.prepare(`
          UPDATE outbox
          SET synced_at = ?, server_ack_id = ?, broker_epoch = ?, broker_seq = ?
          WHERE id = ?
        `).run(Date.now(), ack.server_ack_id, ack.broker_epoch, ack.broker_seq, op.id);
      });
      sent++;
    } catch (err: any) {
      // Increment attempts in DB
      await services.queue.enqueue(() => {
        services.db.prepare(`
          UPDATE outbox
          SET attempts = attempts + 1, last_error = ?, last_attempt_at = ?
          WHERE id = ?
        `).run(String(err?.message ?? err), Date.now(), op.id);
      });

      // Classify the error
      if (isNetworkError(err)) {
        // Network error: abort the batch, let the daemon loop retry after backoff
        return { sent, exhausted: false, networkError: true };
      }
      // Op-specific error: continue with next op in batch
      continue;
    }
  }

  return { sent, exhausted: false };
}

function isNetworkError(err: any): boolean {
  if (!err) return false;
  const code = err.code ?? err.cause?.code;
  return code === 'ENOTFOUND' || code === 'ECONNREFUSED' || code === 'ECONNRESET' ||
         code === 'ETIMEDOUT' || code === 'EAI_AGAIN' || code === 'WS_CLOSED';
}

12.3 Inbox apply

async function applyInbox(services: Services, meshSlug: string): Promise<ApplyResult> {
  const pending = services.db.prepare(`
    SELECT id, broker_epoch, broker_seq, op_type, payload
    FROM inbox
    WHERE mesh_slug = ? AND applied_at IS NULL
    ORDER BY broker_epoch, broker_seq
    LIMIT 10
  `).all(meshSlug) as InboxRow[];

  if (pending.length === 0) return { applied: 0 };

  let applied = 0;
  for (const inc of pending) {
    try {
      await services.queue.enqueue(() => {
        services.db.transaction(() => {
          applyOp(services.db, meshSlug, inc);
          services.db.prepare('UPDATE inbox SET applied_at = ? WHERE id = ?').run(Date.now(), inc.id);
        })();
      });
      applied++;
    } catch (err) {
      services.logger.error('inbox apply failed', { id: inc.id, err });
      // Stop on first apply failure; retry on next daemon tick
      break;
    }
  }
  return { applied };
}

function applyOp(db: Database, meshSlug: string, inc: InboxRow): void {
  const payload = JSON.parse(inc.payload);
  switch (inc.op_type) {
    case 'memory.set': return upsertMemory(db, { ...payload, mesh_slug: meshSlug });
    case 'memory.tombstone': return tombstoneMemory(db, { ...payload, mesh_slug: meshSlug });
    case 'state.set': return upsertStateKv(db, { ...payload, mesh_slug: meshSlug });
    case 'task.claim': return applyInboxClaim(db, { ...payload, mesh_slug: meshSlug });
    case 'vector.store': return applyInboxVectorStore(db, { ...payload, mesh_slug: meshSlug });
    case 'file.share': return applyInboxFileShare(db, { ...payload, mesh_slug: meshSlug });
    // ... etc
    default:
      throw new Error(`unknown op_type: ${inc.op_type}`);
  }
}

12.4 Daemon loop — idle path applies inbox

Critical fix: the idle path now applies inbox, not just drains outbox. Remote messages no longer starve.

export class SyncDaemon {
  private state: 'active' | 'idle' | 'reconnecting' | 'stopped' = 'idle';
  private idleSleepMs = 5_000;
  private activeSleepMs = 500;
  private reconnectBackoff = 1_000;
  private stopPromise: Promise<void> | null = null;
  private stopResolve: (() => void) | null = null;

  constructor(private services: Services) {}

  async start(): Promise<void> {
    this.stopPromise = new Promise(resolve => { this.stopResolve = resolve; });

    while ((this.state as string) !== 'stopped') {
      try {
        if (this.state === 'reconnecting') {
          try {
            await this.services.broker.connect();
            this.state = 'active';
            this.reconnectBackoff = 1_000;
          } catch {
            await sleep(this.reconnectBackoff);
            this.reconnectBackoff = Math.min(this.reconnectBackoff * 2, 30_000);
            continue;
          }
        }

        // Apply inbound ops FIRST, regardless of state (prevents starvation)
        for (const meshSlug of await this.getActiveMeshes()) {
          await applyInbox(this.services, meshSlug);
        }

        // Then drain outbound
        let anyNetworkError = false;
        for (const meshSlug of await this.getActiveMeshes()) {
          const result = await drainOutbox(this.services, meshSlug);
          if (result.networkError) {
            anyNetworkError = true;
            break;
          }
        }

        if (anyNetworkError) {
          this.state = 'reconnecting';
          continue;
        }

        // State transition: active → idle if nothing happened for 30s
        const now = Date.now();
        const lastActivity = this.services.broker.lastActivityAt ?? 0;
        if (this.state === 'active' && now - lastActivity > 30_000) {
          this.state = 'idle';
        }

        await sleep(this.state === 'active' ? this.activeSleepMs : this.idleSleepMs);
      } catch (err) {
        this.services.logger.error('sync daemon loop error', { err });
        await sleep(1_000);
      }
    }

    this.stopResolve!();
  }

  /** Trigger immediate drain on local change. */
  onLocalChange(): void {
    this.state = 'active';
  }

  /** Trigger immediate apply on incoming broker message. */
  onBrokerMessage(): void {
    this.state = 'active';
  }

  /** Graceful shutdown. */
  async stop(): Promise<void> {
    this.state = 'stopped';
    if (this.stopPromise) await this.stopPromise;
    await this.services.queue.stop();
  }
}

State transitions:

  • reconnectingactive on successful connect
  • activeidle after 30s of broker silence AND empty outbox
  • idleactive on local change or broker message
  • Any → stopped on stop() call

Critical properties:

  • Inbox is applied on every tick, regardless of state
  • Outbox is drained on every tick (idle has a longer sleep)
  • Network errors transition to reconnecting with backoff
  • Stop is awaitable and drains in-flight ops

13. Conflict resolution per tool family

Tool Strategy Tiebreaker
memory LWW per (mesh, peer, key) (lamport, peer_id) bytewise
state_kv LWW per (mesh, key) (lamport, updated_by) bytewise
vectors Append-only per (mesh, peer, key, model) Tombstone on delete, no conflict
files LWW per (mesh, peer, path) (lamport, peer_id) bytewise; content dedup by sha256
tasks First claim wins (lamport, peer_id) bytewise; supersession events logged
peers Last broker update wins Cache only, no local writes

14. Offline behavior

Operation Offline result
remember Succeeds, enqueues outbox op
recall Succeeds from local
vector_store Succeeds, enqueues outbox op
vector_search Succeeds from local vectors
set_state Succeeds, enqueues outbox op
get_state Succeeds from local
share_file Succeeds, content to local blob store, metadata enqueues
read_peer_file Returns { status: 'stale', content: last_known } or { status: 'offline' } if never synced
list_peers Returns cached list with stale: true flag after 5 min
send_message Returns { status: 'queued' }, goes to outbox
claim_task Tentative claim, reverts on reconnect if another peer won
mesh_clock Returns { lamport, sync_state: 'offline', last_sync_at }
mesh_info Returns local metadata

15. Error recovery

15.1 Corrupt database

On startup:

const result = db.pragma('integrity_check', { simple: true });
if (result !== 'ok') {
  // Surface to user with `claudemesh doctor --repair` offer
  // Repair: .dump → new db → re-import
  // If repair fails: backup to data.db.corrupt-<timestamp> + init fresh
}

15.2 Stuck outbox

Per-op retry limit is 10 (checked against the current DB value, not stale in-memory). When exhausted:

  1. Set mesh.sync_paused = 1
  2. Surface warning overlay in UI
  3. claudemesh doctor shows the failing ops and offers --retry or --drop

15.3 Diverged inbox

If inbox has a gap in (broker_epoch, broker_seq):

  1. Request re-sync from the last known seq
  2. If broker returns a new epoch, accept it (broker restarted)
  3. If gap persists, mark mesh as "needs full resync" and re-download from snapshot

15.4 Broker epoch change

Detected when the broker ack includes a new broker_epoch. The CLI:

  1. Updates mesh.broker_epoch in the DB
  2. Continues with new epoch for all subsequent ops
  3. Inbox dedupe still works because the unique constraint is (mesh, epoch, seq)

15.5 Migration failure

Migrations are transactional and atomic. If a migration fails mid-run:

  • _migrations table is updated per migration's commit
  • Restart retries from the last successful migration
  • If a migration keeps failing, claudemesh doctor --rollback-migration <version> offers an escape

16. Migration runner

// services/store/migrations.ts

interface Migration {
  version: number;
  name: string;
  up: (db: Database) => void;
}

const MIGRATIONS: Migration[] = [
  {
    version: 1,
    name: '001-initial',
    up: (db) => { db.exec(readSqlFile('001-initial.sql')); },
  },
  {
    version: 2,
    name: '002-add-broker-epoch',
    up: (db) => { db.exec(readSqlFile('002-add-broker-epoch.sql')); },
  },
  // ...
];

export function runMigrations(db: Database, queue: WriteQueue): Promise<void> {
  return queue.enqueue(() => {
    db.exec('CREATE TABLE IF NOT EXISTS _migrations (version INTEGER PRIMARY KEY, applied_at INTEGER NOT NULL)');
    const applied = db.prepare('SELECT version FROM _migrations').all() as { version: number }[];
    const appliedVersions = new Set(applied.map(r => r.version));

    for (const m of MIGRATIONS) {
      if (appliedVersions.has(m.version)) continue;
      db.transaction(() => {
        m.up(db);
        db.prepare('INSERT INTO _migrations (version, applied_at) VALUES (?, ?)').run(m.version, Date.now());
      })();
    }
  });
}

17. Bundle size accounting (honest)

Per review: the 800 KB JS bundle target was optimistic. Honest targets:

17.1 Per-platform distribution

Platform Native addon size JS bundle (gz) Total install (decompressed)
macOS arm64 ~2.8 MB ~1.0 MB ~8-10 MB
macOS x64 ~2.9 MB ~1.0 MB ~8-10 MB
Linux x64 ~3.2 MB ~1.0 MB ~9-11 MB
Linux arm64 ~3.1 MB ~1.0 MB ~9-11 MB
Windows x64 ~3.5 MB ~1.0 MB ~10-12 MB

JS bundle target: ~1 MB gzipped (not 800 KB). Realistic given Ink + React + Zod + citty + MCP SDK + all UI code.

Cold start target: 200-400 ms (not 100 ms). better-sqlite3 native addon load + SQLite init + connection pragmas takes 150-250 ms on modern hardware. Script evaluation adds another 50-150 ms.

17.2 Cold start phases

Phase Target Notes
Node startup + script load <50 ms Bun or Node + ESM loader
better-sqlite3 native load ~100-150 ms One-time per process
sqlite-vec extension load ~20-50 ms One-time per connection
SQLite connection + PRAGMA ~30-80 ms Includes WAL checkpoint check
Migration check (cached) <10 ms Only runs if version mismatch
First meaningful output 200-400 ms total Measured on Apple M2 Pro, 2026

17.3 Optimization path

If cold start exceeds 400 ms in practice:

  • Defer non-critical service initialization (telemetry, update check)
  • Use bun runtime as alternate distribution (Bun's native SQLite skips addon load)
  • Lazy-load MCP tool registrations

None of these are required for v1.0.0.


18. Shutdown and drain protocol

18.1 Signal handling

// services/lifecycle/service-manager.ts

export class ServiceManager {
  private services: {
    queue: WriteQueue;
    daemon: SyncDaemon;
    broker: BrokerClient;
  };

  async shutdown(): Promise<void> {
    // Order matters:
    // 1. Stop accepting new work
    await this.services.daemon.stop();
    // 2. Drain any queued writes
    await this.services.queue.stop();
    // 3. Close broker connection
    await this.services.broker.disconnect();
    // 4. GC sweep if time permits (best effort)
    try {
      await Promise.race([
        gcBlobs(this.services.db, this.services.queue, BLOBS_DIR),
        sleep(2_000),
      ]);
    } catch {}
    // 5. Checkpoint WAL
    this.services.db.pragma('wal_checkpoint(TRUNCATE)');
    // 6. Close DB
    this.services.db.close();
  }
}

// Entrypoint wiring:
process.on('SIGINT', async () => {
  await serviceManager.shutdown();
  process.exit(0);
});
process.on('SIGTERM', async () => {
  await serviceManager.shutdown();
  process.exit(0);
});

18.2 Timeout

Shutdown has a 10-second hard timeout. If services don't stop cleanly within that window, process.exit(1) is called and the user sees a warning on next launch:

~ Previous session didn't shut down cleanly. Running integrity check…

The integrity check verifies the database is uncorrupted. If WAL replay succeeds, the warning is cleared.


19. Testing strategy

19.1 Unit tests

Every module in services/store/ has a colocated *.test.ts with 100% coverage. Uses better-sqlite3 with :memory: database.

Required unit tests:

  • tickLamport concurrency: 100 simultaneous calls, assert monotonic output with no gaps or duplicates
  • upsertMemory conflict resolution: interleave writes with different lamports, assert winner is correct per tuple comparison
  • applyInboxClaim all 4 branches: completed, cancelled, open, same-peer reclaim, different-peer conflict
  • WriteQueue async op handling: enqueue async function, assert await enqueue(...) resolves after the Promise settles
  • ensureVecTable race: two simulated processes race to create the same fingerprint, assert only one vec table is created
  • Path validation: 50+ positive and negative cases

19.2 Integration tests

tests/integration/store/ runs against a real staging broker + real file system. Covers:

  • Full sync protocol end-to-end
  • Conflict resolution between two simulated peers with clock skew
  • Publish upgrade transaction with backfill
  • Offline → reconnect → converge with 1000 pending ops
  • Task claim race with explicit reconciliation
  • Broker epoch change mid-session

19.3 Fuzz tests

tests/fuzz/store/ generates random op sequences and verifies invariants:

  • Lamport is monotonic within a peer
  • Conflict resolution is deterministic (same input → same output across runs)
  • Outbox + inbox round-trip produces identical state on both peers
  • Blob refcount never goes negative
  • No orphaned blobs after GC sweep

Fuzz budget: 100,000 random operations per CI run.

19.4 Benchmarks

tests/bench/store/ tracks regression:

  • Memory insert latency p50/p99
  • Vector search latency
  • Transaction throughput under single-writer contention
  • Cold start
  • Bundle size (fail if >20% regression)

20. Operational concerns

20.1 Backups

claudemesh doctor --backup produces a clean snapshot via SQLite's BACKUP API. Users can also manually copy data.db + data.db-wal + data.db-shm if the process is stopped.

20.2 Export

claudemesh advanced export --format jsonl dumps all mesh data to JSONL for debugging or manual migration.

20.3 Import

claudemesh advanced import <file.jsonl> is NOT implemented in v1.0.0. Importing rows with arbitrary lamports would break invariant §4.1. Deferred to v1.1 with a proper re-stamping pass.

20.4 Metrics

Local metrics log to ~/.claudemesh/logs/metrics.jsonl:

  • Operation counts and latencies per tool
  • Sync lag (local lamport vs last applied inbox lamport)
  • Error rates by category
  • Cold start time per launch

Read by claudemesh doctor for diagnosis. Never transmitted externally (even if telemetry is opted in).


21. Open questions deferred to v1.1+

  1. Hybrid logical clocks — if field experience shows Lamport is insufficient for certain workloads
  2. Selective sync — allow users to exclude certain meshes or tables from sync
  3. Row-level encryption — even the broker can't read content
  4. CRDT structures — if append-only patterns dominate, move memory/state to Automerge-style
  5. Multi-machine personal mesh sync — server-side encrypted storage of personal meshes
  6. SQLite encryption at rest — SQLCipher adds ~4 MB; consider as claudemesh-cli-sqlcipher alternate distribution
  7. Time-series queries on memory — "what did I remember 3 days ago" requires additional indexing
  8. Incremental vector re-embedding — current flow is one big expensive operation
  9. Import support — with safe re-stamping

End of spec.