#!/usr/bin/env python3 """Fireclaw IRC agent — connects to IRC, responds via Ollama with tool access.""" import socket import json import sys import time import subprocess import urllib.request import urllib.error import signal import threading from collections import deque # Load config with open("/etc/agent/config.json") as f: CONFIG = json.load(f) PERSONA = "" try: with open("/etc/agent/persona.md") as f: PERSONA = f.read().strip() except FileNotFoundError: PERSONA = "You are a helpful assistant." NICK = CONFIG.get("nick", "agent") CHANNEL = CONFIG.get("channel", "#agents") SERVER = CONFIG.get("server", "172.16.0.1") PORT = CONFIG.get("port", 6667) OLLAMA_URL = CONFIG.get("ollama_url", "http://172.16.0.1:11434") CONTEXT_SIZE = CONFIG.get("context_size", 20) MAX_RESPONSE_LINES = CONFIG.get("max_response_lines", 50) TOOLS_ENABLED = CONFIG.get("tools", True) MAX_TOOL_ROUNDS = CONFIG.get("max_tool_rounds", 5) WORKSPACE = "/workspace" # Mutable runtime config — can be hot-reloaded via SIGHUP RUNTIME = { "model": CONFIG.get("model", "qwen2.5-coder:7b"), "trigger": CONFIG.get("trigger", "mention"), "persona": PERSONA, } # Recent messages for context recent = deque(maxlen=CONTEXT_SIZE) # Load persistent memory from workspace AGENT_MEMORY = "" try: import os with open(f"{WORKSPACE}/MEMORY.md") as f: AGENT_MEMORY = f.read().strip() # Also load all memory files referenced in the index mem_dir = f"{WORKSPACE}/memory" if os.path.isdir(mem_dir): for fname in sorted(os.listdir(mem_dir)): if fname.endswith(".md"): try: with open(f"{mem_dir}/{fname}") as f: topic = fname.replace(".md", "") AGENT_MEMORY += f"\n\n## {topic}\n{f.read().strip()}" except Exception: pass except FileNotFoundError: pass # Tool definitions for Ollama chat API TOOLS = [ { "type": "function", "function": { "name": "run_command", "description": "Execute a shell command on this system and return the output. Use this to check system info, run scripts, fetch URLs, process data, etc.", "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "The shell command to execute (bash)", } }, "required": ["command"], }, }, }, { "type": "function", "function": { "name": "save_memory", "description": "Save something important to your persistent memory. Use this to remember facts about users, lessons learned, project context, or anything you want to recall in future conversations. Memories survive restarts.", "parameters": { "type": "object", "properties": { "topic": { "type": "string", "description": "Short topic name for the memory file (e.g. 'user_prefs', 'project_x', 'lessons')", }, "content": { "type": "string", "description": "The memory content to save", }, }, "required": ["topic", "content"], }, }, }, { "type": "function", "function": { "name": "web_search", "description": "Search the web using SearXNG. Returns titles, URLs, and snippets for the top results. Use this when you need current information or facts you're unsure about.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "The search query", }, "num_results": { "type": "integer", "description": "Number of results to return (default 5)", }, }, "required": ["query"], }, }, }, ] SEARX_URL = CONFIG.get("searx_url", "https://searx.mymx.me") def log(msg): print(f"[agent:{NICK}] {msg}", flush=True) class IRCClient: def __init__(self, server, port, nick): self.server = server self.port = port self.nick = nick self.sock = None self.buf = "" self._lock = threading.Lock() def connect(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(300) self.sock.connect((self.server, self.port)) self.send(f"NICK {self.nick}") self.send(f"USER {self.nick} 0 * :Fireclaw Agent") def send(self, msg): with self._lock: self.sock.sendall(f"{msg}\r\n".encode("utf-8")) def join(self, channel): self.send(f"JOIN {channel}") def say(self, target, text): for line in text.split("\n"): line = line.strip() if line: while len(line) > 400: self.send(f"PRIVMSG {target} :{line[:400]}") line = line[400:] self.send(f"PRIVMSG {target} :{line}") def set_bot_mode(self): self.send(f"MODE {self.nick} +B") def recv_lines(self): try: data = self.sock.recv(4096) except socket.timeout: return [] if not data: raise ConnectionError("Connection closed") self.buf += data.decode("utf-8", errors="replace") lines = self.buf.split("\r\n") self.buf = lines.pop() return lines def run_command(command): """Execute a shell command and return output.""" log(f"Running command: {command[:100]}") try: result = subprocess.run( ["bash", "-c", command], capture_output=True, text=True, timeout=120, ) output = result.stdout if result.stderr: output += f"\n[stderr] {result.stderr}" if result.returncode != 0: output += f"\n[exit code: {result.returncode}]" # Truncate very long output if len(output) > 2000: output = output[:2000] + "\n[output truncated]" return output.strip() or "[no output]" except subprocess.TimeoutExpired: return "[command timed out after 120s]" except Exception as e: return f"[error: {e}]" def save_memory(topic, content): """Save a memory to the persistent workspace.""" import os mem_dir = f"{WORKSPACE}/memory" os.makedirs(mem_dir, exist_ok=True) # Write the memory file filepath = f"{mem_dir}/{topic}.md" with open(filepath, "w") as f: f.write(content + "\n") # Update MEMORY.md index index_path = f"{WORKSPACE}/MEMORY.md" existing = "" try: with open(index_path) as f: existing = f.read() except FileNotFoundError: existing = "# Agent Memory\n" # Add or update entry entry = f"- [{topic}](memory/{topic}.md)" if topic not in existing: with open(index_path, "a") as f: f.write(f"\n{entry}") # Reload memory into global global AGENT_MEMORY with open(index_path) as f: AGENT_MEMORY = f.read().strip() log(f"Memory saved: {topic}") return f"Memory saved to {filepath}" def web_search(query, num_results=5): """Search the web via SearXNG.""" log(f"Web search: {query[:60]}") try: import urllib.parse params = urllib.parse.urlencode({"q": query, "format": "json"}) req = urllib.request.Request( f"{SEARX_URL}/search?{params}", headers={"User-Agent": "fireclaw-agent"}, ) with urllib.request.urlopen(req, timeout=15) as resp: data = json.loads(resp.read()) results = data.get("results", [])[:num_results] if not results: return "No results found." lines = [] for r in results: title = r.get("title", "") url = r.get("url", "") snippet = r.get("content", "")[:150] lines.append(f"- {title}\n {url}\n {snippet}") return "\n".join(lines) except Exception as e: return f"[search error: {e}]" def try_parse_tool_call(text): """Try to parse a text-based tool call from model output. Handles formats like: {"name": "run_command", "arguments": {"command": "uptime"}} {"name": "run_command", ...} Returns (name, args) tuple or None. """ import re # Strip tool_call tags if present text = re.sub(r"", "", text).strip() # Try to find JSON in the text for start in range(len(text)): if text[start] == "{": for end in range(len(text), start, -1): if text[end - 1] == "}": try: obj = json.loads(text[start:end]) name = obj.get("name") args = obj.get("arguments", {}) if name and isinstance(args, dict): return (name, args) except json.JSONDecodeError: continue return None def ollama_request(payload): """Make a request to Ollama API.""" data = json.dumps(payload).encode("utf-8") req = urllib.request.Request( f"{OLLAMA_URL}/api/chat", data=data, headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=120) as resp: return json.loads(resp.read()) def query_ollama(messages): """Call Ollama chat API with tool support. Returns final response text.""" payload = { "model": RUNTIME["model"], "messages": messages, "stream": False, "options": {"num_predict": 512}, } if TOOLS_ENABLED: payload["tools"] = TOOLS for round_num in range(MAX_TOOL_ROUNDS): try: data = ollama_request(payload) except (urllib.error.URLError, TimeoutError) as e: return f"[error: {e}]" msg = data.get("message", {}) # Check for structured tool calls from API tool_calls = msg.get("tool_calls") if tool_calls: messages.append(msg) for tc in tool_calls: fn = tc.get("function", {}) fn_name = fn.get("name", "") fn_args = fn.get("arguments", {}) if fn_name == "run_command": cmd = fn_args.get("command", "") log(f"Tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: {cmd[:80]}") result = run_command(cmd) messages.append({"role": "tool", "content": result}) elif fn_name == "save_memory": topic = fn_args.get("topic", "note") content = fn_args.get("content", "") log(f"Tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: save_memory({topic})") result = save_memory(topic, content) messages.append({"role": "tool", "content": result}) elif fn_name == "web_search": query = fn_args.get("query", "") num = fn_args.get("num_results", 5) log(f"Tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: web_search({query[:60]})") result = web_search(query, num) messages.append({"role": "tool", "content": result}) else: messages.append({ "role": "tool", "content": f"[unknown tool: {fn_name}]", }) payload["messages"] = messages continue # Check for text-based tool calls (model dumped JSON as text) content = msg.get("content", "").strip() parsed_tool = try_parse_tool_call(content) if parsed_tool: fn_name, fn_args = parsed_tool messages.append({"role": "assistant", "content": content}) if fn_name == "run_command": cmd = fn_args.get("command", "") log(f"Text tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: {cmd[:80]}") result = run_command(cmd) messages.append({"role": "user", "content": f"Command output:\n{result}\n\nNow provide your response to the user based on this output."}) elif fn_name == "save_memory": topic = fn_args.get("topic", "note") mem_content = fn_args.get("content", "") log(f"Text tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: save_memory({topic})") result = save_memory(topic, mem_content) messages.append({"role": "user", "content": f"{result}\n\nNow respond to the user."}) elif fn_name == "web_search": query = fn_args.get("query", "") num = fn_args.get("num_results", 5) log(f"Text tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: web_search({query[:60]})") result = web_search(query, num) messages.append({"role": "user", "content": f"Search results:\n{result}\n\nNow respond to the user based on these results."}) payload["messages"] = messages continue # No tool calls — return the text response return content return "[max tool rounds reached]" def build_messages(question, channel): """Build chat messages with system prompt and conversation history.""" system = RUNTIME["persona"] if TOOLS_ENABLED: system += "\n\nYou have access to tools:" system += "\n- run_command: Execute shell commands on your system." system += "\n- web_search: Search the web for current information." system += "\n- save_memory: Save important information to your persistent workspace." system += "\nUse tools when needed rather than guessing. Your workspace at /workspace persists across restarts." if AGENT_MEMORY and AGENT_MEMORY != "# Agent Memory": system += f"\n\nIMPORTANT - Your persistent memory (facts you saved previously, use these to answer questions):\n{AGENT_MEMORY}" system += f"\n\nYou are in IRC channel {channel}. Your nick is {NICK}. Keep responses concise — this is IRC." system += "\nWhen you want to address another agent or user, always start your message with their nick followed by a colon, e.g. 'coder: can you review this?'. This is how IRC mentions work — without the prefix, they won't see your message." messages = [{"role": "system", "content": system}] # Build conversation history as alternating user/assistant messages channel_msgs = [m for m in recent if m["channel"] == channel] for msg in channel_msgs[-CONTEXT_SIZE:]: if msg["nick"] == NICK: messages.append({"role": "assistant", "content": msg["text"]}) else: messages.append({"role": "user", "content": f"<{msg['nick']}> {msg['text']}"}) # Ensure the last message is from the user (the triggering question) # If the deque already captured it, don't double-add last = messages[-1] if len(messages) > 1 else None if not last or last.get("role") != "user" or question not in last.get("content", ""): messages.append({"role": "user", "content": question}) return messages def should_trigger(text): """Check if this message should trigger a response. Only triggers when nick is at the start of the message (e.g. 'worker: hello') not when nick appears elsewhere (e.g. 'coder: say hi to worker').""" if RUNTIME["trigger"] == "all": return True lower = text.lower() nick = NICK.lower() # Match: "nick: ...", "nick, ...", "nick ...", "@nick ..." return ( lower.startswith(f"{nick}:") or lower.startswith(f"{nick},") or lower.startswith(f"{nick} ") or lower.startswith(f"@{nick}") or lower == nick or text.startswith("!ask ") ) def extract_question(text): """Extract the actual question from the trigger.""" lower = text.lower() for prefix in [ f"{NICK.lower()}: ", f"{NICK.lower()}, ", f"@{NICK.lower()} ", f"{NICK.lower()} ", ]: if lower.startswith(prefix): return text[len(prefix):] if text.startswith("!ask "): return text[5:] return text # Track last response time to prevent agent-to-agent loops _last_response_time = 0 _AGENT_COOLDOWN = 10 # seconds between responses to prevent loops def handle_message(irc, source_nick, target, text): """Process an incoming PRIVMSG.""" global _last_response_time is_dm = not target.startswith("#") channel = source_nick if is_dm else target reply_to = source_nick if is_dm else target recent.append({"nick": source_nick, "text": text, "channel": channel}) if source_nick == NICK: return # DMs always trigger, channel messages need mention if not is_dm and not should_trigger(text): return # Cooldown to prevent agent-to-agent loops now = time.time() if now - _last_response_time < _AGENT_COOLDOWN: log(f"Cooldown active, ignoring trigger from {source_nick}") return _last_response_time = now question = extract_question(text) if not is_dm else text log(f"Triggered by {source_nick} in {channel}: {question[:80]}") def do_respond(): try: messages = build_messages(question, channel) response = query_ollama(messages) if not response: return lines = response.split("\n") if len(lines) > MAX_RESPONSE_LINES: lines = lines[:MAX_RESPONSE_LINES] lines.append(f"[truncated, {MAX_RESPONSE_LINES} lines max]") irc.say(reply_to, "\n".join(lines)) recent.append({"nick": NICK, "text": response[:200], "channel": channel}) except Exception as e: log(f"Error handling message: {e}") try: irc.say(reply_to, f"[error: {e}]") except Exception: pass threading.Thread(target=do_respond, daemon=True).start() def run(): log(f"Starting agent: nick={NICK} channel={CHANNEL} model={RUNTIME['model']} tools={TOOLS_ENABLED}") while True: try: irc = IRCClient(SERVER, PORT, NICK) log(f"Connecting to {SERVER}:{PORT}...") irc.connect() # Hot-reload on SIGHUP — re-read config and persona def handle_sighup(signum, frame): log("SIGHUP received, reloading config...") try: with open("/etc/agent/config.json") as f: new_config = json.load(f) RUNTIME["model"] = new_config.get("model", RUNTIME["model"]) RUNTIME["trigger"] = new_config.get("trigger", RUNTIME["trigger"]) try: with open("/etc/agent/persona.md") as f: RUNTIME["persona"] = f.read().strip() except FileNotFoundError: pass log(f"Reloaded: model={RUNTIME['model']} trigger={RUNTIME['trigger']}") irc.say(CHANNEL, f"[reloaded: model={RUNTIME['model']}]") except Exception as e: log(f"Reload failed: {e}") signal.signal(signal.SIGHUP, handle_sighup) # Graceful shutdown on SIGTERM — send IRC QUIT def handle_sigterm(signum, frame): log("SIGTERM received, quitting IRC...") try: irc.send("QUIT :Agent shutting down") except Exception: pass time.sleep(0.5) sys.exit(0) signal.signal(signal.SIGTERM, handle_sigterm) registered = False while True: lines = irc.recv_lines() for line in lines: if line.startswith("PING"): irc.send(f"PONG {line.split(' ', 1)[1]}") continue parts = line.split(" ") if len(parts) < 2: continue if parts[1] == "001" and not registered: registered = True log("Registered with server") irc.set_bot_mode() irc.join("#agents") log(f"Joined #agents") # Handle INVITE — auto-join invited channels if parts[1] == "INVITE" and len(parts) >= 3: invited_channel = parts[-1].lstrip(":") inviter = parts[0].split("!")[0].lstrip(":") log(f"Invited to {invited_channel} by {inviter}, joining...") irc.join(invited_channel) if parts[1] == "PRIVMSG" and len(parts) >= 4: source_nick = parts[0].split("!")[0].lstrip(":") target = parts[2] text = " ".join(parts[3:]).lstrip(":") handle_message(irc, source_nick, target, text) except (ConnectionError, OSError, socket.timeout) as e: log(f"Disconnected: {e}. Reconnecting in 5s...") time.sleep(5) except KeyboardInterrupt: log("Shutting down.") sys.exit(0) if __name__ == "__main__": run()