// Source listing: fireclaw/src/agent-manager.ts (TypeScript; 503 lines, 13 KiB as originally published)

import {
  existsSync,
  mkdirSync,
  mkdtempSync,
  readFileSync,
  writeFileSync,
  copyFileSync,
  unlinkSync,
  readdirSync,
} from "node:fs";
import { join } from "node:path";
import { execFileSync } from "node:child_process";
import { CONFIG } from "./config.js";
/**
 * Options placed in front of the target for the `ssh` commands issued by
 * `stopAgent` and `reloadAgent`.
 *
 * Host-key checking is turned off and the known-hosts file is `/dev/null`, so
 * no host key is verified or remembered. Connection attempts give up after
 * 5 seconds, and authentication uses the key at `CONFIG.sshKeyPath`.
 */
const SSH_OPTS = [
  ...[
    "StrictHostKeyChecking=no",
    "UserKnownHostsFile=/dev/null",
    "ConnectTimeout=5",
  ].flatMap((option) => ["-o", option]),
  "-i",
  CONFIG.sshKeyPath,
];
import {
allocateIp,
releaseIp,
deleteTap,
applyNetworkPolicy,
removeNetworkPolicy,
type NetworkPolicy,
} from "./network.js";
import {
setupNetwork,
spawnFirecracker,
bootVM,
} from "./firecracker-vm.js";
/**
 * Record describing one started agent VM, as stored in `agents.json` under the
 * agent's name.
 */
export interface AgentInfo {
  /** Agent name; also the key of this record in the registry. */
  name: string;
  /** Nickname handed to the agent through its `config.json`. */
  nick: string;
  /** Model name handed to the agent; updated by `reloadAgent` when the model changes. */
  model: string;
  /** Name of the template the agent was started from. */
  template: string;

  /** Address returned by `allocateIp`; used as SSH target and for the network policy. */
  ip: string;
  /** Number returned by `allocateIp` together with `ip`; given back via `releaseIp`. */
  octet: number;
  /** Tap device name, built as `fctap` followed by `octet`. */
  tapDevice: string;

  /** Socket path passed to `spawnFirecracker` and `bootVM` for this VM. */
  socketPath: string;
  /** Per-agent copy of the base rootfs image; deleted when the agent is stopped. */
  rootfsPath: string;
  /** PID reported for the spawned Firecracker process; used for liveness checks and SIGKILL. */
  pid: number;
  /** ISO-8601 timestamp taken when the record was created. */
  startedAt: string;
}
/**
 * Shape of a template file as returned by `loadTemplate`.
 */
interface AgentTemplate {
  /** Agent name used when the caller supplies no name override. */
  name: string;
  /** Nickname used when the caller supplies no name override. */
  nick: string;
  /** Model used when the caller supplies no model override. */
  model: string;
  /** Trigger value copied into the agent's `config.json`. */
  trigger: string;
  /** Persona text written to the agent's `persona.md`. */
  persona: string;
  /** Network policy to apply after boot; `startAgent` falls back to `"full"` when absent. */
  network?: NetworkPolicy;
}
// Registry of started agents, keyed by agent name (read by loadAgents, written by saveAgents).
const AGENTS_FILE = join(CONFIG.baseDir, "agents.json");
// Directory holding one `<name>.json` file per agent template.
const TEMPLATES_DIR = join(CONFIG.baseDir, "templates");
// Base root filesystem image; startAgent copies it for every new agent.
const AGENT_ROOTFS = join(CONFIG.baseDir, "agent-rootfs.ext4");
// Directory in which ensureWorkspace keeps one `<agent>.ext4` workspace image per agent.
const WORKSPACES_DIR = CONFIG.workspacesDir;
/**
 * Emits one diagnostic line on stderr, tagged with the `[agent-mgr]` prefix.
 *
 * @param msg - Text to print; a trailing newline is appended.
 */
function log(msg: string) {
  const line = "[agent-mgr] " + msg + "\n";
  process.stderr.write(line);
}
/**
 * Reads the agent registry from `agents.json`.
 *
 * A missing file is the normal "nothing started yet" state and yields an empty
 * registry without any message. Unreadable or malformed content also yields an
 * empty registry so callers keep working, but a warning is logged instead of
 * hiding the problem.
 *
 * @returns Map of agent name to its stored record; empty when nothing usable is on disk.
 */
function loadAgents(): Record<string, AgentInfo> {
  let raw: string;
  try {
    raw = readFileSync(AGENTS_FILE, "utf-8");
  } catch (err) {
    const code = (err as { code?: unknown } | null | undefined)?.code;
    if (code !== "ENOENT") {
      log(`Cannot read ${AGENTS_FILE}: ${err}`);
    }
    return {};
  }

  try {
    const parsed: unknown = JSON.parse(raw);
    // JSON such as `null`, `[]` or `"x"` parses fine but is not a registry;
    // indexing it by agent name would crash or misbehave later on.
    if (typeof parsed === "object" && parsed !== null && !Array.isArray(parsed)) {
      return parsed as Record<string, AgentInfo>;
    }
    log(`Ignoring ${AGENTS_FILE}: expected a JSON object`);
  } catch (err) {
    log(`Ignoring ${AGENTS_FILE}: ${err}`);
  }
  return {};
}
/**
 * Persists the agent registry to `agents.json`, replacing its previous content.
 *
 * @param agents - Complete registry to store, keyed by agent name.
 */
function saveAgents(agents: Record<string, AgentInfo>) {
  const serialized = JSON.stringify(agents, null, 2);
  writeFileSync(AGENTS_FILE, serialized);
}
/**
 * Loads the template stored as `<name>.json` in the templates directory.
 *
 * @param name - Template name without the `.json` extension.
 * @returns The parsed file content; it is not validated against `AgentTemplate`.
 * @throws Error if the file does not exist, or whatever reading/parsing raises.
 */
export function loadTemplate(name: string): AgentTemplate {
  const templateFile = join(TEMPLATES_DIR, `${name}.json`);
  if (existsSync(templateFile)) {
    const raw = readFileSync(templateFile, "utf-8");
    return JSON.parse(raw);
  }
  throw new Error(`Template "${name}" not found at ${templateFile}`);
}
/**
 * Lists the names of all templates available in the templates directory.
 *
 * @returns Every `*.json` file name with its trailing `.json` removed, or an
 *   empty list when the directory cannot be read.
 */
export function listTemplates(): string[] {
  const suffix = ".json";
  try {
    return readdirSync(TEMPLATES_DIR)
      .filter((file) => file.endsWith(suffix))
      // Cut off only the trailing extension: String.replace with a string
      // pattern removes the first ".json" wherever it occurs in the name.
      .map((file) => file.slice(0, -suffix.length));
  } catch {
    return [];
  }
}
/**
 * Writes the per-agent configuration into a rootfs image before the VM boots.
 *
 * The image is loop-mounted on a freshly created temporary directory and then
 * populated with `/etc/agent/config.json`, `/etc/agent/persona.md` and, when
 * the configured public key exists on the host, `/root/.ssh/authorized_keys`.
 * The image is unmounted and the mount point removed even if a step fails.
 *
 * @param rootfsPath - Path of the image to modify.
 * @param config - Values copied into `config.json` next to the fixed server,
 *   port and Ollama URL.
 * @param persona - Text written verbatim to `persona.md`.
 * @throws If creating the mount point, mounting, or any `sudo` command fails.
 */
function injectAgentConfig(
  rootfsPath: string,
  config: { nick: string; model: string; trigger: string },
  persona: string
) {
  // mkdtempSync picks an unused name and creates the directory itself; a
  // timestamp-derived path is guessable and could already exist.
  const mountPoint = mkdtempSync("/tmp/fireclaw-agent-");
  let mounted = false;
  try {
    execFileSync("sudo", ["mount", "-o", "loop", rootfsPath, mountPoint], {
      stdio: "pipe",
    });
    mounted = true;

    execFileSync("sudo", ["mkdir", "-p", join(mountPoint, "etc/agent")], {
      stdio: "pipe",
    });

    // Write config (via stdin to avoid shell injection)
    const configJson = JSON.stringify({
      nick: config.nick,
      model: config.model,
      trigger: config.trigger,
      server: "172.16.0.1",
      port: 6667,
      ollama_url: "http://172.16.0.1:11434",
    });
    execFileSync("sudo", ["tee", join(mountPoint, "etc/agent/config.json")], {
      input: configJson,
      stdio: ["pipe", "pipe", "pipe"],
    });

    // Write persona (via stdin to avoid shell injection)
    execFileSync("sudo", ["tee", join(mountPoint, "etc/agent/persona.md")], {
      input: persona,
      stdio: ["pipe", "pipe", "pipe"],
    });

    // Inject SSH key for debugging access
    execFileSync("sudo", ["mkdir", "-p", join(mountPoint, "root/.ssh")], {
      stdio: "pipe",
    });
    if (existsSync(CONFIG.sshPubKeyPath)) {
      const authorizedKeys = join(mountPoint, "root/.ssh/authorized_keys");
      execFileSync("sudo", ["cp", CONFIG.sshPubKeyPath, authorizedKeys], {
        stdio: "pipe",
      });
      execFileSync("sudo", ["chmod", "600", authorizedKeys], {
        stdio: "pipe",
      });
    }
  } finally {
    if (mounted) {
      try {
        execFileSync("sudo", ["umount", mountPoint], { stdio: "pipe" });
      } catch (err) {
        // The image stays mounted in this case, so make the failure visible.
        log(`Failed to unmount ${mountPoint}: ${err}`);
      }
    }
    try {
      execFileSync("rmdir", [mountPoint], { stdio: "pipe" });
    } catch {
      // Best effort: the directory is non-empty only if the unmount failed.
    }
  }
}
/**
 * Returns the persistent workspace image for an agent, creating it on first use.
 *
 * A new image is sized to `CONFIG.workspaceSizeMib` MiB, formatted with
 * `mkfs.ext4` and seeded with a `memory/` directory and a `MEMORY.md` file,
 * all owned by uid/gid 0. An existing image is returned untouched.
 *
 * @param agentName - Agent whose workspace is wanted; the image is `<agentName>.ext4`.
 * @returns Path of the workspace image.
 * @throws If creating, formatting or seeding a new image fails; the partial
 *   image is removed first so the next call starts from scratch.
 */
function ensureWorkspace(agentName: string): string {
  mkdirSync(WORKSPACES_DIR, { recursive: true });
  const imgPath = join(WORKSPACES_DIR, `${agentName}.ext4`);
  if (existsSync(imgPath)) {
    return imgPath;
  }

  log(`Creating workspace for "${agentName}" (${CONFIG.workspaceSizeMib} MiB)...`);
  try {
    execFileSync("truncate", ["-s", `${CONFIG.workspaceSizeMib}M`, imgPath], {
      stdio: "pipe",
    });
    execFileSync("sudo", ["/usr/sbin/mkfs.ext4", "-q", imgPath], {
      stdio: "pipe",
    });

    // Seed with MEMORY.md template
    // mkdtempSync picks an unused name and creates the directory itself; a
    // timestamp-derived path is guessable and could already exist.
    const mountPoint = mkdtempSync("/tmp/fireclaw-ws-");
    let mounted = false;
    try {
      execFileSync("sudo", ["mount", "-o", "loop", imgPath, mountPoint], {
        stdio: "pipe",
      });
      mounted = true;
      execFileSync("sudo", ["mkdir", "-p", join(mountPoint, "memory")], {
        stdio: "pipe",
      });
      // Content goes through stdin, so no shell command string has to be built.
      execFileSync("sudo", ["tee", join(mountPoint, "MEMORY.md")], {
        input: "# Agent Memory\n",
        stdio: ["pipe", "pipe", "pipe"],
      });
      execFileSync("sudo", ["chown", "-R", "0:0", mountPoint], {
        stdio: "pipe",
      });
    } finally {
      if (mounted) {
        try {
          execFileSync("sudo", ["umount", mountPoint], { stdio: "pipe" });
        } catch (err) {
          log(`Failed to unmount ${mountPoint}: ${err}`);
        }
      }
      try {
        execFileSync("rmdir", [mountPoint], { stdio: "pipe" });
      } catch {
        // Best effort: the directory is non-empty only if the unmount failed.
      }
    }
  } catch (err) {
    // A half-built image would pass the existsSync check above on every later
    // call and be handed out as a valid workspace, so remove it.
    try {
      unlinkSync(imgPath);
    } catch {
      // Nothing to remove if truncate itself failed.
    }
    throw err;
  }
  return imgPath;
}
/**
 * Boots a new agent VM from a template and records it in the registry.
 *
 * Steps: allocate an IP, copy the base rootfs and inject the agent config,
 * make sure the workspace image exists, set up the tap device, spawn and boot
 * Firecracker, apply the template's network policy (default `"full"`), then
 * store the resulting record in `agents.json`.
 *
 * If any step after the IP allocation fails, everything acquired so far is
 * released again (process, network policy, tap device, IP, rootfs copy,
 * socket) before the original error is rethrown.
 *
 * @param templateName - Template to start from (see `loadTemplate`).
 * @param overrides - Optional replacements: `name` replaces both the template's
 *   name and nick, `model` replaces the template's model.
 * @returns The record stored for the new agent.
 * @throws Error if the base rootfs is missing, the template cannot be loaded,
 *   an agent with that name is already registered, or any setup step fails.
 */
export async function startAgent(
  templateName: string,
  overrides?: { name?: string; model?: string }
): Promise<AgentInfo> {
  if (!existsSync(AGENT_ROOTFS)) {
    throw new Error(
      `Agent rootfs not found at ${AGENT_ROOTFS}. Build it first.`
    );
  }
  const template = loadTemplate(templateName);
  const name = overrides?.name ?? template.name;
  const nick = overrides?.name ?? template.nick;
  const model = overrides?.model ?? template.model;

  // Check not already running
  const agents = loadAgents();
  if (agents[name]) {
    throw new Error(`Agent "${name}" is already running`);
  }
  log(`Starting agent "${name}" (template: ${templateName})...`);

  // Allocate resources
  const { ip, octet } = allocateIp();
  const tapDevice = `fctap${octet}`;
  const socketPath = join(CONFIG.socketDir, `agent-${name}.sock`);
  const rootfsPath = join(CONFIG.runsDir, `agent-${name}.ext4`);

  // Progress markers so the rollback only undoes what was actually attempted.
  let vmPid: number | undefined;
  let tapTouched = false;
  let policyTouched = false;

  try {
    mkdirSync(CONFIG.socketDir, { recursive: true });
    mkdirSync(CONFIG.runsDir, { recursive: true });

    // Clean stale socket from previous run
    try {
      unlinkSync(socketPath);
    } catch {
      // No stale socket present.
    }

    // Prepare rootfs
    copyFileSync(AGENT_ROOTFS, rootfsPath);
    injectAgentConfig(
      rootfsPath,
      { nick, model, trigger: template.trigger },
      template.persona
    );

    // Create/get persistent workspace
    const workspacePath = ensureWorkspace(name);

    // Setup network
    tapTouched = true;
    setupNetwork(tapDevice);

    // Boot VM
    const proc = await spawnFirecracker(socketPath, { detached: true });
    vmPid = proc.pid;
    if (vmPid === undefined) {
      throw new Error(`Firecracker for agent "${name}" did not report a PID`);
    }
    await bootVM({
      socketPath,
      rootfsPath,
      extraDrives: [{ id: "workspace", path: workspacePath }],
      tapDevice,
      ip,
      octet,
    });

    // Apply network policy
    const networkPolicy: NetworkPolicy = template.network ?? "full";
    policyTouched = true;
    applyNetworkPolicy(ip, networkPolicy);

    const info: AgentInfo = {
      name,
      nick,
      model,
      template: templateName,
      ip,
      octet,
      tapDevice,
      socketPath,
      rootfsPath,
      pid: vmPid,
      startedAt: new Date().toISOString(),
    };

    // Re-read agents.json before writing to avoid race conditions
    // (another startAgent may have written since we last read)
    const currentAgents = loadAgents();
    currentAgents[name] = info;
    saveAgents(currentAgents);
    log(`Agent "${name}" started: nick=${nick} ip=${ip}`);
    return info;
  } catch (err) {
    log(`Failed to start agent "${name}", rolling back: ${err}`);
    if (vmPid !== undefined) {
      try {
        process.kill(vmPid, "SIGKILL");
      } catch {
        // Already gone.
      }
      // Same grace period stopAgent allows before it touches the tap device.
      await new Promise((r) => setTimeout(r, 500));
    }
    if (policyTouched) {
      try {
        removeNetworkPolicy(ip);
      } catch (e) {
        log(`  policy rollback: ${e}`);
      }
    }
    if (tapTouched) {
      try {
        deleteTap(tapDevice);
      } catch (e) {
        log(`  tap rollback: ${e}`);
      }
    }
    try {
      releaseIp(octet);
    } catch (e) {
      log(`  ip rollback: ${e}`);
    }
    try {
      unlinkSync(rootfsPath);
    } catch {
      // The copy may never have been created.
    }
    try {
      unlinkSync(socketPath);
    } catch {
      // The socket may never have been created.
    }
    throw err;
  }
}
/**
 * Stops a registered agent and releases everything that belongs to it.
 *
 * The agent process inside the VM is first asked to exit over SSH (best
 * effort), then the Firecracker process is killed and waited for. After that
 * the socket, network policy, tap device, IP and rootfs copy are released and
 * the agent is removed from `agents.json`.
 *
 * @param name - Name of the agent to stop.
 * @throws Error if no agent with that name is registered, or whatever
 *   `removeNetworkPolicy`, `releaseIp` or writing the registry raises.
 */
export async function stopAgent(name: string) {
  const info = loadAgents()[name];
  if (!info) {
    throw new Error(`Agent "${name}" is not running`);
  }
  log(`Stopping agent "${name}"...`);

  // Graceful shutdown: SSH in and kill the agent process so it sends IRC QUIT
  try {
    execFileSync(
      "ssh",
      [...SSH_OPTS, `root@${info.ip}`, "pkill -f 'agent.py' 2>/dev/null; sleep 1"],
      { stdio: "pipe", timeout: 5_000 }
    );
  } catch {
    // Best effort — VM might already be unreachable
  }

  // Kill firecracker process and wait for it to die
  try {
    process.kill(info.pid, "SIGKILL");
    // Wait for process to actually exit before cleaning up resources
    // (at most 20 polls, 200 ms apart).
    for (let i = 0; i < 20; i++) {
      try {
        process.kill(info.pid, 0); // Check if alive
        await new Promise((r) => setTimeout(r, 200));
      } catch {
        break; // Process is gone
      }
    }
  } catch {
    // Already dead
  }

  // Small delay to let kernel release the tap device
  await new Promise((r) => setTimeout(r, 500));

  // Cleanup
  try {
    unlinkSync(info.socketPath);
  } catch {
    // Socket already removed.
  }
  removeNetworkPolicy(info.ip);

  let tapDeleted = false;
  for (let attempt = 0; attempt < 3 && !tapDeleted; attempt++) {
    try {
      deleteTap(info.tapDevice);
      tapDeleted = true;
    } catch {
      if (attempt < 2) await new Promise((r) => setTimeout(r, 1000));
    }
  }
  if (!tapDeleted) {
    log(`Could not delete tap device ${info.tapDevice} after 3 attempts`);
  }

  releaseIp(info.octet);
  try {
    unlinkSync(info.rootfsPath);
  } catch {
    // Rootfs copy already removed.
  }

  // Re-read agents.json before writing: this function has awaited several
  // times since its first read, so the earlier snapshot may be missing agents
  // registered in the meantime.
  const currentAgents = loadAgents();
  delete currentAgents[name];
  saveAgents(currentAgents);
  log(`Agent "${name}" stopped.`);
}
/**
 * Returns the records of all agents whose Firecracker process still exists.
 *
 * As a side effect, entries whose PID no longer accepts signal 0 are treated
 * as dead: their network policy, tap device, IP, rootfs copy and socket are
 * released (each step best effort, failures are logged) and they are removed
 * from `agents.json`. The registry is only rewritten when something was removed.
 *
 * @returns Records of the agents that remain registered.
 */
export function listAgents(): AgentInfo[] {
  const agents = loadAgents();
  let removedAny = false;

  // Verify processes are still alive
  for (const [name, info] of Object.entries(agents)) {
    try {
      process.kill(info.pid, 0);
      continue;
    } catch {
      // Process is dead, clean up below
    }

    log(`Agent "${name}" is dead, cleaning up...`);
    // stopAgent removes the network policy for the agent's IP; do the same
    // here so nothing stays behind for an address that is about to be released.
    try { removeNetworkPolicy(info.ip); } catch (e) { log(`  policy cleanup: ${e}`); }
    try { deleteTap(info.tapDevice); } catch (e) { log(`  tap cleanup: ${e}`); }
    try { releaseIp(info.octet); } catch (e) { log(`  ip cleanup: ${e}`); }
    try { unlinkSync(info.rootfsPath); } catch (e) { log(`  rootfs cleanup: ${e}`); }
    try { unlinkSync(info.socketPath); } catch (e) { log(`  socket cleanup: ${e}`); }
    delete agents[name];
    removedAny = true;
  }

  // A pure listing must not rewrite the registry from its own snapshot.
  if (removedAny) {
    saveAgents(agents);
  }
  return Object.values(agents);
}
/**
 * Pushes new settings into a running agent VM over SSH and tells it to reload.
 *
 * A non-empty `model` and/or `trigger` is merged into the VM's
 * `/etc/agent/config.json`; a non-empty `persona` replaces
 * `/etc/agent/persona.md`. Afterwards `agent.py` receives SIGHUP and the
 * registry is saved (with the new model, if one was given).
 *
 * @param name - Name of the registered agent.
 * @param updates - Settings to change; empty or missing values are skipped.
 * @throws Error if the agent is not registered, or `Failed to reload agent: ...`
 *   when any SSH step fails.
 */
export async function reloadAgent(
  name: string,
  updates: { model?: string; persona?: string; trigger?: string }
) {
  const agents = loadAgents();
  const info = agents[name];
  if (!info) {
    throw new Error(`Agent "${name}" is not running`);
  }
  log(`Reloading agent "${name}"...`);

  // Collect the config.json fields that actually change.
  const configUpdates: Record<string, string> = {};
  for (const key of ["model", "trigger"] as const) {
    const value = updates[key];
    if (value) configUpdates[key] = value;
  }
  if (updates.model) {
    info.model = updates.model;
  }

  const sshTarget = `root@${info.ip}`;
  const remote = (command: string) => [...SSH_OPTS, sshTarget, command];
  const hasConfigChanges = Object.keys(configUpdates).length > 0;

  try {
    if (hasConfigChanges) {
      // Fetch the VM's current config, overlay the changes, send it back via stdin.
      const currentRaw = execFileSync("ssh", remote("cat /etc/agent/config.json"), {
        encoding: "utf-8",
        timeout: 10_000,
      });
      const current = JSON.parse(currentRaw);
      Object.assign(current, configUpdates);
      execFileSync("ssh", remote(`cat > /etc/agent/config.json`), {
        input: JSON.stringify(current),
        timeout: 10_000,
      });
    }

    if (updates.persona) {
      execFileSync("ssh", remote(`cat > /etc/agent/persona.md`), {
        input: updates.persona,
        timeout: 10_000,
      });
    }

    // Signal agent to reload
    execFileSync("ssh", remote("pkill -HUP -f 'agent.py'"), {
      stdio: "pipe",
      timeout: 10_000,
    });
  } catch (err) {
    throw new Error(`Failed to reload agent: ${err}`);
  }

  saveAgents(agents);
  log(`Agent "${name}" reloaded.`);
}
/**
 * Brings `agents.json` back in line with the processes that actually exist.
 *
 * Registered agents whose PID still accepts signal 0 are kept ("adopted").
 * Registered agents whose process is gone have their network policy, tap
 * device, IP, rootfs copy and socket released (each step best effort) and are
 * dropped from the registry. Finally, `firecracker` processes whose command
 * line names an `agent-<name>.sock` socket for an unregistered name are killed.
 * The registry is only rewritten when an entry was dropped.
 *
 * @returns `adopted`: names kept; `cleaned`: names dropped, plus one
 *   `orphan:<name>` entry per unregistered process found.
 */
export function reconcileAgents(): { adopted: string[]; cleaned: string[] } {
  const agents = loadAgents();
  const adopted: string[] = [];
  const cleaned: string[] = [];
  let removedAny = false;

  for (const [name, info] of Object.entries(agents)) {
    let alive = false;
    try {
      process.kill(info.pid, 0);
      alive = true;
    } catch {
      // Process is dead
    }

    if (alive) {
      adopted.push(name);
      log(`Adopted running agent "${name}" (PID ${info.pid}, ${info.ip})`);
      continue;
    }

    log(`Cleaning dead agent "${name}" (PID ${info.pid} gone)...`);
    // stopAgent removes the network policy for the agent's IP; do the same
    // here so nothing stays behind for an address that is about to be released.
    try { removeNetworkPolicy(info.ip); } catch (e) { log(`  policy: ${e}`); }
    try { deleteTap(info.tapDevice); } catch (e) { log(`  tap: ${e}`); }
    try { releaseIp(info.octet); } catch (e) { log(`  ip: ${e}`); }
    try { unlinkSync(info.rootfsPath); } catch (e) { log(`  rootfs: ${e}`); }
    try { unlinkSync(info.socketPath); } catch (e) { log(`  socket: ${e}`); }
    delete agents[name];
    removedAny = true;
    cleaned.push(name);
  }

  // Scan for orphan firecracker processes not in agents.json
  try {
    const psOutput = execFileSync("pgrep", ["-a", "firecracker"], {
      encoding: "utf-8",
    });
    for (const line of psOutput.trim().split("\n")) {
      if (!line) continue;
      const agentName = line.match(/agent-(\S+)\.sock/)?.[1];
      if (!agentName || agents[agentName]) continue;

      // The PID is the first whitespace-separated field of each output line.
      const pid = Number.parseInt(line.split(/\s+/)[0] ?? "", 10);
      log(`Found orphan firecracker process for "${agentName}" (PID ${pid}), killing...`);
      if (Number.isInteger(pid)) {
        try {
          process.kill(pid, "SIGKILL");
        } catch {
          // Already gone or not ours to kill.
        }
      }
      cleaned.push(`orphan:${agentName}`);
    }
  } catch {
    // No firecracker processes running — that's fine
  }

  // Avoid rewriting the registry from this snapshot when nothing changed.
  if (removedAny) {
    saveAgents(agents);
  }

  if (adopted.length === 0 && cleaned.length === 0) {
    log("No agents to reconcile.");
  } else {
    log(`Reconciled: ${adopted.length} adopted, ${cleaned.length} cleaned.`);
  }
  return { adopted, cleaned };
}
/**
 * Stops every agent currently listed in `agents.json`.
 *
 * Agents are stopped one after another: `stopAgent` reads and rewrites the
 * registry around several awaits, so overlapping calls could overwrite each
 * other's changes. A failure for one agent no longer prevents the remaining
 * agents from being stopped.
 *
 * @throws Error naming every agent that could not be stopped, raised after all
 *   agents have been attempted.
 */
export async function stopAllAgents() {
  const failures: string[] = [];
  for (const name of Object.keys(loadAgents())) {
    try {
      await stopAgent(name);
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      log(`Failed to stop agent "${name}": ${reason}`);
      failures.push(`${name}: ${reason}`);
    }
  }
  if (failures.length > 0) {
    throw new Error(
      `Failed to stop ${failures.length} agent(s): ${failures.join("; ")}`
    );
  }
}