#!/usr/bin/env python3 """Fireclaw IRC agent — connects to IRC, responds via Ollama with discoverable skills.""" import os import re import socket import json import sys import time import subprocess import urllib.request import urllib.error import signal import threading from collections import deque # Load config with open("/etc/agent/config.json") as f: CONFIG = json.load(f) PERSONA = "" try: with open("/etc/agent/persona.md") as f: PERSONA = f.read().strip() except FileNotFoundError: PERSONA = "You are a helpful assistant." NICK = CONFIG.get("nick", "agent") SERVER = CONFIG.get("server", "172.16.0.1") PORT = CONFIG.get("port", 6667) OLLAMA_URL = CONFIG.get("ollama_url", "http://172.16.0.1:11434") CONTEXT_SIZE = CONFIG.get("context_size", 20) MAX_RESPONSE_LINES = CONFIG.get("max_response_lines", 50) TOOLS_ENABLED = CONFIG.get("tools", True) MAX_TOOL_ROUNDS = CONFIG.get("max_tool_rounds", 10) WORKSPACE = "/workspace" SKILL_DIRS = ["/opt/skills", f"{WORKSPACE}/skills"] RUNTIME = { "model": CONFIG.get("model", "qwen2.5-coder:7b"), "trigger": CONFIG.get("trigger", "mention"), "persona": PERSONA, } recent = deque(maxlen=CONTEXT_SIZE) # ─── Memory ────────────────────────────────────────────────────────── AGENT_MEMORY = "" try: with open(f"{WORKSPACE}/MEMORY.md") as f: AGENT_MEMORY = f.read().strip() mem_dir = f"{WORKSPACE}/memory" if os.path.isdir(mem_dir): for fname in sorted(os.listdir(mem_dir)): if fname.endswith(".md"): try: with open(f"{mem_dir}/{fname}") as f: topic = fname.replace(".md", "") AGENT_MEMORY += f"\n\n## {topic}\n{f.read().strip()}" except Exception: pass except FileNotFoundError: pass # ─── Skill System ──────────────────────────────────────────────────── def parse_skill_md(path): """Parse a SKILL.md frontmatter into a tool definition.""" with open(path) as f: content = f.read() # Extract YAML frontmatter between --- match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL) if not match: return None # Simple YAML-like parser (no pyyaml dependency) fm = {} current_key = None current_param = None params = {} for line in match.group(1).split("\n"): stripped = line.strip() if not stripped or stripped.startswith("#"): continue if line.startswith(" ") and current_key == "parameters": # Inside parameters block if line.startswith(" ") and current_param: # Parameter property k, _, v = stripped.partition(":") v = v.strip().strip('"').strip("'") if k.strip() == "required": v = v.lower() == "true" params[current_param][k.strip()] = v elif ":" in stripped: # New parameter param_name = stripped.rstrip(":").strip() current_param = param_name params[param_name] = {} elif ":" in line and not line.startswith(" "): k, _, v = line.partition(":") k = k.strip() v = v.strip().strip('"').strip("'") fm[k] = v current_key = k if k == "parameters": current_param = None if "name" not in fm: return None # Build Ollama tool definition properties = {} required = [] for pname, pdata in params.items(): properties[pname] = { "type": pdata.get("type", "string"), "description": pdata.get("description", ""), } if pdata.get("required", False): required.append(pname) return { "type": "function", "function": { "name": fm["name"], "description": fm.get("description", ""), "parameters": { "type": "object", "properties": properties, "required": required, }, }, } def discover_skills(): """Scan skill directories and return tool definitions + script paths.""" tools = [] scripts = {} for skill_dir in SKILL_DIRS: if not os.path.isdir(skill_dir): continue for name in sorted(os.listdir(skill_dir)): skill_path = os.path.join(skill_dir, name) skill_md = os.path.join(skill_path, "SKILL.md") if not os.path.isfile(skill_md): continue tool_def = parse_skill_md(skill_md) if not tool_def: continue # Find the run script for ext in ("run.py", "run.sh"): script = os.path.join(skill_path, ext) if os.path.isfile(script): scripts[tool_def["function"]["name"]] = script break if tool_def["function"]["name"] in scripts: tools.append(tool_def) return tools, scripts LARGE_OUTPUT_THRESHOLD = 2000 LARGE_OUTPUT_DIR = f"{WORKSPACE}/tool_outputs" _output_counter = 0 def execute_skill(script_path, args): """Execute a skill script with args as JSON on stdin. Large outputs are saved to a file with a preview returned.""" global _output_counter env = os.environ.copy() env["WORKSPACE"] = WORKSPACE env["SEARX_URL"] = CONFIG.get("searx_url", "https://searx.mymx.me") try: result = subprocess.run( ["python3" if script_path.endswith(".py") else "bash", script_path], input=json.dumps(args), capture_output=True, text=True, timeout=120, env=env, ) output = result.stdout if result.stderr: output += f"\n[stderr] {result.stderr}" output = output.strip() or "[no output]" # Large output handling — save to file, return preview if len(output) > LARGE_OUTPUT_THRESHOLD: os.makedirs(LARGE_OUTPUT_DIR, exist_ok=True) _output_counter += 1 filepath = f"{LARGE_OUTPUT_DIR}/output_{_output_counter}.txt" with open(filepath, "w") as f: f.write(output) preview = output[:1500] return f"{preview}\n\n[output truncated — full result ({len(output)} chars) saved to {filepath}. Use run_command to read it: cat {filepath}]" return output except subprocess.TimeoutExpired: return "[skill timed out after 120s]" except Exception as e: return f"[skill error: {e}]" # Discover skills at startup TOOLS, SKILL_SCRIPTS = discover_skills() LOG_FILE = f"{WORKSPACE}/agent.log" if os.path.isdir(WORKSPACE) else None def log(msg): line = f"[{time.strftime('%H:%M:%S')}] {msg}" print(f"[agent:{NICK}] {line}", flush=True) if LOG_FILE: try: with open(LOG_FILE, "a") as f: f.write(line + "\n") except Exception: pass log(f"Loaded {len(TOOLS)} skills: {', '.join(SKILL_SCRIPTS.keys())}") # ─── IRC Client ────────────────────────────────────────────────────── class IRCClient: def __init__(self, server, port, nick): self.server = server self.port = port self.nick = nick self.sock = None self.buf = "" self._lock = threading.Lock() def connect(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(300) self.sock.connect((self.server, self.port)) self.send(f"NICK {self.nick}") self.send(f"USER {self.nick} 0 * :Fireclaw Agent") def send(self, msg): with self._lock: self.sock.sendall(f"{msg}\r\n".encode("utf-8")) def join(self, channel): self.send(f"JOIN {channel}") def say(self, target, text): for line in text.split("\n"): line = line.strip() if line: while len(line) > 380: self.send(f"PRIVMSG {target} :{line[:380]}") line = line[380:] self.send(f"PRIVMSG {target} :{line}") def set_bot_mode(self): self.send(f"MODE {self.nick} +B") def recv_lines(self): try: data = self.sock.recv(4096) except socket.timeout: return [] if not data: raise ConnectionError("Connection closed") self.buf += data.decode("utf-8", errors="replace") lines = self.buf.split("\r\n") self.buf = lines.pop() return lines # ─── Tool Dispatch ─────────────────────────────────────────────────── def try_parse_tool_call(text): """Parse text-based tool calls (model dumps JSON as text).""" text = re.sub(r"", "", text).strip() for start in range(len(text)): if text[start] == "{": for end in range(len(text), start, -1): if text[end - 1] == "}": try: obj = json.loads(text[start:end]) name = obj.get("name") args = obj.get("arguments", {}) if name and isinstance(args, dict): return (name, args) except json.JSONDecodeError: continue return None def dispatch_tool(fn_name, fn_args, round_num): """Execute a tool call via the skill system.""" script = SKILL_SCRIPTS.get(fn_name) if not script: return f"[unknown tool: {fn_name}]" log(f"Skill [{round_num}]: {fn_name}({str(fn_args)[:60]})") # Handle save_memory reload result = execute_skill(script, fn_args) # Reload full memory if save_memory was called if fn_name == "save_memory": reload_memory() return result def reload_memory(): """Reload all memory files from workspace.""" global AGENT_MEMORY AGENT_MEMORY = "" try: with open(f"{WORKSPACE}/MEMORY.md") as f: AGENT_MEMORY = f.read().strip() mem_dir = f"{WORKSPACE}/memory" if os.path.isdir(mem_dir): for fname in sorted(os.listdir(mem_dir)): if fname.endswith(".md"): try: with open(f"{mem_dir}/{fname}") as f: topic = fname.replace(".md", "") AGENT_MEMORY += f"\n\n## {topic}\n{f.read().strip()}" except Exception: pass except FileNotFoundError: pass def ollama_request(payload): data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( f"{OLLAMA_URL}/api/chat", data=data, headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=120) as resp: return json.loads(resp.read()) def query_ollama(messages): """Call Ollama chat API with skill-based tool support.""" payload = { "model": RUNTIME["model"], "messages": messages, "stream": False, "options": {"num_predict": 512}, } if TOOLS_ENABLED and TOOLS: payload["tools"] = TOOLS for round_num in range(MAX_TOOL_ROUNDS): remaining = MAX_TOOL_ROUNDS - round_num try: data = ollama_request(payload) except (urllib.error.URLError, TimeoutError) as e: return f"[error: {e}]" msg = data.get("message", {}) # Structured tool calls tool_calls = msg.get("tool_calls") if tool_calls: messages.append(msg) for tc in tool_calls: fn = tc.get("function", {}) result = dispatch_tool( fn.get("name", ""), fn.get("arguments", {}), round_num + 1, ) # Warn when budget is running low if remaining <= 2: result += f"\n[warning: {remaining - 1} tool rounds remaining — wrap up]" messages.append({"role": "tool", "content": result}) payload["messages"] = messages continue # Text-based tool calls content = msg.get("content", "").strip() parsed_tool = try_parse_tool_call(content) if parsed_tool: fn_name, fn_args = parsed_tool if fn_name in SKILL_SCRIPTS: messages.append({"role": "assistant", "content": content}) result = dispatch_tool(fn_name, fn_args, round_num + 1) if remaining <= 2: result += f"\n[warning: {remaining - 1} tool rounds remaining — wrap up]" messages.append({ "role": "user", "content": f"Tool result:\n{result}\n\nNow respond to the user based on this result.", }) payload["messages"] = messages continue return content return "[max tool rounds reached]" # ─── Message Handling ──────────────────────────────────────────────── def build_messages(question, channel): system = RUNTIME["persona"] if TOOLS_ENABLED and TOOLS: skill_names = [t["function"]["name"] for t in TOOLS] system += "\n\nYou have access to tools: " + ", ".join(skill_names) + "." system += "\nUse tools when needed rather than guessing. Your workspace at /workspace persists across restarts." if AGENT_MEMORY and AGENT_MEMORY != "# Agent Memory": system += f"\n\nIMPORTANT - Your persistent memory (facts you saved previously, use these to answer questions):\n{AGENT_MEMORY}" system += f"\n\nYou are in IRC channel {channel}. Your nick is {NICK}. Keep responses concise — this is IRC." system += "\nWhen you want to address another agent or user, always start your message with their nick followed by a colon, e.g. 'coder: can you review this?'. This is how IRC mentions work — without the prefix, they won't see your message." messages = [{"role": "system", "content": system}] channel_msgs = [m for m in recent if m["channel"] == channel] for msg in channel_msgs[-CONTEXT_SIZE:]: if msg["nick"] == NICK: messages.append({"role": "assistant", "content": msg["text"]}) else: messages.append({"role": "user", "content": f"<{msg['nick']}> {msg['text']}"}) last = messages[-1] if len(messages) > 1 else None if not last or last.get("role") != "user" or question not in last.get("content", ""): messages.append({"role": "user", "content": question}) return messages def should_trigger(text): if RUNTIME["trigger"] == "all": return True lower = text.lower() nick = NICK.lower() return ( lower.startswith(f"{nick}:") or lower.startswith(f"{nick},") or lower.startswith(f"{nick} ") or lower.startswith(f"@{nick}") or lower == nick or text.startswith("!ask ") ) def extract_question(text): lower = text.lower() for prefix in [ f"{NICK.lower()}: ", f"{NICK.lower()}, ", f"@{NICK.lower()} ", f"{NICK.lower()} ", ]: if lower.startswith(prefix): return text[len(prefix):] if text.startswith("!ask "): return text[5:] return text _last_response_time = 0 _AGENT_COOLDOWN = 10 _cooldown_lock = threading.Lock() def handle_message(irc, source_nick, target, text): global _last_response_time is_dm = not target.startswith("#") channel = source_nick if is_dm else target reply_to = source_nick if is_dm else target recent.append({"nick": source_nick, "text": text, "channel": channel}) if source_nick == NICK: return if not is_dm and not should_trigger(text): return with _cooldown_lock: now = time.time() if now - _last_response_time < _AGENT_COOLDOWN: log(f"Cooldown active, ignoring trigger from {source_nick}") return _last_response_time = now question = extract_question(text) if not is_dm else text log(f"Triggered by {source_nick} in {channel}: {question[:80]}") def do_respond(): try: messages = build_messages(question, channel) response = query_ollama(messages) if not response: return lines = response.split("\n") if len(lines) > MAX_RESPONSE_LINES: lines = lines[:MAX_RESPONSE_LINES] lines.append(f"[truncated, {MAX_RESPONSE_LINES} lines max]") irc.say(reply_to, "\n".join(lines)) recent.append({"nick": NICK, "text": response[:200], "channel": channel}) except Exception as e: log(f"Error handling message: {e}") try: irc.say(reply_to, f"[error: {e}]") except Exception: pass threading.Thread(target=do_respond, daemon=True).start() # ─── Main Loop ─────────────────────────────────────────────────────── def run(): log(f"Starting agent: nick={NICK} model={RUNTIME['model']} tools={TOOLS_ENABLED}") while True: try: irc = IRCClient(SERVER, PORT, NICK) log(f"Connecting to {SERVER}:{PORT}...") irc.connect() def handle_sighup(signum, frame): log("SIGHUP received, reloading config...") try: with open("/etc/agent/config.json") as f: new_config = json.load(f) RUNTIME["model"] = new_config.get("model", RUNTIME["model"]) RUNTIME["trigger"] = new_config.get("trigger", RUNTIME["trigger"]) try: with open("/etc/agent/persona.md") as f: RUNTIME["persona"] = f.read().strip() except FileNotFoundError: pass log(f"Reloaded: model={RUNTIME['model']} trigger={RUNTIME['trigger']}") irc.say("#agents", f"[reloaded: model={RUNTIME['model']}]") except Exception as e: log(f"Reload failed: {e}") signal.signal(signal.SIGHUP, handle_sighup) def handle_sigterm(signum, frame): log("SIGTERM received, quitting IRC...") try: irc.send("QUIT :Agent shutting down") except Exception: pass time.sleep(0.5) sys.exit(0) signal.signal(signal.SIGTERM, handle_sigterm) registered = False while True: lines = irc.recv_lines() for line in lines: if line.startswith("PING"): irc.send(f"PONG {line.split(' ', 1)[1]}") continue parts = line.split(" ") if len(parts) < 2: continue if parts[1] == "001" and not registered: registered = True log("Registered with server") irc.set_bot_mode() irc.join("#agents") log("Joined #agents") if parts[1] == "INVITE" and len(parts) >= 3: invited_channel = parts[-1].lstrip(":") inviter = parts[0].split("!")[0].lstrip(":") log(f"Invited to {invited_channel} by {inviter}, joining...") irc.join(invited_channel) if parts[1] == "PRIVMSG" and len(parts) >= 4: source_nick = parts[0].split("!")[0].lstrip(":") target = parts[2] text = " ".join(parts[3:]).lstrip(":") handle_message(irc, source_nick, target, text) except (ConnectionError, OSError, socket.timeout) as e: log(f"Disconnected: {e}. Reconnecting in 5s...") time.sleep(5) except KeyboardInterrupt: log("Shutting down.") sys.exit(0) if __name__ == "__main__": run()