chore(broker): typecheck clean (77 → 0)

paid down the broker's accumulated type debt. zero behavioral changes,
purely type-system tightening:

- broker.ts: row extraction helper for postgres-js result vs pg shape;
  findMemberByPubkey defaultGroups null-coalescing.
- env.ts: zod default ordered before transform (zod v4 ordering).
- index.ts: typed JSON.parse for the tg/token, upload-auth, file-upload,
  member patch and mesh-settings handlers; export SelfEditablePolicy
  from member-api; added bodyVersion to WSSendMessage; added the
  disconnect/kick/ban/unban/list_bans message types to WSClientMessage;
  String(key) cast for neo4j record symbol-typed keys.
- jwt.ts, paths.ts, telegram-token.ts: typed JSON.parse results.
- service-manager.ts: typed package.json + MCP JSON-RPC reader.
- telegram-bridge.ts: typed WS message handler; missing log import;
  null-tolerant BridgeRow + skip rows missing memberId/displayName;
  typed e in catch.
- types.ts: bodyVersion on WSSendMessage, manifest on WSSkillData,
  five new admin message types (kick/disconnect/ban/unban/list_bans).
- packages/db/server.ts: drizzle constructor positional args + scoped
  ts-expect-error for the namespace-bag schema generic mismatch.

apps/broker/src/types.ts will eventually want a real audit pass to
catch every WS verb and surface the orphans, but this clears the path
for 1.30.0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-05-04 13:22:09 +01:00
parent 6d981976c0
commit f013436541
11 changed files with 82 additions and 40 deletions

View File

@@ -1013,7 +1013,7 @@ export async function topicHistory(args: {
ORDER BY tm.created_at DESC, tm.id DESC
LIMIT ${limit}
`);
const rows = (result.rows ?? result) as Array<{
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{
id: string;
sender_member_id: string;
sender_pubkey: string;
@@ -1442,7 +1442,7 @@ export async function recallMemory(
ORDER BY ts_rank(search_vector, plainto_tsquery('english', ${query})) DESC
LIMIT 20
`);
const rows = (result.rows ?? result) as Array<{
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{
id: string;
content: string;
tags: string[];
@@ -2010,7 +2010,7 @@ export async function getContext(
ORDER BY updated_at DESC
LIMIT 20
`);
const rows = (result.rows ?? result) as Array<{
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{
peer_name: string | null;
summary: string;
files_read: string[] | null;
@@ -2419,7 +2419,7 @@ export async function drainForMember(
SELECT * FROM claimed ORDER BY created_at ASC, id ASC
`);
const rows = (result.rows ?? result) as Array<{
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{
id: string;
priority: string;
nonce: string;
@@ -2665,7 +2665,11 @@ export async function findMemberByPubkey(
),
)
.limit(1);
return row ?? null;
if (!row) return null;
return {
...row,
defaultGroups: row.defaultGroups ?? [],
};
}
// --- Mesh databases (per-mesh PostgreSQL schemas) ---
@@ -2719,7 +2723,7 @@ export async function meshQuery(
sql.raw(`SET LOCAL search_path TO "${schema}"`)
);
const result = await tx.execute(sql.raw(query));
const rows = (result.rows ?? []) as Array<Record<string, unknown>>;
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<Record<string, unknown>>;
const columns = rows.length > 0 ? Object.keys(rows[0]!) : [];
return { columns, rows, rowCount: rows.length };
});
@@ -2762,7 +2766,7 @@ export async function meshSchema(
WHERE table_schema = ${schema}
ORDER BY table_name, ordinal_position
`);
const rows = (result.rows ?? result) as Array<{
const rows = ((result as unknown as { rows?: unknown[] }).rows ?? (result as unknown as unknown[])) as Array<{
table_name: string;
column_name: string;
data_type: string;

View File

@@ -23,7 +23,7 @@ const envSchema = z.object({
MINIO_ENDPOINT: z.string().default("minio:9000"),
MINIO_ACCESS_KEY: z.string().default("claudemesh"),
MINIO_SECRET_KEY: z.string().default("changeme"),
MINIO_USE_SSL: z.enum(["true", "false", ""]).transform(v => v === "true").default("false"),
MINIO_USE_SSL: z.enum(["true", "false", ""]).default("false").transform(v => v === "true"),
QDRANT_URL: z.string().default("http://qdrant:6333"),
NEO4J_URL: z.string().default("bolt://neo4j:7687"),
NEO4J_USER: z.string().default("neo4j"),

View File

@@ -22,7 +22,7 @@ import { invite as inviteTable, mesh, meshMember, messageQueue, presence, schedu
import { user } from "@turbostarter/db/schema/auth";
import { handleCliSync, type CliSyncRequest } from "./cli-sync";
import { generateId } from "@turbostarter/shared/utils";
import { updateMemberProfile, listMeshMembers, updateMeshSettings } from "./member-api";
import { updateMemberProfile, listMeshMembers, updateMeshSettings, type MemberUpdateRequest, type SelfEditablePolicy } from "./member-api";
import {
claimTask,
completeTask,
@@ -831,7 +831,12 @@ function handleHttpRequest(req: IncomingMessage, res: ServerResponse): void {
req.on("data", (c: Buffer) => chunks.push(c));
req.on("end", () => {
try {
const body = JSON.parse(Buffer.concat(chunks).toString());
const body = JSON.parse(Buffer.concat(chunks).toString()) as {
meshId?: string;
memberId?: string;
pubkey?: string;
secretKey?: string;
};
const { meshId: tgMeshId, memberId: tgMemberId, pubkey: tgPubkey, secretKey: tgSecretKey } = body;
if (!tgMeshId || !tgMemberId || !tgPubkey || !tgSecretKey) {
writeJson(res, 400, { error: "meshId, memberId, pubkey, secretKey required" });
@@ -1099,7 +1104,7 @@ function handleInviteClaimV2Post(
const raw = Buffer.concat(chunks).toString();
let payload: { recipient_x25519_pubkey?: string; display_name?: string };
try {
payload = JSON.parse(raw);
payload = JSON.parse(raw) as { recipient_x25519_pubkey?: string; display_name?: string };
} catch {
writeJson(res, 400, { error: "malformed" });
return;
@@ -1197,7 +1202,7 @@ async function handleUploadPost(
let tags: string[] = [];
if (tagsRaw) {
try {
tags = JSON.parse(tagsRaw);
tags = JSON.parse(tagsRaw) as string[];
} catch {
tags = [];
}
@@ -1259,7 +1264,7 @@ async function handleUploadPost(
let fileKeys: Array<{ peerPubkey: string; sealedKey: string }> = [];
if (encrypted && fileKeysRaw) {
try {
fileKeys = JSON.parse(fileKeysRaw);
fileKeys = JSON.parse(fileKeysRaw) as Array<{ peerPubkey: string; sealedKey: string }>;
} catch { /* ignore */ }
}
@@ -1364,7 +1369,7 @@ function handleMemberPatchPost(req: IncomingMessage, res: ServerResponse, meshId
req.on("end", async () => {
if (aborted) return;
try {
const body = JSON.parse(Buffer.concat(chunks).toString());
const body = JSON.parse(Buffer.concat(chunks).toString()) as MemberUpdateRequest;
// Auth: callerMemberId from X-Member-Id header (dashboard or CLI provides this)
const callerMemberId = req.headers["x-member-id"] as string | undefined;
if (!callerMemberId) { writeJson(res, 401, { ok: false, error: "X-Member-Id header required" }); return; }
@@ -1407,7 +1412,7 @@ function handleMeshSettingsPatch(req: IncomingMessage, res: ServerResponse, mesh
req.on("end", async () => {
if (aborted) return;
try {
const body = JSON.parse(Buffer.concat(chunks).toString());
const body = JSON.parse(Buffer.concat(chunks).toString()) as { selfEditable?: SelfEditablePolicy };
const callerMemberId = req.headers["x-member-id"] as string | undefined;
if (!callerMemberId) { writeJson(res, 401, { ok: false, error: "X-Member-Id header required" }); return; }
const result = await updateMeshSettings(meshId, callerMemberId, body);
@@ -3753,7 +3758,7 @@ function handleConnection(ws: WebSocket): void {
const gqRecords = gqResult.records.map((r) => {
const obj: Record<string, unknown> = {};
for (const key of r.keys) {
obj[key] = r.get(key);
obj[String(key)] = r.get(key);
}
return obj;
});
@@ -3788,7 +3793,7 @@ function handleConnection(ws: WebSocket): void {
const geRecords = geResult.records.map((r) => {
const obj: Record<string, unknown> = {};
for (const key of r.keys) {
obj[key] = r.get(key);
obj[String(key)] = r.get(key);
}
return obj;
});
@@ -3877,10 +3882,10 @@ function handleConnection(ws: WebSocket): void {
const [peers, stateEntries, memCount, fileCount, taskCounts, streams, tables] = await Promise.all([
listPeersInMesh(conn.meshId),
listState(conn.meshId),
db.execute(sql`SELECT COUNT(*) as n FROM mesh.memory WHERE mesh_id = ${conn.meshId} AND forgotten_at IS NULL`).then(r => Number(((r.rows ?? r) as any[])[0]?.n ?? 0)),
db.execute(sql`SELECT COUNT(*) as n FROM mesh.file WHERE mesh_id = ${conn.meshId} AND deleted_at IS NULL`).then(r => Number(((r.rows ?? r) as any[])[0]?.n ?? 0)),
db.execute(sql`SELECT COUNT(*) as n FROM mesh.memory WHERE mesh_id = ${conn.meshId} AND forgotten_at IS NULL`).then(r => Number((((r as unknown as { rows?: unknown[] }).rows ?? (r as unknown as unknown[])) as any[])[0]?.n ?? 0)),
db.execute(sql`SELECT COUNT(*) as n FROM mesh.file WHERE mesh_id = ${conn.meshId} AND deleted_at IS NULL`).then(r => Number((((r as unknown as { rows?: unknown[] }).rows ?? (r as unknown as unknown[])) as any[])[0]?.n ?? 0)),
db.execute(sql`SELECT status, COUNT(*) as n FROM mesh.task WHERE mesh_id = ${conn.meshId} GROUP BY status`).then(r => {
const rows = (r.rows ?? r) as Array<{ status: string; n: string }>;
const rows = (((r as unknown as { rows?: unknown[] }).rows ?? (r as unknown as unknown[]))) as Array<{ status: string; n: string }>;
const counts = { open: 0, claimed: 0, done: 0 };
for (const row of rows) counts[row.status as keyof typeof counts] = Number(row.n);
return counts;

View File

@@ -86,7 +86,7 @@ export async function verifySyncToken(
}
// Decode header — must be HS256
const header = JSON.parse(new TextDecoder().decode(base64UrlDecode(headerB64)));
const header = JSON.parse(new TextDecoder().decode(base64UrlDecode(headerB64))) as { alg?: string };
if (header.alg !== "HS256") {
return { ok: false, error: `unsupported algorithm: ${header.alg}` };
}

View File

@@ -31,7 +31,7 @@ export interface MemberPermissionUpdate {
export type MemberUpdateRequest = MemberProfileUpdate & MemberPermissionUpdate;
interface SelfEditablePolicy {
export interface SelfEditablePolicy {
displayName: boolean;
roleTag: boolean;
groups: boolean;

View File

@@ -115,11 +115,11 @@ function lastAssistantHasToolUse(filePath: string): boolean {
if (!line) continue;
if (!line.includes('"assistant"')) continue;
try {
const d = JSON.parse(line);
const d = JSON.parse(line) as { type?: string; message?: { content?: unknown } };
if (d.type !== "assistant") continue;
const content = d.message?.content;
if (!Array.isArray(content)) continue;
return content.some((c: { type?: string }) => c.type === "tool_use");
return (content as Array<{ type?: string }>).some((c) => c.type === "tool_use");
} catch {
/* malformed line, skip */
}

View File

@@ -169,7 +169,7 @@ function detectEntry(
try {
const pkg = JSON.parse(
readFileSync(join(sourcePath, "package.json"), "utf-8"),
);
) as { main?: string; bin?: string | Record<string, string> };
if (pkg.main) return { command: cmd, args: [pkg.main] };
if (pkg.bin) {
const bin =
@@ -372,7 +372,7 @@ function spawnService(svc: ManagedService): void {
const rl = createInterface({ input: child.stdout! });
rl.on("line", (line) => {
try {
const msg = JSON.parse(line);
const msg = JSON.parse(line) as { id?: string | number; error?: { message?: string }; result?: unknown };
if (msg.id && svc.pendingCalls.has(String(msg.id))) {
const pending = svc.pendingCalls.get(String(msg.id))!;
clearTimeout(pending.timer);

View File

@@ -13,6 +13,7 @@ import { Bot, InputFile } from "grammy";
import WebSocket from "ws";
import sodium from "libsodium-wrappers";
import { validateTelegramConnectToken } from "./telegram-token";
import { log } from "./logger";
// ---------------------------------------------------------------------------
// Types
@@ -22,11 +23,12 @@ export interface BridgeRow {
chatId: number;
meshId: string;
meshSlug?: string;
memberId: string;
/** memberId can be null until the bridge claims a mesh.member row. */
memberId: string | null;
pubkey: string;
secretKey: string;
displayName: string;
chatType: string;
displayName: string | null;
chatType: string | null;
chatTitle: string | null;
}
@@ -228,7 +230,7 @@ class MeshConnection {
ws.on("message", async (raw) => {
try {
const msg = JSON.parse(raw.toString());
const msg = JSON.parse(raw.toString()) as Record<string, any>;
if (msg.type === "hello_ack") {
clearTimeout(helloTimeout);
@@ -674,8 +676,8 @@ function createPushHandler(bot: Bot) {
for (const chatId of chatIds) {
bot.api
.sendMessage(chatId, formatted)
.catch((e) => {
console.error(`[tg-bridge] send to chat ${chatId} failed:`, e.message);
.catch((e: unknown) => {
console.error(`[tg-bridge] send to chat ${chatId} failed:`, e instanceof Error ? e.message : String(e));
});
}
};
@@ -1729,11 +1731,12 @@ async function executeAiToolCall(
for (const meshId of meshIds) {
const services = await listDbMeshServices(meshId);
for (const s of services) {
const sx = s as Record<string, unknown>;
allServices.push({
name: s.name,
type: s.type ?? "mcp",
tools: s.tool_count ?? 0,
status: s.status ?? "running",
name: String(sx.name ?? ""),
type: String(sx.type ?? "mcp"),
tools: Number(sx.tool_count ?? 0),
status: String(sx.status ?? "running"),
});
}
}
@@ -1841,6 +1844,9 @@ export async function bootTelegramBridge(
for (const [meshId, meshRows] of byMesh) {
const first = meshRows[0]!;
try {
// memberId/displayName come back from DB nullable; bridge only
// works once both are populated, so skip rows missing either.
if (!first.memberId || !first.displayName) continue;
await ensureMeshConnection(
{
meshId,

View File

@@ -102,11 +102,11 @@ export function validateTelegramConnectToken(
if (!timingSafeEqual(a, b)) return null;
// Verify header algorithm
const header = JSON.parse(base64urlDecode(headerB64));
const header = JSON.parse(base64urlDecode(headerB64)) as { alg?: string };
if (header.alg !== "HS256") return null;
// Decode and validate claims
const claims: JwtClaims = JSON.parse(base64urlDecode(payloadB64));
const claims = JSON.parse(base64urlDecode(payloadB64)) as JwtClaims;
// Check subject
if (claims.sub !== "telegram-connect") return null;

View File

@@ -170,6 +170,10 @@ export interface WSSendMessage {
* Server validates same-topic membership; FK is set null if parent
* later disappears. Ignored for non-topic targets. */
replyToId?: string;
/** Optional ciphertext-format version. 1 = v1 plaintext base64;
* 2 = v0.3.0 phase 3 per-topic encrypted body. Server passes this
* through verbatim into topic_message.body_version. */
bodyVersion?: number;
}
/** Broker → client: an envelope addressed to this peer. */
@@ -1390,6 +1394,16 @@ export interface WSVaultGetMessage { type: "vault_get"; keys: string[]; _reqId?:
export interface WSWatchMessage { type: "watch"; url: string; mode?: "hash" | "json" | "status"; extract?: string; interval?: number; notify_on?: string; headers?: Record<string, string>; label?: string; _reqId?: string; }
/** Client → broker: stop watching. */
export interface WSUnwatchMessage { type: "unwatch"; watchId: string; _reqId?: string; }
/** Client → broker: soft-disconnect a peer (1000; CLI auto-reconnects). */
export interface WSDisconnectMessage { type: "disconnect"; target?: string; stale?: number; all?: boolean; _reqId?: string; }
/** Client → broker: hard-kick a peer (4001; CLI exits). */
export interface WSKickMessage { type: "kick"; target?: string; stale?: number; all?: boolean; _reqId?: string; }
/** Client → broker: ban a member by pubkey or display name. */
export interface WSBanMessage { type: "ban"; target: string; reason?: string; _reqId?: string; }
/** Client → broker: lift a ban. */
export interface WSUnbanMessage { type: "unban"; target: string; _reqId?: string; }
/** Client → broker: list active bans on the caller's mesh. */
export interface WSListBansMessage { type: "list_bans"; _reqId?: string; }
/** Client → broker: list active watches. */
export interface WSWatchListMessage { type: "watch_list"; _reqId?: string; }
/** Broker → client: watch created acknowledgement. */
@@ -1494,7 +1508,12 @@ export type WSClientMessage =
| WSVaultGetMessage
| WSWatchMessage
| WSUnwatchMessage
| WSWatchListMessage;
| WSWatchListMessage
| WSDisconnectMessage
| WSKickMessage
| WSBanMessage
| WSUnbanMessage
| WSListBansMessage;
// --- Skill messages ---
@@ -1546,6 +1565,8 @@ export interface WSSkillDataMessage {
instructions: string;
tags: string[];
author: string;
/** Optional opaque metadata stored alongside the skill body. */
manifest?: unknown;
createdAt: string;
} | null;
_reqId?: string;