feat(cli+broker): kick, ban, unban, bans commands
Broker WS handlers: - kick: disconnect peer(s) by name, --stale duration, or --all. Authz: owner or admin only. Closes WS + marks presence disconnected. - ban: kick + set revokedAt on mesh.member. Hello already rejects revoked members, so ban is instant and permanent until unban. - unban: clear revokedAt. Peer can rejoin with their existing keypair. - list_bans: return all revoked members for a mesh. Session-id dedup (previous commit): handleHello disconnects ghost presences with matching (meshId, sessionId) before inserting the new one. Eliminates duplicate entries after broker restarts. CLI (alpha.37): - claudemesh kick <peer|--stale 30m|--all> - claudemesh ban/unban <peer> - claudemesh bans [--json] - Uses new sendAndWait() on ws-client for request-response pattern over WS (generic _reqId resolver). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -18,7 +18,7 @@ import { WebSocketServer, type WebSocket } from "ws";
|
||||
import { and, eq, inArray, isNull, lt, sql } from "drizzle-orm";
|
||||
import { env } from "./env";
|
||||
import { db } from "./db";
|
||||
import { invite as inviteTable, mesh, meshMember, messageQueue, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh";
|
||||
import { invite as inviteTable, mesh, meshMember, messageQueue, presence, scheduledMessage as scheduledMessageTable, meshWebhook, peerState } from "@turbostarter/db/schema/mesh";
|
||||
import { user } from "@turbostarter/db/schema/auth";
|
||||
import { handleCliSync, type CliSyncRequest } from "./cli-sync";
|
||||
import { generateId } from "@turbostarter/shared/utils";
|
||||
@@ -3898,6 +3898,154 @@ function handleConnection(ws: WebSocket): void {
|
||||
break;
|
||||
}
|
||||
|
||||
// --- Kick / Ban / Unban ---
|
||||
|
||||
case "kick": {
|
||||
const km = msg as { type: "kick"; target?: string; stale?: number; all?: boolean; _reqId?: string };
|
||||
// Authz: only owner or admin can kick.
|
||||
const [kickMesh] = await db.select({ ownerUserId: mesh.ownerUserId }).from(mesh).where(eq(mesh.id, conn.meshId)).limit(1);
|
||||
const [kickMember] = await db.select({ role: meshMember.role, userId: meshMember.userId }).from(meshMember).where(eq(meshMember.id, conn.memberId)).limit(1);
|
||||
if (!kickMesh || (kickMesh.ownerUserId !== kickMember?.userId && kickMember?.role !== "admin")) {
|
||||
sendError(ws, "forbidden", "only owner or admin can kick", undefined, km._reqId);
|
||||
break;
|
||||
}
|
||||
|
||||
const kicked: string[] = [];
|
||||
const now = Date.now();
|
||||
|
||||
if (km.all) {
|
||||
// Kick everyone except caller
|
||||
for (const [pid, peer] of connections) {
|
||||
if (peer.meshId !== conn.meshId || pid === presenceId) continue;
|
||||
try { peer.ws.close(1000, "kicked"); } catch {}
|
||||
connections.delete(pid);
|
||||
void disconnectPresence(pid);
|
||||
kicked.push(peer.displayName || pid);
|
||||
}
|
||||
} else if (km.stale && typeof km.stale === "number") {
|
||||
// Kick peers idle longer than stale ms
|
||||
const cutoff = now - km.stale;
|
||||
for (const [pid, peer] of connections) {
|
||||
if (peer.meshId !== conn.meshId || pid === presenceId) continue;
|
||||
// Check last_ping_at from DB for accurate staleness
|
||||
const [pres] = await db.select({ lastPingAt: presence.lastPingAt }).from(presence).where(eq(presence.id, pid)).limit(1);
|
||||
if (pres && pres.lastPingAt && pres.lastPingAt.getTime() < cutoff) {
|
||||
try { peer.ws.close(1000, "kicked_stale"); } catch {}
|
||||
connections.delete(pid);
|
||||
void disconnectPresence(pid);
|
||||
kicked.push(peer.displayName || pid);
|
||||
}
|
||||
}
|
||||
} else if (km.target) {
|
||||
// Kick specific peer by name or pubkey
|
||||
for (const [pid, peer] of connections) {
|
||||
if (peer.meshId !== conn.meshId) continue;
|
||||
if (peer.displayName === km.target || peer.memberPubkey === km.target || peer.memberPubkey.startsWith(km.target)) {
|
||||
try { peer.ws.close(1000, "kicked"); } catch {}
|
||||
connections.delete(pid);
|
||||
void disconnectPresence(pid);
|
||||
kicked.push(peer.displayName || pid);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
conn.ws.send(JSON.stringify({ type: "kick_ack", kicked, _reqId: km._reqId }));
|
||||
log.info("ws kick", { presence_id: presenceId, kicked_count: kicked.length, target: km.target ?? km.stale ?? "all" });
|
||||
break;
|
||||
}
|
||||
|
||||
case "ban": {
|
||||
const bm = msg as { type: "ban"; target: string; _reqId?: string };
|
||||
if (!bm.target) { sendError(ws, "invalid", "target required", undefined, bm._reqId); break; }
|
||||
|
||||
// Authz: only owner or admin
|
||||
const [banMesh] = await db.select({ ownerUserId: mesh.ownerUserId }).from(mesh).where(eq(mesh.id, conn.meshId)).limit(1);
|
||||
const [banMember] = await db.select({ role: meshMember.role, userId: meshMember.userId }).from(meshMember).where(eq(meshMember.id, conn.memberId)).limit(1);
|
||||
if (!banMesh || (banMesh.ownerUserId !== banMember?.userId && banMember?.role !== "admin")) {
|
||||
sendError(ws, "forbidden", "only owner or admin can ban", undefined, bm._reqId);
|
||||
break;
|
||||
}
|
||||
|
||||
// Find member by name or pubkey
|
||||
const [targetMember] = await db.select({ id: meshMember.id, displayName: meshMember.displayName, peerPubkey: meshMember.peerPubkey })
|
||||
.from(meshMember)
|
||||
.where(and(
|
||||
eq(meshMember.meshId, conn.meshId),
|
||||
isNull(meshMember.revokedAt),
|
||||
sql`(${meshMember.displayName} = ${bm.target} OR ${meshMember.peerPubkey} = ${bm.target} OR LEFT(${meshMember.peerPubkey}, ${bm.target.length}) = ${bm.target})`,
|
||||
))
|
||||
.limit(1);
|
||||
|
||||
if (!targetMember) { sendError(ws, "not_found", `peer "${bm.target}" not found`, undefined, bm._reqId); break; }
|
||||
if (targetMember.id === conn.memberId) { sendError(ws, "invalid", "cannot ban yourself", undefined, bm._reqId); break; }
|
||||
|
||||
// Revoke member
|
||||
await db.update(meshMember).set({ revokedAt: new Date() }).where(eq(meshMember.id, targetMember.id));
|
||||
|
||||
// Kick all their connections
|
||||
for (const [pid, peer] of connections) {
|
||||
if (peer.meshId === conn.meshId && peer.memberPubkey === targetMember.peerPubkey) {
|
||||
try { peer.ws.close(1000, "banned"); } catch {}
|
||||
connections.delete(pid);
|
||||
void disconnectPresence(pid);
|
||||
}
|
||||
}
|
||||
|
||||
void audit(conn.meshId, "member_banned", conn.memberId, conn.displayName, { target: targetMember.displayName, targetPubkey: targetMember.peerPubkey });
|
||||
conn.ws.send(JSON.stringify({ type: "ban_ack", banned: targetMember.displayName, _reqId: bm._reqId }));
|
||||
log.info("ws ban", { presence_id: presenceId, banned: targetMember.displayName, banned_member_id: targetMember.id });
|
||||
break;
|
||||
}
|
||||
|
||||
case "unban": {
|
||||
const ubm = msg as { type: "unban"; target: string; _reqId?: string };
|
||||
if (!ubm.target) { sendError(ws, "invalid", "target required", undefined, ubm._reqId); break; }
|
||||
|
||||
// Authz
|
||||
const [unbanMesh] = await db.select({ ownerUserId: mesh.ownerUserId }).from(mesh).where(eq(mesh.id, conn.meshId)).limit(1);
|
||||
const [unbanMember] = await db.select({ role: meshMember.role, userId: meshMember.userId }).from(meshMember).where(eq(meshMember.id, conn.memberId)).limit(1);
|
||||
if (!unbanMesh || (unbanMesh.ownerUserId !== unbanMember?.userId && unbanMember?.role !== "admin")) {
|
||||
sendError(ws, "forbidden", "only owner or admin can unban", undefined, ubm._reqId);
|
||||
break;
|
||||
}
|
||||
|
||||
// Find revoked member
|
||||
const [revokedMember] = await db.select({ id: meshMember.id, displayName: meshMember.displayName })
|
||||
.from(meshMember)
|
||||
.where(and(
|
||||
eq(meshMember.meshId, conn.meshId),
|
||||
sql`${meshMember.revokedAt} IS NOT NULL`,
|
||||
sql`(${meshMember.displayName} = ${ubm.target} OR ${meshMember.peerPubkey} = ${ubm.target})`,
|
||||
))
|
||||
.limit(1);
|
||||
|
||||
if (!revokedMember) { sendError(ws, "not_found", `no banned peer "${ubm.target}"`, undefined, ubm._reqId); break; }
|
||||
|
||||
await db.update(meshMember).set({ revokedAt: null }).where(eq(meshMember.id, revokedMember.id));
|
||||
void audit(conn.meshId, "member_unbanned", conn.memberId, conn.displayName, { target: revokedMember.displayName });
|
||||
conn.ws.send(JSON.stringify({ type: "unban_ack", unbanned: revokedMember.displayName, _reqId: ubm._reqId }));
|
||||
log.info("ws unban", { presence_id: presenceId, unbanned: revokedMember.displayName });
|
||||
break;
|
||||
}
|
||||
|
||||
case "list_bans": {
|
||||
const lbm = msg as { type: "list_bans"; _reqId?: string };
|
||||
const banned = await db.select({
|
||||
name: meshMember.displayName,
|
||||
pubkey: meshMember.peerPubkey,
|
||||
revokedAt: meshMember.revokedAt,
|
||||
}).from(meshMember).where(and(
|
||||
eq(meshMember.meshId, conn.meshId),
|
||||
sql`${meshMember.revokedAt} IS NOT NULL`,
|
||||
));
|
||||
conn.ws.send(JSON.stringify({
|
||||
type: "list_bans_result",
|
||||
bans: banned.map((b) => ({ name: b.name, pubkey: b.pubkey, revokedAt: b.revokedAt?.toISOString() })),
|
||||
_reqId: lbm._reqId,
|
||||
}));
|
||||
break;
|
||||
}
|
||||
|
||||
// --- Webhook CRUD ---
|
||||
case "create_webhook": {
|
||||
const cw = msg as Extract<WSClientMessage, { type: "create_webhook" }>;
|
||||
|
||||
Reference in New Issue
Block a user