Slim down agent.py — uses skills.py and tools.py modules

This commit is contained in:
2026-04-08 01:58:06 +00:00
parent 383113126f
commit ca224a0ae9

View File

@@ -2,19 +2,19 @@
"""Fireclaw IRC agent — connects to IRC, responds via Ollama with discoverable skills."""
import os
import re
import socket
import json
import sys
import time
import subprocess
import urllib.request
import urllib.error
import signal
import threading
from collections import deque
# Load config
from skills import discover_skills, execute_skill, set_logger as set_skills_logger
from tools import load_memory, query_ollama, set_logger as set_tools_logger
# ─── Config ──────────────────────────────────────────────────────────
with open("/etc/agent/config.json") as f:
CONFIG = json.load(f)
@@ -44,197 +44,7 @@ RUNTIME = {
recent = deque(maxlen=CONTEXT_SIZE)
# ─── Memory ──────────────────────────────────────────────────────────
AGENT_MEMORY = ""
try:
with open(f"{WORKSPACE}/MEMORY.md") as f:
AGENT_MEMORY = f.read().strip()
mem_dir = f"{WORKSPACE}/memory"
if os.path.isdir(mem_dir):
for fname in sorted(os.listdir(mem_dir)):
if fname.endswith(".md"):
try:
with open(f"{mem_dir}/{fname}") as f:
topic = fname.replace(".md", "")
AGENT_MEMORY += f"\n\n## {topic}\n{f.read().strip()}"
except Exception:
pass
except FileNotFoundError:
pass
# ─── Skill System ────────────────────────────────────────────────────
def parse_skill_md(path):
"""Parse a SKILL.md frontmatter into a tool definition.
Returns tool definition dict or None on failure."""
try:
with open(path) as f:
content = f.read()
except Exception as e:
log(f"Cannot read {path}: {e}")
return None
# Normalize line endings
content = content.replace("\r\n", "\n")
# Extract YAML frontmatter between ---
match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
if not match:
log(f"No frontmatter in {path}")
return None
# Simple YAML-like parser (no pyyaml dependency)
fm = {}
current_key = None
current_param = None
params = {}
for line in match.group(1).split("\n"):
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
# Detect indent level (flexible — 2+ spaces counts as nested)
indent = len(line) - len(line.lstrip())
if indent >= 2 and current_key == "parameters":
if indent >= 4 and current_param:
# Parameter property
k, _, v = stripped.partition(":")
k = k.strip()
v = v.strip().strip('"').strip("'")
if k == "required":
v = v.lower() in ("true", "yes", "1")
params[current_param][k] = v
elif ":" in stripped:
# New parameter name
param_name = stripped.rstrip(":").strip()
current_param = param_name
params[param_name] = {}
elif ":" in line and indent == 0:
k, _, v = line.partition(":")
k = k.strip()
v = v.strip().strip('"').strip("'")
fm[k] = v
current_key = k
if k == "parameters":
current_param = None
if "name" not in fm:
log(f"No 'name' field in {path}")
return None
if "description" not in fm:
log(f"Warning: no 'description' in {path}")
# Build Ollama tool definition
properties = {}
required = []
for pname, pdata in params.items():
ptype = pdata.get("type", "string")
if ptype not in ("string", "integer", "number", "boolean", "array", "object"):
log(f"Warning: unknown type '{ptype}' for param '{pname}' in {path}")
properties[pname] = {
"type": ptype,
"description": pdata.get("description", ""),
}
if pdata.get("required", False):
required.append(pname)
return {
"type": "function",
"function": {
"name": fm["name"],
"description": fm.get("description", ""),
"parameters": {
"type": "object",
"properties": properties,
"required": required,
},
},
}
def discover_skills():
"""Scan skill directories and return tool definitions + script paths."""
tools = []
scripts = {}
for skill_dir in SKILL_DIRS:
if not os.path.isdir(skill_dir):
continue
for name in sorted(os.listdir(skill_dir)):
skill_path = os.path.join(skill_dir, name)
skill_md = os.path.join(skill_path, "SKILL.md")
if not os.path.isfile(skill_md):
continue
tool_def = parse_skill_md(skill_md)
if not tool_def:
continue
# Find the run script
for ext in ("run.py", "run.sh"):
script = os.path.join(skill_path, ext)
if os.path.isfile(script):
scripts[tool_def["function"]["name"]] = script
break
if tool_def["function"]["name"] in scripts:
tools.append(tool_def)
return tools, scripts
LARGE_OUTPUT_THRESHOLD = 2000
LARGE_OUTPUT_DIR = f"{WORKSPACE}/tool_outputs"
_output_counter = 0
def execute_skill(script_path, args):
"""Execute a skill script with args as JSON on stdin.
Large outputs are saved to a file with a preview returned."""
global _output_counter
env = os.environ.copy()
env["WORKSPACE"] = WORKSPACE
env["SEARX_URL"] = CONFIG.get("searx_url", "https://searx.mymx.me")
try:
result = subprocess.run(
["python3" if script_path.endswith(".py") else "bash", script_path],
input=json.dumps(args),
capture_output=True,
text=True,
timeout=120,
env=env,
)
output = result.stdout
if result.stderr:
output += f"\n[stderr] {result.stderr}"
output = output.strip() or "[no output]"
# Large output handling — save to file, return preview
if len(output) > LARGE_OUTPUT_THRESHOLD:
os.makedirs(LARGE_OUTPUT_DIR, exist_ok=True)
_output_counter += 1
filepath = f"{LARGE_OUTPUT_DIR}/output_{_output_counter}.txt"
with open(filepath, "w") as f:
f.write(output)
preview = output[:1500]
return f"{preview}\n\n[output truncated — full result ({len(output)} chars) saved to {filepath}. Use run_command to read it: cat {filepath}]"
return output
except subprocess.TimeoutExpired:
return "[skill timed out after 120s]"
except Exception as e:
return f"[skill error: {e}]"
# Discover skills at startup
TOOLS, SKILL_SCRIPTS = discover_skills()
# ─── Logging ─────────────────────────────────────────────────────────
LOG_FILE = f"{WORKSPACE}/agent.log" if os.path.isdir(WORKSPACE) else None
@@ -250,8 +60,34 @@ def log(msg):
pass
# Inject logger into submodules
set_skills_logger(log)
set_tools_logger(log)
# ─── Init ────────────────────────────────────────────────────────────
AGENT_MEMORY = load_memory(WORKSPACE)
TOOLS, SKILL_SCRIPTS = discover_skills(SKILL_DIRS)
log(f"Loaded {len(TOOLS)} skills: {', '.join(SKILL_SCRIPTS.keys())}")
def reload_memory():
global AGENT_MEMORY
AGENT_MEMORY = load_memory(WORKSPACE)
def dispatch_tool(fn_name, fn_args, round_num):
"""Execute a tool call via the skill system."""
script = SKILL_SCRIPTS.get(fn_name)
if not script:
return f"[unknown tool: {fn_name}]"
log(f"Skill [{round_num}]: {fn_name}({str(fn_args)[:60]})")
result = execute_skill(script, fn_args, WORKSPACE, CONFIG)
if fn_name == "save_memory":
reload_memory()
return result
# ─── IRC Client ──────────────────────────────────────────────────────
@@ -303,137 +139,6 @@ class IRCClient:
return lines
# ─── Tool Dispatch ───────────────────────────────────────────────────
def try_parse_tool_call(text):
"""Parse text-based tool calls (model dumps JSON as text)."""
text = re.sub(r"</?tool_call>", "", text).strip()
for start in range(len(text)):
if text[start] == "{":
for end in range(len(text), start, -1):
if text[end - 1] == "}":
try:
obj = json.loads(text[start:end])
name = obj.get("name")
args = obj.get("arguments", {})
if name and isinstance(args, dict):
return (name, args)
except json.JSONDecodeError:
continue
return None
def dispatch_tool(fn_name, fn_args, round_num):
"""Execute a tool call via the skill system."""
script = SKILL_SCRIPTS.get(fn_name)
if not script:
return f"[unknown tool: {fn_name}]"
log(f"Skill [{round_num}]: {fn_name}({str(fn_args)[:60]})")
# Handle save_memory reload
result = execute_skill(script, fn_args)
# Reload full memory if save_memory was called
if fn_name == "save_memory":
reload_memory()
return result
def reload_memory():
"""Reload all memory files from workspace."""
global AGENT_MEMORY
AGENT_MEMORY = ""
try:
with open(f"{WORKSPACE}/MEMORY.md") as f:
AGENT_MEMORY = f.read().strip()
mem_dir = f"{WORKSPACE}/memory"
if os.path.isdir(mem_dir):
for fname in sorted(os.listdir(mem_dir)):
if fname.endswith(".md"):
try:
with open(f"{mem_dir}/{fname}") as f:
topic = fname.replace(".md", "")
AGENT_MEMORY += f"\n\n## {topic}\n{f.read().strip()}"
except Exception:
pass
except FileNotFoundError:
pass
def ollama_request(payload):
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
f"{OLLAMA_URL}/api/chat",
data=data,
headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req, timeout=120) as resp:
return json.loads(resp.read())
def query_ollama(messages):
"""Call Ollama chat API with skill-based tool support."""
payload = {
"model": RUNTIME["model"],
"messages": messages,
"stream": False,
"options": {"num_predict": 512},
}
if TOOLS_ENABLED and TOOLS:
payload["tools"] = TOOLS
for round_num in range(MAX_TOOL_ROUNDS):
remaining = MAX_TOOL_ROUNDS - round_num
try:
data = ollama_request(payload)
except (urllib.error.URLError, TimeoutError) as e:
return f"[error: {e}]"
msg = data.get("message", {})
# Structured tool calls
tool_calls = msg.get("tool_calls")
if tool_calls:
messages.append(msg)
for tc in tool_calls:
fn = tc.get("function", {})
result = dispatch_tool(
fn.get("name", ""),
fn.get("arguments", {}),
round_num + 1,
)
# Warn when budget is running low
if remaining <= 2:
result += f"\n[warning: {remaining - 1} tool rounds remaining — wrap up]"
messages.append({"role": "tool", "content": result})
payload["messages"] = messages
continue
# Text-based tool calls
content = msg.get("content", "").strip()
parsed_tool = try_parse_tool_call(content)
if parsed_tool:
fn_name, fn_args = parsed_tool
if fn_name in SKILL_SCRIPTS:
messages.append({"role": "assistant", "content": content})
result = dispatch_tool(fn_name, fn_args, round_num + 1)
if remaining <= 2:
result += f"\n[warning: {remaining - 1} tool rounds remaining — wrap up]"
messages.append({
"role": "user",
"content": f"Tool result:\n{result}\n\nNow respond to the user based on this result.",
})
payload["messages"] = messages
continue
return content
return "[max tool rounds reached]"
# ─── Message Handling ────────────────────────────────────────────────
@@ -527,7 +232,12 @@ def handle_message(irc, source_nick, target, text):
def do_respond():
try:
messages = build_messages(question, channel)
response = query_ollama(messages)
response = query_ollama(
messages, RUNTIME,
TOOLS if TOOLS_ENABLED else [],
SKILL_SCRIPTS, dispatch_tool,
OLLAMA_URL, MAX_TOOL_ROUNDS,
)
if not response:
return