Files
fireclaw/agent/agent.py
2026-04-07 17:43:39 +00:00

682 lines
24 KiB
Python

#!/usr/bin/env python3
"""Fireclaw IRC agent — connects to IRC, responds via Ollama with tool access."""
import socket
import json
import sys
import time
import subprocess
import urllib.request
import urllib.error
import signal
import threading
from collections import deque
# Load config. This file is required: without it the agent cannot start.
# Read as UTF-8 explicitly so the result does not depend on the host locale.
with open("/etc/agent/config.json", encoding="utf-8") as f:
    CONFIG = json.load(f)

# Persona (base system prompt). A missing *or empty* persona file falls back
# to a generic persona so the model is never given an empty system prompt.
_DEFAULT_PERSONA = "You are a helpful assistant."
try:
    with open("/etc/agent/persona.md", encoding="utf-8") as f:
        PERSONA = f.read().strip() or _DEFAULT_PERSONA
except FileNotFoundError:
    PERSONA = _DEFAULT_PERSONA

NICK = CONFIG.get("nick", "agent")
CHANNEL = CONFIG.get("channel", "#agents")
SERVER = CONFIG.get("server", "172.16.0.1")
PORT = CONFIG.get("port", 6667)
# Trailing slashes are dropped because the URL is later joined with "/api/chat".
OLLAMA_URL = str(CONFIG.get("ollama_url", "http://172.16.0.1:11434")).rstrip("/")
CONTEXT_SIZE = CONFIG.get("context_size", 20)
MAX_RESPONSE_LINES = CONFIG.get("max_response_lines", 50)
TOOLS_ENABLED = CONFIG.get("tools", True)
MAX_TOOL_ROUNDS = CONFIG.get("max_tool_rounds", 5)
WORKSPACE = "/workspace"

# Mutable runtime config — can be hot-reloaded via SIGHUP
RUNTIME = {
    "model": CONFIG.get("model", "qwen2.5-coder:7b"),
    "trigger": CONFIG.get("trigger", "mention"),
    "persona": PERSONA,
}

# Recent messages for context (bounded: the oldest entries fall off the left)
recent = deque(maxlen=CONTEXT_SIZE)
# Load persistent memory from workspace.
# AGENT_MEMORY is the MEMORY.md index followed by one "## <topic>" section
# per *.md file found in the memory directory. Topic files are only read
# when the index exists.
import os

AGENT_MEMORY = ""
try:
    with open(f"{WORKSPACE}/MEMORY.md", encoding="utf-8") as f:
        AGENT_MEMORY = f.read().strip()
except FileNotFoundError:
    pass  # no index yet: start with empty memory
else:
    mem_dir = f"{WORKSPACE}/memory"
    if os.path.isdir(mem_dir):
        for fname in sorted(os.listdir(mem_dir)):
            if not fname.endswith(".md"):
                continue
            # Strip only the trailing extension; str.replace would also
            # remove ".md" occurring in the middle of the name.
            topic = fname[: -len(".md")]
            try:
                with open(f"{mem_dir}/{fname}", encoding="utf-8") as f:
                    body = f.read().strip()
            except (OSError, UnicodeDecodeError) as e:
                # Best effort: one unreadable file must not block startup,
                # but say so instead of skipping it silently.
                print(
                    f"[agent:{NICK}] Skipping unreadable memory file {fname}: {e}",
                    flush=True,
                )
                continue
            AGENT_MEMORY += f"\n\n## {topic}\n{body}"
def _string_param(description):
    """Return the schema for a string-typed tool parameter."""
    return {"type": "string", "description": description}


def _function_tool(name, description, properties, required):
    """Wrap one tool description in the function-tool envelope used by TOOLS."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


# Tool definitions for Ollama chat API
TOOLS = [
    _function_tool(
        "run_command",
        "Execute a shell command on this system and return the output. Use this to check system info, run scripts, fetch URLs, process data, etc.",
        {"command": _string_param("The shell command to execute (bash)")},
        ["command"],
    ),
    _function_tool(
        "save_memory",
        "Save something important to your persistent memory. Use this to remember facts about users, lessons learned, project context, or anything you want to recall in future conversations. Memories survive restarts.",
        {
            "topic": _string_param(
                "Short topic name for the memory file (e.g. 'user_prefs', 'project_x', 'lessons')"
            ),
            "content": _string_param("The memory content to save"),
        },
        ["topic", "content"],
    ),
    _function_tool(
        "web_search",
        "Search the web using SearXNG. Returns titles, URLs, and snippets for the top results. Use this when you need current information or facts you're unsure about.",
        {
            "query": _string_param("The search query"),
            "num_results": {
                "type": "integer",
                "description": "Number of results to return (default 5)",
            },
        },
        ["query"],
    ),
    _function_tool(
        "fetch_url",
        "Fetch a URL and return its text content. HTML is stripped to plain text. Use this to read web pages, documentation, articles, etc.",
        {"url": _string_param("The URL to fetch")},
        ["url"],
    ),
]
# Base URL of the SearXNG instance used by web_search. Trailing slashes are
# dropped because the value is later joined with "/search?...".
SEARX_URL = str(CONFIG.get("searx_url", "https://searx.mymx.me")).rstrip("/")
def log(msg):
    """Print *msg* to stdout, prefixed with this agent's nick, and flush."""
    tag = f"[agent:{NICK}]"
    print(f"{tag} {msg}", flush=True)
class IRCClient:
    """Minimal blocking IRC client: registration, line framing and PRIVMSG output.

    Writes go through ``send``, which holds a lock so that several threads
    can share one connection without interleaving their lines.
    """

    # Maximum UTF-8 bytes of message text per PRIVMSG. The limit is counted
    # in bytes, not characters, so multi-byte text cannot push a line past
    # what the server accepts.
    MAX_CHUNK_BYTES = 400

    # CR, LF and NUL are removed from every outgoing line: any of them would
    # let text smuggle an extra IRC command onto the wire.
    _STRIP_CONTROL = str.maketrans("", "", "\r\n\x00")

    def __init__(self, server, port, nick):
        self.server = server
        self.port = port
        self.nick = nick
        self.sock = None
        # Raw bytes received but not yet terminated by a newline. Kept as
        # bytes so a UTF-8 sequence split across two reads decodes intact.
        self.buf = b""
        self._lock = threading.Lock()

    def connect(self):
        """Open the TCP connection and send the NICK/USER registration."""
        self.close()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(300)
            sock.connect((self.server, self.port))
        except OSError:
            sock.close()  # do not leak the descriptor on a failed connect
            raise
        self.sock = sock
        self.buf = b""
        self.send(f"NICK {self.nick}")
        self.send(f"USER {self.nick} 0 * :Fireclaw Agent")

    def close(self):
        """Close the socket if one is open. Safe to call repeatedly."""
        sock, self.sock = self.sock, None
        if sock is not None:
            try:
                sock.close()
            except OSError:
                pass

    def send(self, msg):
        """Send one raw IRC line; embedded CR/LF/NUL characters are removed.

        Raises ConnectionError when no socket is open.
        """
        line = msg.translate(self._STRIP_CONTROL)
        with self._lock:
            if self.sock is None:
                raise ConnectionError("Not connected")
            self.sock.sendall(f"{line}\r\n".encode("utf-8", errors="replace"))

    def join(self, channel):
        """Send a JOIN for *channel*."""
        self.send(f"JOIN {channel}")

    def say(self, target, text):
        """Send *text* to *target* as one PRIVMSG per non-empty line.

        Both CR and LF count as line breaks, and long lines are split into
        chunks of at most MAX_CHUNK_BYTES bytes.
        """
        for raw in text.replace("\r", "\n").split("\n"):
            line = raw.strip()
            if not line:
                continue
            for chunk in self._split_utf8(line):
                self.send(f"PRIVMSG {target} :{chunk}")

    @classmethod
    def _split_utf8(cls, line):
        """Yield pieces of *line* no longer than MAX_CHUNK_BYTES when encoded,
        cutting only on character boundaries."""
        data = line.encode("utf-8", errors="replace")
        limit = cls.MAX_CHUNK_BYTES
        while len(data) > limit:
            cut = limit
            # 0b10xxxxxx marks a continuation byte: back up to a lead byte.
            while cut > 0 and (data[cut] & 0xC0) == 0x80:
                cut -= 1
            if cut == 0:
                cut = limit  # not reachable for valid UTF-8; guarantees progress
            yield data[:cut].decode("utf-8", errors="replace")
            data = data[cut:]
        yield data.decode("utf-8", errors="replace")

    def set_bot_mode(self):
        """Request user mode +B for this client's nick."""
        self.send(f"MODE {self.nick} +B")

    def recv_lines(self):
        """Read once from the socket and return the complete lines received.

        Returns an empty list on a read timeout. Raises ConnectionError when
        the peer has closed the connection. Lines may end in CRLF or bare LF;
        a trailing partial line stays buffered for the next call.
        """
        try:
            data = self.sock.recv(4096)
        except socket.timeout:
            return []
        if not data:
            raise ConnectionError("Connection closed")
        self.buf += data
        *complete, self.buf = self.buf.split(b"\n")
        return [
            raw.rstrip(b"\r").decode("utf-8", errors="replace") for raw in complete
        ]
def run_command(command, timeout=120, max_output=2000):
    """Execute a shell command with bash and return its output as text.

    NOTE(security): the command string is run verbatim by ``bash -c``.
    This is the tool's purpose, but the text comes from the model, so the
    process must run in an environment where that is acceptable.

    Args:
        command: Shell command line to execute.
        timeout: Seconds to wait before giving up on the command.
        max_output: Maximum characters of stdout/stderr text to keep.

    Returns:
        stdout, followed by a ``[stderr]`` section and an ``[exit code]``
        trailer when applicable; ``[no output]`` when there is nothing to
        show; or a bracketed error/timeout message. Never raises for
        command failures.
    """
    command = str(command)
    log(f"Running command: {command[:100]}")
    try:
        result = subprocess.run(
            ["bash", "-c", command],
            stdin=subprocess.DEVNULL,  # never let the child read the agent's stdin
            capture_output=True,
            text=True,
            errors="replace",  # binary output must not turn into a decode error
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return f"[command timed out after {timeout}s]"
    except Exception as e:
        return f"[error: {e}]"

    output = result.stdout
    if result.stderr:
        output += f"\n[stderr] {result.stderr}"
    # Truncate very long output first, then add the exit code, so the
    # status of a noisy failing command is never cut off.
    if len(output) > max_output:
        output = output[:max_output] + "\n[output truncated]"
    if result.returncode != 0:
        output += f"\n[exit code: {result.returncode}]"
    return output.strip() or "[no output]"
def _load_agent_memory():
    """Return the MEMORY.md index followed by one '## <topic>' section per
    *.md file in the memory directory, or '' when the index is missing."""
    import os

    try:
        with open(f"{WORKSPACE}/MEMORY.md", encoding="utf-8") as f:
            memory = f.read().strip()
    except FileNotFoundError:
        return ""
    mem_dir = f"{WORKSPACE}/memory"
    if os.path.isdir(mem_dir):
        for fname in sorted(os.listdir(mem_dir)):
            if not fname.endswith(".md"):
                continue
            try:
                with open(f"{mem_dir}/{fname}", encoding="utf-8") as f:
                    body = f.read().strip()
            except (OSError, UnicodeDecodeError) as e:
                log(f"Skipping unreadable memory file {fname}: {e}")
                continue
            memory += f"\n\n## {fname[:-len('.md')]}\n{body}"
    return memory


def save_memory(topic, content):
    """Save a memory to the persistent workspace and refresh AGENT_MEMORY.

    Args:
        topic: Short name for the memory. It is reduced to word characters
            and dashes before being used as a file name, so it can never
            point outside the memory directory.
        content: Text to store; replaces any previous content for the topic.

    Returns:
        A confirmation string naming the file that was written.
    """
    import os
    import re

    # The topic is chosen by the model: strip path separators, dots and
    # anything else that is not a word character or a dash.
    safe_topic = re.sub(r"[^\w-]+", "_", str(topic)).strip("_") or "note"

    mem_dir = f"{WORKSPACE}/memory"
    os.makedirs(mem_dir, exist_ok=True)

    # Write the memory file
    filepath = f"{mem_dir}/{safe_topic}.md"
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(str(content) + "\n")

    # Update MEMORY.md index
    index_path = f"{WORKSPACE}/MEMORY.md"
    try:
        with open(index_path, encoding="utf-8") as f:
            existing = f.read()
    except FileNotFoundError:
        existing = ""

    # Compare whole index lines: a substring test would treat topic "user"
    # as already listed once "user_prefs" is.
    entry = f"- [{safe_topic}](memory/{safe_topic}.md)"
    listed = any(line.strip() == entry for line in existing.splitlines())
    if not listed:
        with open(index_path, "a", encoding="utf-8") as f:
            if not existing:
                f.write("# Agent Memory\n")  # new index gets its header
            elif not existing.endswith("\n"):
                f.write("\n")
            f.write(entry + "\n")

    # Reload memory into global, including the topic bodies, so the new
    # memory is usable right away.
    global AGENT_MEMORY
    AGENT_MEMORY = _load_agent_memory()
    log(f"Memory saved: {safe_topic}")
    return f"Memory saved to {filepath}"
def web_search(query, num_results=5):
    """Search the web via SearXNG.

    Args:
        query: Search terms.
        num_results: How many results to return. Values that are not
            integers fall back to 5; values below 1 are raised to 1.

    Returns:
        One "- title / url / snippet" entry per result, "No results found.",
        or a bracketed error message. Never raises for search failures.
    """
    query = str(query)
    log(f"Web search: {query[:60]}")
    try:
        import urllib.parse

        # The count comes from the model and may arrive as "5" or null.
        try:
            limit = max(1, int(num_results))
        except (TypeError, ValueError):
            limit = 5

        params = urllib.parse.urlencode({"q": query, "format": "json"})
        req = urllib.request.Request(
            f"{SEARX_URL}/search?{params}",
            headers={"User-Agent": "fireclaw-agent"},
        )
        with urllib.request.urlopen(req, timeout=15) as resp:
            data = json.loads(resp.read())

        results = (data.get("results") or [])[:limit]
        if not results:
            return "No results found."

        lines = []
        for r in results:
            # Fields may be present but null; treat those as empty text.
            title = r.get("title") or ""
            url = r.get("url") or ""
            snippet = (r.get("content") or "")[:150]
            lines.append(f"- {title}\n {url}\n {snippet}")
        return "\n".join(lines)
    except Exception as e:
        return f"[search error: {e}]"
def fetch_url(url):
    """Fetch a URL and return stripped text content.

    Only http and https URLs are fetched. At most 50,000 bytes are read and
    at most 3,000 characters are returned. HTML is reduced to text with
    script/style/noscript content removed.

    Returns:
        The page text, "[empty page]", or a bracketed "[fetch error: ...]"
        message. Never raises for fetch failures.
    """
    import re
    import urllib.parse
    from html.parser import HTMLParser

    url = str(url).strip()
    log(f"Fetching: {url[:80]}")

    skipped_tags = ("script", "style", "noscript")
    block_tags = ("p", "div", "h1", "h2", "h3", "h4", "li", "tr")

    class TextExtractor(HTMLParser):
        """Collect visible text, adding newlines at block boundaries."""

        def __init__(self):
            super().__init__()
            self.text = []
            self._skip = False

        def handle_starttag(self, tag, attrs):
            if tag in skipped_tags:
                self._skip = True
            elif tag == "br":
                # <br> normally has no closing tag, so the line break has
                # to be emitted here rather than in handle_endtag.
                self.text.append("\n")

        def handle_endtag(self, tag):
            if tag in skipped_tags:
                self._skip = False
            if tag in block_tags:
                self.text.append("\n")

        def handle_data(self, data):
            if not self._skip:
                self.text.append(data)

    try:
        # urlopen also understands file: and ftp: URLs; the URL comes from
        # the model, so restrict it to web schemes.
        scheme = urllib.parse.urlsplit(url).scheme.lower()
        if scheme not in ("http", "https"):
            return f"[fetch error: unsupported URL scheme '{scheme}']"

        req = urllib.request.Request(url, headers={"User-Agent": "fireclaw-agent"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            content_type = resp.headers.get("Content-Type", "")
            charset = resp.headers.get_content_charset() or "utf-8"
            body = resp.read(50_000)

        try:
            raw = body.decode(charset, errors="replace")
        except LookupError:
            # The server named a charset Python does not know.
            raw = body.decode("utf-8", errors="replace")

        if "html" in content_type:
            parser = TextExtractor()
            parser.feed(raw)
            text = "".join(parser.text)
        else:
            text = raw

        # Clean up whitespace: drop trailing blanks on each line so that
        # whitespace-only lines also collapse into a single empty line.
        text = re.sub(r"[ \t\r]+\n", "\n", text)
        text = re.sub(r"\n{3,}", "\n\n", text).strip()
        if len(text) > 3000:
            text = text[:3000] + "\n[truncated]"
        return text or "[empty page]"
    except Exception as e:
        return f"[fetch error: {e}]"
def try_parse_tool_call(text):
    """Try to parse a text-based tool call from model output.

    Handles formats like:
        {"name": "run_command", "arguments": {"command": "uptime"}}
        <tool_call>{"name": "run_command", ...}</tool_call>

    The first JSON object in *text* that has a non-empty string "name" and
    dict-like "arguments" wins. "arguments" defaults to {} when absent and
    may also be given as a JSON-encoded string.

    Returns (name, args) tuple or None.
    """
    import re

    # Strip tool_call tags if present
    text = re.sub(r"</?tool_call>", "", text).strip()

    # raw_decode parses exactly one JSON value starting at a given offset,
    # so each "{" needs a single parse attempt instead of one attempt per
    # candidate closing brace.
    decoder = json.JSONDecoder()
    pos = text.find("{")
    while pos != -1:
        try:
            obj, _ = decoder.raw_decode(text, pos)
        except (ValueError, RecursionError):
            obj = None
        if isinstance(obj, dict):
            name = obj.get("name")
            args = obj.get("arguments", {})
            if isinstance(args, str):
                # Some models emit the arguments as a JSON string.
                try:
                    args = json.loads(args)
                except ValueError:
                    args = None
            if name and isinstance(name, str) and isinstance(args, dict):
                return (name, args)
        pos = text.find("{", pos + 1)
    return None
def ollama_request(payload):
    """Send *payload* as JSON to the Ollama /api/chat endpoint and return the
    decoded JSON reply."""
    body = json.dumps(payload).encode("utf-8")
    endpoint = f"{OLLAMA_URL}/api/chat"
    request = urllib.request.Request(
        endpoint,
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=120) as response:
        raw_reply = response.read()
        return json.loads(raw_reply)
# Follow-up prompt sent after a tool call that arrived as plain text in the
# reply (rather than through the API's tool_calls field), keyed by tool name.
# The keys double as the list of tool names accepted in text form.
_TEXT_TOOL_FOLLOWUPS = {
    "run_command": "Command output:\n{result}\n\nNow provide your response to the user based on this output.",
    "save_memory": "{result}\n\nNow respond to the user.",
    "web_search": "Search results:\n{result}\n\nNow respond to the user based on these results.",
    "fetch_url": "Page content:\n{result}\n\nNow respond to the user based on this content.",
}


def _execute_tool(fn_name, fn_args, label):
    """Run one tool call and return its text result, or None for an unknown tool.

    *label* is the log prefix (e.g. "Tool call [1/5]"). Exceptions raised by
    a tool are reported in the returned text so the model can react to them.
    """
    if isinstance(fn_args, str):
        # Arguments occasionally arrive as a JSON-encoded string.
        try:
            fn_args = json.loads(fn_args)
        except ValueError:
            fn_args = {}
    if not isinstance(fn_args, dict):
        fn_args = {}
    try:
        if fn_name == "run_command":
            cmd = str(fn_args.get("command", ""))
            log(f"{label}: {cmd[:80]}")
            return run_command(cmd)
        if fn_name == "save_memory":
            topic = fn_args.get("topic", "note")
            log(f"{label}: save_memory({topic})")
            return save_memory(topic, fn_args.get("content", ""))
        if fn_name == "web_search":
            query = str(fn_args.get("query", ""))
            log(f"{label}: web_search({query[:60]})")
            return web_search(query, fn_args.get("num_results", 5))
        if fn_name == "fetch_url":
            url = str(fn_args.get("url", ""))
            log(f"{label}: fetch_url({url[:60]})")
            return fetch_url(url)
    except Exception as e:
        return f"[tool error: {e}]"
    return None


def query_ollama(messages):
    """Call Ollama chat API with tool support. Returns final response text.

    *messages* is extended in place with the assistant and tool messages
    produced along the way. At most MAX_TOOL_ROUNDS requests are made; if
    the model is still calling tools after that, a bracketed notice is
    returned. Request failures are returned as "[error: ...]" text.
    """
    payload = {
        "model": RUNTIME["model"],
        "messages": messages,
        "stream": False,
        "options": {"num_predict": 512},
    }
    if TOOLS_ENABLED:
        payload["tools"] = TOOLS

    for round_num in range(MAX_TOOL_ROUNDS):
        try:
            data = ollama_request(payload)
        except (OSError, ValueError) as e:
            # OSError covers URLError, timeouts and connection failures;
            # ValueError covers a reply that is not valid JSON.
            return f"[error: {e}]"
        if isinstance(data, dict) and data.get("error"):
            return f"[error: {data['error']}]"

        msg = data.get("message") or {}
        progress = f"[{round_num+1}/{MAX_TOOL_ROUNDS}]"

        # Check for structured tool calls from API
        tool_calls = msg.get("tool_calls")
        if tool_calls and TOOLS_ENABLED:
            messages.append(msg)
            for tc in tool_calls:
                fn = tc.get("function") or {}
                fn_name = fn.get("name", "")
                result = _execute_tool(
                    fn_name, fn.get("arguments", {}), f"Tool call {progress}"
                )
                if result is None:
                    result = f"[unknown tool: {fn_name}]"
                messages.append({"role": "tool", "content": result})
            payload["messages"] = messages
            continue

        # Check for text-based tool calls (model dumped JSON as text).
        # Only honoured when tools are enabled and the name is a real tool:
        # any other JSON object with a "name" key is ordinary reply text.
        content = (msg.get("content") or "").strip()
        parsed_tool = try_parse_tool_call(content) if TOOLS_ENABLED else None
        if parsed_tool and parsed_tool[0] in _TEXT_TOOL_FOLLOWUPS:
            fn_name, fn_args = parsed_tool
            messages.append({"role": "assistant", "content": content})
            result = _execute_tool(fn_name, fn_args, f"Text tool call {progress}")
            messages.append({
                "role": "user",
                "content": _TEXT_TOOL_FOLLOWUPS[fn_name].format(result=result),
            })
            payload["messages"] = messages
            continue

        # No tool calls — return the text response
        return content

    return "[max tool rounds reached]"
def build_messages(question, channel):
    """Build chat messages with system prompt and conversation history.

    Args:
        question: The text that triggered this response.
        channel: Conversation key; only history recorded under this key is
            included.

    Returns:
        A list of chat messages: one system message, then the recorded
        history (own lines as "assistant", others as "user" prefixed with
        "<nick>"), ending with *question* unless the last history entry
        already contains it.
    """
    parts = [RUNTIME["persona"]]
    if TOOLS_ENABLED:
        parts.append("\n\nYou have access to tools:")
        parts.append("\n- run_command: Execute shell commands on your system.")
        parts.append("\n- web_search: Search the web for current information.")
        parts.append("\n- fetch_url: Fetch and read a web page's content.")
        parts.append("\n- save_memory: Save important information to your persistent workspace.")
        parts.append("\nUse tools when needed rather than guessing. Your workspace at /workspace persists across restarts.")
    if AGENT_MEMORY and AGENT_MEMORY != "# Agent Memory":
        parts.append(f"\n\nIMPORTANT - Your persistent memory (facts you saved previously, use these to answer questions):\n{AGENT_MEMORY}")
    parts.append(f"\n\nYou are in IRC channel {channel}. Your nick is {NICK}. Keep responses concise — this is IRC.")
    parts.append("\nWhen you want to address another agent or user, always start your message with their nick followed by a colon, e.g. 'coder: can you review this?'. This is how IRC mentions work — without the prefix, they won't see your message.")
    messages = [{"role": "system", "content": "".join(parts)}]

    # This function runs in a worker thread while the main thread keeps
    # appending to `recent`. Copy the deque in a single call first: looping
    # over it directly can fail with "deque mutated during iteration".
    snapshot = list(recent)

    # Build conversation history as alternating user/assistant messages
    channel_msgs = [m for m in snapshot if m["channel"] == channel]
    for msg in channel_msgs[-CONTEXT_SIZE:]:
        if msg["nick"] == NICK:
            messages.append({"role": "assistant", "content": msg["text"]})
        else:
            messages.append({"role": "user", "content": f"<{msg['nick']}> {msg['text']}"})

    # Ensure the last message is from the user (the triggering question)
    # If the deque already captured it, don't double-add
    last = messages[-1] if len(messages) > 1 else None
    if not last or last.get("role") != "user" or question not in last.get("content", ""):
        messages.append({"role": "user", "content": question})
    return messages
def should_trigger(text):
    """Check if this message should trigger a response.

    Always true when the runtime trigger mode is "all". Otherwise the
    message must start with "!ask " or with this agent's nick (optionally
    preceded by "@"), and the nick must be the whole message or be followed
    by ":", "," or a space. A nick appearing elsewhere in the message
    (e.g. 'coder: say hi to worker') does not trigger, and neither does a
    longer nick that merely begins with ours (e.g. '@worker2 hi').
    """
    if RUNTIME["trigger"] == "all":
        return True
    if text.startswith("!ask "):
        return True

    lower = text.lower()
    nick = NICK.lower()
    addressed = lower[1:] if lower.startswith("@") else lower
    if not addressed.startswith(nick):
        return False
    # What follows the nick decides whether it really is our nick.
    rest = addressed[len(nick):]
    return rest == "" or rest[0] in ":, "
def extract_question(text):
    """Extract the actual question from the trigger.

    Removes a leading "!ask ", or a leading "nick" / "@nick" together with
    the ":", "," or space that follows it and any further whitespace.
    Returns *text* unchanged when no such prefix is present or when nothing
    would be left after removing it.
    """
    if text.startswith("!ask "):
        return text[5:]

    body = text[1:] if text.startswith("@") else text
    # Compare a slice of the original text so the offset used below is
    # measured on the same string that gets cut.
    nick_len = len(NICK)
    if body[:nick_len].lower() == NICK.lower():
        rest = body[nick_len:]
        if rest and rest[0] in ":, ":
            question = rest[1:].lstrip()
            if question:
                return question
    return text
# Track last response time to prevent agent-to-agent loops
_last_response_time = 0
_AGENT_COOLDOWN = 10  # seconds between responses to prevent loops


def handle_message(irc, source_nick, target, text):
    """Process an incoming PRIVMSG.

    Records the message in the shared history, decides whether it should be
    answered (direct messages always, channel messages only when
    should_trigger says so, and never more often than the cooldown allows),
    and if so produces the reply on a background thread so the caller is
    not blocked while the model runs.

    Args:
        irc: Connected client used to send the reply.
        source_nick: Nick of the sender.
        target: Channel name, or this agent's nick for a direct message.
        text: Message text.
    """
    global _last_response_time

    # CTCP requests (VERSION, PING, ...) are wrapped in \x01 and are aimed
    # at the client software, not at the assistant. /me actions are kept.
    if text.startswith("\x01") and not text.startswith("\x01ACTION"):
        return

    # Channel names start with one of these prefixes; anything else is a DM.
    is_dm = not target.startswith(("#", "&", "+", "!"))
    channel = source_nick if is_dm else target
    reply_to = source_nick if is_dm else target

    recent.append({"nick": source_nick, "text": text, "channel": channel})

    # Never answer ourselves. Nicks are compared case-insensitively.
    if source_nick.lower() == NICK.lower():
        return

    # DMs always trigger, channel messages need mention
    if not is_dm and not should_trigger(text):
        return

    # Cooldown to prevent agent-to-agent loops
    now = time.time()
    if now - _last_response_time < _AGENT_COOLDOWN:
        log(f"Cooldown active, ignoring trigger from {source_nick}")
        return
    _last_response_time = now

    question = extract_question(text) if not is_dm else text
    log(f"Triggered by {source_nick} in {channel}: {question[:80]}")

    def do_respond():
        try:
            messages = build_messages(question, channel)
            response = query_ollama(messages)
            if not response:
                return
            lines = response.split("\n")
            if len(lines) > MAX_RESPONSE_LINES:
                lines = lines[:MAX_RESPONSE_LINES]
                lines.append(f"[truncated, {MAX_RESPONSE_LINES} lines max]")
            irc.say(reply_to, "\n".join(lines))
            # Only a short prefix of our own reply is kept as context.
            recent.append({"nick": NICK, "text": response[:200], "channel": channel})
        except Exception as e:
            log(f"Error handling message: {e}")
            try:
                irc.say(reply_to, f"[error: {e}]")
            except Exception:
                pass  # the connection itself may be what failed

    threading.Thread(target=do_respond, daemon=True).start()
def _handle_line(irc, line, registered):
    """Handle one raw IRC line and return the updated registration flag."""
    if line.startswith("PING"):
        # Echo the token back; a PING without one must not crash the agent.
        _, _, token = line.partition(" ")
        irc.send(f"PONG {token}" if token else "PONG")
        return registered

    parts = line.split(" ")
    if len(parts) < 2:
        return registered
    command = parts[1]

    if command == "001" and not registered:
        registered = True
        log("Registered with server")
        irc.set_bot_mode()
        # Join the configured channel rather than a fixed name.
        irc.join(CHANNEL)
        log(f"Joined {CHANNEL}")
    elif command == "INVITE" and len(parts) >= 3:
        # Handle INVITE — auto-join invited channels
        invited_channel = parts[-1].lstrip(":")
        inviter = parts[0].split("!")[0].lstrip(":")
        log(f"Invited to {invited_channel} by {inviter}, joining...")
        irc.join(invited_channel)
    elif command == "PRIVMSG" and len(parts) >= 4:
        source_nick = parts[0].split("!")[0].lstrip(":")
        target = parts[2]
        text = " ".join(parts[3:])
        # Drop only the single ":" that introduces the trailing parameter;
        # further colons belong to the message itself (e.g. ":)").
        if text.startswith(":"):
            text = text[1:]
        handle_message(irc, source_nick, target, text)
    return registered


def run():
    """Connect to IRC and process lines forever, reconnecting after failures.

    Installs a SIGHUP handler that reloads the model, trigger mode and
    persona, and a SIGTERM handler that sends QUIT and exits. Connection
    errors lead to a reconnect after 5 seconds; Ctrl-C exits.
    """
    log(f"Starting agent: nick={NICK} channel={CHANNEL} model={RUNTIME['model']} tools={TOOLS_ENABLED}")

    while True:
        irc = IRCClient(SERVER, PORT, NICK)
        try:
            log(f"Connecting to {SERVER}:{PORT}...")
            irc.connect()

            # Hot-reload on SIGHUP — re-read config and persona
            def handle_sighup(signum, frame):
                log("SIGHUP received, reloading config...")
                try:
                    with open("/etc/agent/config.json") as f:
                        new_config = json.load(f)
                    RUNTIME["model"] = new_config.get("model", RUNTIME["model"])
                    RUNTIME["trigger"] = new_config.get("trigger", RUNTIME["trigger"])
                    try:
                        with open("/etc/agent/persona.md") as f:
                            RUNTIME["persona"] = f.read().strip()
                    except FileNotFoundError:
                        pass  # keep the current persona
                    log(f"Reloaded: model={RUNTIME['model']} trigger={RUNTIME['trigger']}")
                    irc.say(CHANNEL, f"[reloaded: model={RUNTIME['model']}]")
                except Exception as e:
                    log(f"Reload failed: {e}")

            signal.signal(signal.SIGHUP, handle_sighup)

            # Graceful shutdown on SIGTERM — send IRC QUIT
            def handle_sigterm(signum, frame):
                log("SIGTERM received, quitting IRC...")
                try:
                    irc.send("QUIT :Agent shutting down")
                except Exception:
                    pass  # exiting anyway
                time.sleep(0.5)
                sys.exit(0)

            signal.signal(signal.SIGTERM, handle_sigterm)

            registered = False
            while True:
                for line in irc.recv_lines():
                    try:
                        registered = _handle_line(irc, line, registered)
                    except OSError:
                        raise  # connection trouble: reconnect below
                    except Exception as e:
                        # One malformed line must not take the agent down.
                        log(f"Error processing line: {e}")

        except OSError as e:
            # ConnectionError and socket.timeout are OSError subclasses.
            log(f"Disconnected: {e}. Reconnecting in 5s...")
            time.sleep(5)
        except KeyboardInterrupt:
            log("Shutting down.")
            sys.exit(0)
        finally:
            # Release the socket before the next attempt (or exit) so
            # reconnect cycles do not accumulate open descriptors.
            sock = getattr(irc, "sock", None)
            if sock is not None:
                try:
                    sock.close()
                except OSError:
                    pass


if __name__ == "__main__":
    run()