Refactor agent.py to use discoverable skill system

This commit is contained in:
2026-04-07 20:35:56 +00:00
parent 4483b585a7
commit 2d42d498b3

View File

@@ -1,6 +1,8 @@
#!/usr/bin/env python3
"""Fireclaw IRC agent — connects to IRC, responds via Ollama with tool access."""
"""Fireclaw IRC agent — connects to IRC, responds via Ollama with discoverable skills."""
import os
import re
import socket
import json
import sys
@@ -24,7 +26,6 @@ except FileNotFoundError:
PERSONA = "You are a helpful assistant."
NICK = CONFIG.get("nick", "agent")
CHANNEL = CONFIG.get("channel", "#agents")
SERVER = CONFIG.get("server", "172.16.0.1")
PORT = CONFIG.get("port", 6667)
OLLAMA_URL = CONFIG.get("ollama_url", "http://172.16.0.1:11434")
@@ -33,24 +34,22 @@ MAX_RESPONSE_LINES = CONFIG.get("max_response_lines", 50)
TOOLS_ENABLED = CONFIG.get("tools", True)
MAX_TOOL_ROUNDS = CONFIG.get("max_tool_rounds", 5)
WORKSPACE = "/workspace"
SKILL_DIRS = ["/opt/skills", f"{WORKSPACE}/skills"]
# Mutable runtime config — can be hot-reloaded via SIGHUP
RUNTIME = {
"model": CONFIG.get("model", "qwen2.5-coder:7b"),
"trigger": CONFIG.get("trigger", "mention"),
"persona": PERSONA,
}
# Recent messages for context
recent = deque(maxlen=CONTEXT_SIZE)
# Load persistent memory from workspace
# ─── Memory ──────────────────────────────────────────────────────────
AGENT_MEMORY = ""
try:
import os
with open(f"{WORKSPACE}/MEMORY.md") as f:
AGENT_MEMORY = f.read().strip()
# Also load all memory files referenced in the index
mem_dir = f"{WORKSPACE}/memory"
if os.path.isdir(mem_dir):
for fname in sorted(os.listdir(mem_dir)):
@@ -64,93 +63,151 @@ try:
except FileNotFoundError:
pass
# Tool definitions for Ollama chat API
TOOLS = [
{
"type": "function",
"function": {
"name": "run_command",
"description": "Execute a shell command on this system and return the output. Use this to check system info, run scripts, fetch URLs, process data, etc.",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute (bash)",
}
},
"required": ["command"],
},
},
},
{
"type": "function",
"function": {
"name": "save_memory",
"description": "Save something important to your persistent memory. Use this to remember facts about users, lessons learned, project context, or anything you want to recall in future conversations. Memories survive restarts.",
"parameters": {
"type": "object",
"properties": {
"topic": {
"type": "string",
"description": "Short topic name for the memory file (e.g. 'user_prefs', 'project_x', 'lessons')",
},
"content": {
"type": "string",
"description": "The memory content to save",
},
},
"required": ["topic", "content"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web using SearXNG. Returns titles, URLs, and snippets for the top results. Use this when you need current information or facts you're unsure about.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query",
},
"num_results": {
"type": "integer",
"description": "Number of results to return (default 5)",
},
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "fetch_url",
"description": "Fetch a URL and return its text content. HTML is stripped to plain text. Use this to read web pages, documentation, articles, etc.",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to fetch",
},
},
"required": ["url"],
},
},
},
]
# ─── Skill System ────────────────────────────────────────────────────
SEARX_URL = CONFIG.get("searx_url", "https://searx.mymx.me")
def parse_skill_md(path):
"""Parse a SKILL.md frontmatter into a tool definition."""
with open(path) as f:
content = f.read()
# Extract YAML frontmatter between ---
match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
if not match:
return None
# Simple YAML-like parser (no pyyaml dependency)
fm = {}
current_key = None
current_param = None
params = {}
for line in match.group(1).split("\n"):
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
if line.startswith(" ") and current_key == "parameters":
# Inside parameters block
if line.startswith(" ") and current_param:
# Parameter property
k, _, v = stripped.partition(":")
v = v.strip().strip('"').strip("'")
if k.strip() == "required":
v = v.lower() == "true"
params[current_param][k.strip()] = v
elif ":" in stripped:
# New parameter
param_name = stripped.rstrip(":").strip()
current_param = param_name
params[param_name] = {}
elif ":" in line and not line.startswith(" "):
k, _, v = line.partition(":")
k = k.strip()
v = v.strip().strip('"').strip("'")
fm[k] = v
current_key = k
if k == "parameters":
current_param = None
if "name" not in fm:
return None
# Build Ollama tool definition
properties = {}
required = []
for pname, pdata in params.items():
properties[pname] = {
"type": pdata.get("type", "string"),
"description": pdata.get("description", ""),
}
if pdata.get("required", False):
required.append(pname)
return {
"type": "function",
"function": {
"name": fm["name"],
"description": fm.get("description", ""),
"parameters": {
"type": "object",
"properties": properties,
"required": required,
},
},
}
def discover_skills():
"""Scan skill directories and return tool definitions + script paths."""
tools = []
scripts = {}
for skill_dir in SKILL_DIRS:
if not os.path.isdir(skill_dir):
continue
for name in sorted(os.listdir(skill_dir)):
skill_path = os.path.join(skill_dir, name)
skill_md = os.path.join(skill_path, "SKILL.md")
if not os.path.isfile(skill_md):
continue
tool_def = parse_skill_md(skill_md)
if not tool_def:
continue
# Find the run script
for ext in ("run.py", "run.sh"):
script = os.path.join(skill_path, ext)
if os.path.isfile(script):
scripts[tool_def["function"]["name"]] = script
break
if tool_def["function"]["name"] in scripts:
tools.append(tool_def)
return tools, scripts
def execute_skill(script_path, args):
"""Execute a skill script with args as JSON on stdin."""
# Pass config extras via env
env = os.environ.copy()
env["WORKSPACE"] = WORKSPACE
env["SEARX_URL"] = CONFIG.get("searx_url", "https://searx.mymx.me")
try:
result = subprocess.run(
["python3" if script_path.endswith(".py") else "bash", script_path],
input=json.dumps(args),
capture_output=True,
text=True,
timeout=120,
env=env,
)
output = result.stdout
if result.stderr:
output += f"\n[stderr] {result.stderr}"
return output.strip() or "[no output]"
except subprocess.TimeoutExpired:
return "[skill timed out after 120s]"
except Exception as e:
return f"[skill error: {e}]"
# Discover skills at startup
TOOLS, SKILL_SCRIPTS = discover_skills()
def log(msg):
print(f"[agent:{NICK}] {msg}", flush=True)
log(f"Loaded {len(TOOLS)} skills: {', '.join(SKILL_SCRIPTS.keys())}")
# ─── IRC Client ──────────────────────────────────────────────────────
class IRCClient:
def __init__(self, server, port, nick):
self.server = server
@@ -199,152 +256,12 @@ class IRCClient:
return lines
def run_command(command):
"""Execute a shell command and return output."""
log(f"Running command: {command[:100]}")
try:
result = subprocess.run(
["bash", "-c", command],
capture_output=True,
text=True,
timeout=120,
)
output = result.stdout
if result.stderr:
output += f"\n[stderr] {result.stderr}"
if result.returncode != 0:
output += f"\n[exit code: {result.returncode}]"
# Truncate very long output
if len(output) > 2000:
output = output[:2000] + "\n[output truncated]"
return output.strip() or "[no output]"
except subprocess.TimeoutExpired:
return "[command timed out after 120s]"
except Exception as e:
return f"[error: {e}]"
def save_memory(topic, content):
"""Save a memory to the persistent workspace."""
import os
mem_dir = f"{WORKSPACE}/memory"
os.makedirs(mem_dir, exist_ok=True)
# Write the memory file
filepath = f"{mem_dir}/{topic}.md"
with open(filepath, "w") as f:
f.write(content + "\n")
# Update MEMORY.md index
index_path = f"{WORKSPACE}/MEMORY.md"
existing = ""
try:
with open(index_path) as f:
existing = f.read()
except FileNotFoundError:
existing = "# Agent Memory\n"
# Add or update entry
entry = f"- [{topic}](memory/{topic}.md)"
if topic not in existing:
with open(index_path, "a") as f:
f.write(f"\n{entry}")
# Reload memory into global
global AGENT_MEMORY
with open(index_path) as f:
AGENT_MEMORY = f.read().strip()
log(f"Memory saved: {topic}")
return f"Memory saved to {filepath}"
def web_search(query, num_results=5):
"""Search the web via SearXNG."""
log(f"Web search: {query[:60]}")
try:
import urllib.parse
params = urllib.parse.urlencode({"q": query, "format": "json"})
req = urllib.request.Request(
f"{SEARX_URL}/search?{params}",
headers={"User-Agent": "fireclaw-agent"},
)
with urllib.request.urlopen(req, timeout=15) as resp:
data = json.loads(resp.read())
results = data.get("results", [])[:num_results]
if not results:
return "No results found."
lines = []
for r in results:
title = r.get("title", "")
url = r.get("url", "")
snippet = r.get("content", "")[:150]
lines.append(f"- {title}\n {url}\n {snippet}")
return "\n".join(lines)
except Exception as e:
return f"[search error: {e}]"
def fetch_url(url):
"""Fetch a URL and return stripped text content."""
log(f"Fetching: {url[:80]}")
try:
from html.parser import HTMLParser
class TextExtractor(HTMLParser):
def __init__(self):
super().__init__()
self.text = []
self._skip = False
def handle_starttag(self, tag, attrs):
if tag in ("script", "style", "noscript"):
self._skip = True
def handle_endtag(self, tag):
if tag in ("script", "style", "noscript"):
self._skip = False
if tag in ("p", "br", "div", "h1", "h2", "h3", "h4", "li", "tr"):
self.text.append("\n")
def handle_data(self, data):
if not self._skip:
self.text.append(data)
req = urllib.request.Request(url, headers={"User-Agent": "fireclaw-agent"})
with urllib.request.urlopen(req, timeout=15) as resp:
content_type = resp.headers.get("Content-Type", "")
raw = resp.read(50_000).decode("utf-8", errors="replace")
if "html" in content_type:
parser = TextExtractor()
parser.feed(raw)
text = "".join(parser.text)
else:
text = raw
# Clean up whitespace
import re
text = re.sub(r"\n{3,}", "\n\n", text).strip()
if len(text) > 3000:
text = text[:3000] + "\n[truncated]"
return text or "[empty page]"
except Exception as e:
return f"[fetch error: {e}]"
# ─── Tool Dispatch ───────────────────────────────────────────────────
def try_parse_tool_call(text):
"""Try to parse a text-based tool call from model output.
Handles formats like:
{"name": "run_command", "arguments": {"command": "uptime"}}
<tool_call>{"name": "run_command", ...}</tool_call>
Returns (name, args) tuple or None.
"""
import re
# Strip tool_call tags if present
"""Parse text-based tool calls (model dumps JSON as text)."""
text = re.sub(r"</?tool_call>", "", text).strip()
# Try to find JSON in the text
for start in range(len(text)):
if text[start] == "{":
for end in range(len(text), start, -1):
@@ -360,8 +277,29 @@ def try_parse_tool_call(text):
return None
def dispatch_tool(fn_name, fn_args, round_num):
"""Execute a tool call via the skill system."""
script = SKILL_SCRIPTS.get(fn_name)
if not script:
return f"[unknown tool: {fn_name}]"
log(f"Skill [{round_num}]: {fn_name}({str(fn_args)[:60]})")
# Handle save_memory reload
result = execute_skill(script, fn_args)
# Reload memory if save_memory was called
if fn_name == "save_memory":
global AGENT_MEMORY
try:
with open(f"{WORKSPACE}/MEMORY.md") as f:
AGENT_MEMORY = f.read().strip()
except FileNotFoundError:
pass
return result
def ollama_request(payload):
"""Make a request to Ollama API."""
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
f"{OLLAMA_URL}/api/chat",
@@ -373,7 +311,7 @@ def ollama_request(payload):
def query_ollama(messages):
"""Call Ollama chat API with tool support. Returns final response text."""
"""Call Ollama chat API with skill-based tool support."""
payload = {
"model": RUNTIME["model"],
"messages": messages,
@@ -381,7 +319,7 @@ def query_ollama(messages):
"options": {"num_predict": 512},
}
if TOOLS_ENABLED:
if TOOLS_ENABLED and TOOLS:
payload["tools"] = TOOLS
for round_num in range(MAX_TOOL_ROUNDS):
@@ -392,93 +330,49 @@ def query_ollama(messages):
msg = data.get("message", {})
# Check for structured tool calls from API
# Structured tool calls
tool_calls = msg.get("tool_calls")
if tool_calls:
messages.append(msg)
for tc in tool_calls:
fn = tc.get("function", {})
fn_name = fn.get("name", "")
fn_args = fn.get("arguments", {})
if fn_name == "run_command":
cmd = fn_args.get("command", "")
log(f"Tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: {cmd[:80]}")
result = run_command(cmd)
messages.append({"role": "tool", "content": result})
elif fn_name == "save_memory":
topic = fn_args.get("topic", "note")
content = fn_args.get("content", "")
log(f"Tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: save_memory({topic})")
result = save_memory(topic, content)
messages.append({"role": "tool", "content": result})
elif fn_name == "web_search":
query = fn_args.get("query", "")
num = fn_args.get("num_results", 5)
log(f"Tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: web_search({query[:60]})")
result = web_search(query, num)
messages.append({"role": "tool", "content": result})
elif fn_name == "fetch_url":
url = fn_args.get("url", "")
log(f"Tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: fetch_url({url[:60]})")
result = fetch_url(url)
messages.append({"role": "tool", "content": result})
else:
messages.append({
"role": "tool",
"content": f"[unknown tool: {fn_name}]",
})
result = dispatch_tool(
fn.get("name", ""),
fn.get("arguments", {}),
round_num + 1,
)
messages.append({"role": "tool", "content": result})
payload["messages"] = messages
continue
# Check for text-based tool calls (model dumped JSON as text)
# Text-based tool calls
content = msg.get("content", "").strip()
parsed_tool = try_parse_tool_call(content)
if parsed_tool:
fn_name, fn_args = parsed_tool
messages.append({"role": "assistant", "content": content})
if fn_name == "run_command":
cmd = fn_args.get("command", "")
log(f"Text tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: {cmd[:80]}")
result = run_command(cmd)
messages.append({"role": "user", "content": f"Command output:\n{result}\n\nNow provide your response to the user based on this output."})
elif fn_name == "save_memory":
topic = fn_args.get("topic", "note")
mem_content = fn_args.get("content", "")
log(f"Text tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: save_memory({topic})")
result = save_memory(topic, mem_content)
messages.append({"role": "user", "content": f"{result}\n\nNow respond to the user."})
elif fn_name == "web_search":
query = fn_args.get("query", "")
num = fn_args.get("num_results", 5)
log(f"Text tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: web_search({query[:60]})")
result = web_search(query, num)
messages.append({"role": "user", "content": f"Search results:\n{result}\n\nNow respond to the user based on these results."})
elif fn_name == "fetch_url":
url = fn_args.get("url", "")
log(f"Text tool call [{round_num+1}/{MAX_TOOL_ROUNDS}]: fetch_url({url[:60]})")
result = fetch_url(url)
messages.append({"role": "user", "content": f"Page content:\n{result}\n\nNow respond to the user based on this content."})
payload["messages"] = messages
continue
if fn_name in SKILL_SCRIPTS:
messages.append({"role": "assistant", "content": content})
result = dispatch_tool(fn_name, fn_args, round_num + 1)
messages.append({
"role": "user",
"content": f"Tool result:\n{result}\n\nNow respond to the user based on this result.",
})
payload["messages"] = messages
continue
# No tool calls — return the text response
return content
return "[max tool rounds reached]"
# ─── Message Handling ────────────────────────────────────────────────
def build_messages(question, channel):
"""Build chat messages with system prompt and conversation history."""
system = RUNTIME["persona"]
if TOOLS_ENABLED:
system += "\n\nYou have access to tools:"
system += "\n- run_command: Execute shell commands on your system."
system += "\n- web_search: Search the web for current information."
system += "\n- fetch_url: Fetch and read a web page's content."
system += "\n- save_memory: Save important information to your persistent workspace."
if TOOLS_ENABLED and TOOLS:
skill_names = [t["function"]["name"] for t in TOOLS]
system += "\n\nYou have access to tools: " + ", ".join(skill_names) + "."
system += "\nUse tools when needed rather than guessing. Your workspace at /workspace persists across restarts."
if AGENT_MEMORY and AGENT_MEMORY != "# Agent Memory":
system += f"\n\nIMPORTANT - Your persistent memory (facts you saved previously, use these to answer questions):\n{AGENT_MEMORY}"
@@ -487,7 +381,6 @@ def build_messages(question, channel):
messages = [{"role": "system", "content": system}]
# Build conversation history as alternating user/assistant messages
channel_msgs = [m for m in recent if m["channel"] == channel]
for msg in channel_msgs[-CONTEXT_SIZE:]:
if msg["nick"] == NICK:
@@ -495,8 +388,6 @@ def build_messages(question, channel):
else:
messages.append({"role": "user", "content": f"<{msg['nick']}> {msg['text']}"})
# Ensure the last message is from the user (the triggering question)
# If the deque already captured it, don't double-add
last = messages[-1] if len(messages) > 1 else None
if not last or last.get("role") != "user" or question not in last.get("content", ""):
messages.append({"role": "user", "content": question})
@@ -505,14 +396,10 @@ def build_messages(question, channel):
def should_trigger(text):
"""Check if this message should trigger a response.
Only triggers when nick is at the start of the message (e.g. 'worker: hello')
not when nick appears elsewhere (e.g. 'coder: say hi to worker')."""
if RUNTIME["trigger"] == "all":
return True
lower = text.lower()
nick = NICK.lower()
# Match: "nick: ...", "nick, ...", "nick ...", "@nick ..."
return (
lower.startswith(f"{nick}:") or
lower.startswith(f"{nick},") or
@@ -524,7 +411,6 @@ def should_trigger(text):
def extract_question(text):
"""Extract the actual question from the trigger."""
lower = text.lower()
for prefix in [
f"{NICK.lower()}: ",
@@ -539,13 +425,11 @@ def extract_question(text):
return text
# Track last response time to prevent agent-to-agent loops
_last_response_time = 0
_AGENT_COOLDOWN = 10 # seconds between responses to prevent loops
_AGENT_COOLDOWN = 10
def handle_message(irc, source_nick, target, text):
"""Process an incoming PRIVMSG."""
global _last_response_time
is_dm = not target.startswith("#")
@@ -557,11 +441,9 @@ def handle_message(irc, source_nick, target, text):
if source_nick == NICK:
return
# DMs always trigger, channel messages need mention
if not is_dm and not should_trigger(text):
return
# Cooldown to prevent agent-to-agent loops
now = time.time()
if now - _last_response_time < _AGENT_COOLDOWN:
log(f"Cooldown active, ignoring trigger from {source_nick}")
@@ -596,8 +478,11 @@ def handle_message(irc, source_nick, target, text):
threading.Thread(target=do_respond, daemon=True).start()
# ─── Main Loop ───────────────────────────────────────────────────────
def run():
log(f"Starting agent: nick={NICK} channel={CHANNEL} model={RUNTIME['model']} tools={TOOLS_ENABLED}")
log(f"Starting agent: nick={NICK} model={RUNTIME['model']} tools={TOOLS_ENABLED}")
while True:
try:
@@ -605,7 +490,6 @@ def run():
log(f"Connecting to {SERVER}:{PORT}...")
irc.connect()
# Hot-reload on SIGHUP — re-read config and persona
def handle_sighup(signum, frame):
log("SIGHUP received, reloading config...")
try:
@@ -619,13 +503,12 @@ def run():
except FileNotFoundError:
pass
log(f"Reloaded: model={RUNTIME['model']} trigger={RUNTIME['trigger']}")
irc.say(CHANNEL, f"[reloaded: model={RUNTIME['model']}]")
irc.say("#agents", f"[reloaded: model={RUNTIME['model']}]")
except Exception as e:
log(f"Reload failed: {e}")
signal.signal(signal.SIGHUP, handle_sighup)
# Graceful shutdown on SIGTERM — send IRC QUIT
def handle_sigterm(signum, frame):
log("SIGTERM received, quitting IRC...")
try:
@@ -654,9 +537,8 @@ def run():
log("Registered with server")
irc.set_bot_mode()
irc.join("#agents")
log(f"Joined #agents")
log("Joined #agents")
# Handle INVITE — auto-join invited channels
if parts[1] == "INVITE" and len(parts) >= 3:
invited_channel = parts[-1].lstrip(":")
inviter = parts[0].split("!")[0].lstrip(":")