Files
gitea-ci/gitea-ci.py
2026-01-18 17:39:57 +01:00

3069 lines
118 KiB
Python
Executable File

#!/usr/bin/env python3
"""Gitea CI - Monitor and manage Gitea Actions workflows.
Features:
- List recent workflow runs across repositories
- Show detailed job status with live updates
- Stream build logs in real-time
- Trigger workflows manually (workflow_dispatch)
- Re-run failed jobs, cancel running workflows
- List and download build artifacts
- View PR check status, compare workflow runs
- Desktop notifications on completion
- JSON output mode for scripting
"""
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import time
import urllib.request
import urllib.error
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
# Add lib to path for style import
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "lib"))
from style import Color, Diff # noqa: E402
# Configuration
CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "gitea-ci"
CONFIG_FILE = CONFIG_DIR / "config.json"
DEFAULT_INSTANCE = os.environ.get("GITEA_URL", "https://git.mymx.me")
# Status colors and symbols
STATUS_STYLE = {
"success": (Color.GRN, ""),
"failure": (Color.RED, ""),
"cancelled": (Color.YLW, ""),
"running": (Color.BLU, ""),
"waiting": (Color.DIM, ""),
"queued": (Color.DIM, ""),
"pending": (Color.DIM, ""),
"skipped": (Color.DIM, ""),
}
@dataclass
class Config:
"""Gitea CI configuration."""
url: str = DEFAULT_INSTANCE
token: str = ""
default_owner: str = ""
default_repo: str = ""
@classmethod
def load(cls) -> "Config":
"""Load config from file or environment."""
config = cls()
# Environment overrides
config.url = os.environ.get("GITEA_URL", config.url)
config.token = os.environ.get("GITEA_TOKEN", "")
# Load from file
if CONFIG_FILE.exists():
try:
data = json.loads(CONFIG_FILE.read_text())
config.url = data.get("url", config.url)
config.token = data.get("token", config.token)
config.default_owner = data.get("default_owner", "")
config.default_repo = data.get("default_repo", "")
except (json.JSONDecodeError, IOError):
pass
return config
def save(self) -> None:
"""Save config to file."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
data = {
"url": self.url,
"token": self.token,
"default_owner": self.default_owner,
"default_repo": self.default_repo,
}
CONFIG_FILE.write_text(json.dumps(data, indent=2) + "\n")
CONFIG_FILE.chmod(0o600)
class GiteaAPI:
"""Gitea API client with rate limiting support."""
def __init__(self, config: Config):
self.config = config
self.base_url = config.url.rstrip("/") + "/api/v1"
self._retry_delay = 1 # Initial retry delay in seconds
self._max_retry_delay = 60 # Max delay
self._rate_limited = False
def _request(self, endpoint: str, method: str = "GET", retries: int = 3) -> dict | None:
"""Make API request with rate limit handling."""
url = f"{self.base_url}{endpoint}"
headers = {"Accept": "application/json"}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
req = urllib.request.Request(url, headers=headers, method=method)
for attempt in range(retries):
try:
with urllib.request.urlopen(req, timeout=30) as resp:
# Reset retry delay on success
self._retry_delay = 1
self._rate_limited = False
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 401:
print(f"{Color.RED}Authentication required. Set GITEA_TOKEN or run: gitea-ci config{Color.RST}")
return None
elif e.code == 404:
return None
elif e.code == 429 or e.code == 403:
# Rate limited - exponential backoff
self._rate_limited = True
retry_after = e.headers.get("Retry-After")
if retry_after and retry_after.isdigit():
delay = int(retry_after)
else:
delay = self._retry_delay
if attempt < retries - 1:
print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}", file=sys.stderr)
time.sleep(delay)
# Exponential backoff with max
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
else:
print(f"{Color.RED}Rate limited. Try again later.{Color.RST}")
return None
elif e.code >= 500:
# Server error - retry with backoff
if attempt < retries - 1:
delay = self._retry_delay
print(f"{Color.YLW}Server error {e.code}. Retrying in {delay}s...{Color.RST}", file=sys.stderr)
time.sleep(delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
else:
print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}")
return None
else:
print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}")
return None
except urllib.error.URLError as e:
if attempt < retries - 1:
delay = self._retry_delay
print(f"{Color.YLW}Connection error. Retrying in {delay}s...{Color.RST}", file=sys.stderr)
time.sleep(delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
else:
print(f"{Color.RED}Connection error: {e.reason}{Color.RST}")
return None
return None
def _request_text(self, endpoint: str, retries: int = 3) -> str | None:
"""Make API request returning text with rate limit handling."""
url = f"{self.base_url}{endpoint}"
headers = {}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
req = urllib.request.Request(url, headers=headers)
for attempt in range(retries):
try:
with urllib.request.urlopen(req, timeout=30) as resp:
self._retry_delay = 1
return resp.read().decode()
except urllib.error.HTTPError as e:
if e.code == 429 or e.code == 403:
retry_after = e.headers.get("Retry-After")
delay = int(retry_after) if retry_after and retry_after.isdigit() else self._retry_delay
if attempt < retries - 1:
print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}", file=sys.stderr)
time.sleep(delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
return None
except urllib.error.URLError:
if attempt < retries - 1:
time.sleep(self._retry_delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
return None
return None
def throttle(self) -> None:
"""Add small delay between requests to avoid rate limiting."""
if self._rate_limited:
time.sleep(0.5)
else:
time.sleep(0.1) # Small delay to be nice to the server
def get_repos(self, limit: int = 20) -> list[Any]:
"""Get repositories for current user."""
data = self._request(f"/user/repos?limit={limit}")
return list(data) if data else []
def create_repo(self, name: str, description: str = "", private: bool = False,
auto_init: bool = True) -> dict | None:
"""Create a new repository."""
data = {
"name": name,
"description": description,
"private": private,
"auto_init": auto_init,
}
return self._post("/user/repos", data)
def get_repo(self, owner: str, repo: str) -> dict | None:
"""Get repository details."""
return self._request(f"/repos/{owner}/{repo}")
def repo_exists(self, owner: str, repo: str) -> bool:
"""Check if a repository exists."""
return self.get_repo(owner, repo) is not None
def get_runs(self, owner: str, repo: str, limit: int = 10) -> list:
"""Get workflow runs for a repository."""
data = self._request(f"/repos/{owner}/{repo}/actions/runs?limit={limit}")
if data and "workflow_runs" in data:
return data["workflow_runs"]
return data if isinstance(data, list) else []
def get_run(self, owner: str, repo: str, run_id: int) -> dict | None:
"""Get specific workflow run."""
return self._request(f"/repos/{owner}/{repo}/actions/runs/{run_id}")
def get_jobs(self, owner: str, repo: str, run_id: int) -> list:
"""Get jobs for a workflow run."""
data = self._request(f"/repos/{owner}/{repo}/actions/runs/{run_id}/jobs")
if data and "jobs" in data:
return data["jobs"]
return data if isinstance(data, list) else []
def get_job_logs(self, owner: str, repo: str, job_id: int) -> str | None:
"""Get logs for a job."""
return self._request_text(f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs")
def trigger_workflow(self, owner: str, repo: str, workflow: str, ref: str = "master",
inputs: dict | None = None) -> bool:
"""Trigger a workflow_dispatch event."""
endpoint = f"/repos/{owner}/{repo}/actions/workflows/{workflow}/dispatches"
data: dict[str, Any] = {"ref": ref}
if inputs:
data["inputs"] = inputs
return self._post(endpoint, data) is not None
def rerun_job(self, owner: str, repo: str, job_id: int) -> bool:
"""Re-run a specific job."""
return self._post(f"/repos/{owner}/{repo}/actions/jobs/{job_id}/rerun", {}) is not None
def rerun_workflow(self, owner: str, repo: str, run_id: int) -> bool:
"""Re-run all jobs in a workflow run."""
return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/rerun", {}) is not None
def rerun_failed_jobs(self, owner: str, repo: str, run_id: int) -> bool:
"""Re-run only failed jobs in a workflow run."""
return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/rerun-failed-jobs", {}) is not None
def cancel_run(self, owner: str, repo: str, run_id: int) -> bool:
"""Cancel a workflow run."""
return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/cancel", {}) is not None
def get_artifacts(self, owner: str, repo: str, run_id: int | None = None) -> list:
"""Get artifacts for a repository or specific run."""
if run_id:
endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}/artifacts"
else:
endpoint = f"/repos/{owner}/{repo}/actions/artifacts"
data = self._request(endpoint)
if data and "artifacts" in data:
return data["artifacts"]
return data if isinstance(data, list) else []
def download_artifact(self, owner: str, repo: str, artifact_id: int, output_path: str) -> bool:
"""Download an artifact to a file."""
endpoint = f"/repos/{owner}/{repo}/actions/artifacts/{artifact_id}/zip"
url = f"{self.base_url}{endpoint}"
headers = {}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
req = urllib.request.Request(url, headers=headers)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
with open(output_path, "wb") as f:
f.write(resp.read())
return True
except (urllib.error.URLError, IOError) as e:
print(f"{Color.RED}Download failed: {e}{Color.RST}")
return False
def get_workflows(self, owner: str, repo: str) -> list:
"""Get list of workflows in a repository."""
data = self._request(f"/repos/{owner}/{repo}/actions/workflows")
if data and "workflows" in data:
return data["workflows"]
return data if isinstance(data, list) else []
def get_pr_checks(self, owner: str, repo: str, pr_number: int) -> dict | None:
"""Get CI check status for a pull request."""
# Get PR details first to find head SHA
pr = self._request(f"/repos/{owner}/{repo}/pulls/{pr_number}")
if not pr:
return None
head_sha = pr.get("head", {}).get("sha", "")
if not head_sha:
return None
# Get commit status
status = self._request(f"/repos/{owner}/{repo}/commits/{head_sha}/status")
return {
"pr": pr,
"sha": head_sha,
"status": status,
}
def get_version(self) -> dict | None:
"""Get Gitea server version info."""
return self._request("/version")
def get_registration_token(self, owner: str = "", repo: str = "") -> str | None:
"""Get runner registration token for repo, org, or user level.
Args:
owner: Repository owner or organization (empty for user-level)
repo: Repository name (empty for org/user-level)
Returns:
Registration token string or None on failure.
"""
if owner and repo:
# Repository-level registration
endpoint = f"/repos/{owner}/{repo}/actions/runners/registration-token"
elif owner:
# Organization-level registration
endpoint = f"/orgs/{owner}/actions/runners/registration-token"
else:
# User-level registration
endpoint = "/user/actions/runners/registration-token"
result = self._post(endpoint, {})
if result and "token" in result:
return result["token"]
return None
def get_runners(self, owner: str = "", repo: str = "") -> list:
"""Get runners for repo, org, or global (admin).
Args:
owner: Repository owner or organization
repo: Repository name (if empty, gets org/global runners)
"""
if owner and repo:
# Repository runners
data = self._request(f"/repos/{owner}/{repo}/actions/runners")
elif owner:
# Organization runners
data = self._request(f"/orgs/{owner}/actions/runners")
else:
# Global runners (admin only)
data = self._request("/admin/runners")
if data and "runners" in data:
return data["runners"]
return data if isinstance(data, list) else []
def get_runner_jobs(self, owner: str = "", repo: str = "") -> list:
"""Get running/pending jobs for runners."""
if owner and repo:
endpoint = f"/repos/{owner}/{repo}/actions/runners/jobs"
elif owner:
endpoint = f"/orgs/{owner}/actions/runners/jobs"
else:
endpoint = "/admin/runners/jobs"
data = self._request(endpoint)
if data and "jobs" in data:
return data["jobs"]
return data if isinstance(data, list) else []
def get_runner(self, runner_id: int, owner: str = "", repo: str = "") -> dict | None:
"""Get a specific runner by ID."""
if owner and repo:
endpoint = f"/repos/{owner}/{repo}/actions/runners/{runner_id}"
elif owner:
endpoint = f"/orgs/{owner}/actions/runners/{runner_id}"
else:
endpoint = f"/admin/runners/{runner_id}"
return self._request(endpoint)
def delete_runner(self, runner_id: int, owner: str = "", repo: str = "") -> bool:
"""Delete a runner by ID."""
if owner and repo:
endpoint = f"/repos/{owner}/{repo}/actions/runners/{runner_id}"
elif owner:
endpoint = f"/orgs/{owner}/actions/runners/{runner_id}"
else:
endpoint = f"/admin/runners/{runner_id}"
return self._delete(endpoint)
def get_workflow_contents(self, owner: str, repo: str, workflow: str) -> str | None:
"""Get workflow file contents."""
# Try .gitea/workflows first, then .github/workflows
for prefix in [".gitea/workflows", ".github/workflows"]:
path = f"{prefix}/{workflow}"
data = self._request(f"/repos/{owner}/{repo}/contents/{path}")
if data and "content" in data:
import base64
try:
return base64.b64decode(data["content"]).decode("utf-8")
except Exception:
pass
return None
def delete_run(self, owner: str, repo: str, run_id: int) -> bool:
"""Delete a workflow run (for queued runs that can't be cancelled)."""
endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}"
return self._delete(endpoint)
def _delete(self, endpoint: str) -> bool:
"""Make DELETE request."""
url = f"{self.base_url}{endpoint}"
headers = {"Accept": "application/json"}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
else:
print(f"{Color.RED}Authentication required for this operation{Color.RST}")
return False
req = urllib.request.Request(url, headers=headers, method="DELETE")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
if e.code == 404:
print(f"{Color.RED}Not found: {endpoint}{Color.RST}")
elif e.code == 403:
print(f"{Color.RED}Permission denied{Color.RST}")
else:
print(f"{Color.RED}Delete failed: {e.code}{Color.RST}")
return False
except urllib.error.URLError as e:
print(f"{Color.RED}Connection error: {e.reason}{Color.RST}")
return False
def _post(self, endpoint: str, data: dict, retries: int = 3) -> dict | None:
"""Make POST request with JSON body."""
url = f"{self.base_url}{endpoint}"
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
else:
print(f"{Color.RED}Authentication required for this operation{Color.RST}")
return None
body = json.dumps(data).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
for attempt in range(retries):
try:
with urllib.request.urlopen(req, timeout=30) as resp:
self._retry_delay = 1
# Handle 204 No Content
if resp.status == 204:
return {}
content = resp.read().decode()
return json.loads(content) if content else {}
except urllib.error.HTTPError as e:
if e.code == 401:
print(f"{Color.RED}Authentication failed. Check your token.{Color.RST}")
return None
elif e.code == 403:
print(f"{Color.RED}Permission denied. Token may lack required scopes.{Color.RST}")
return None
elif e.code == 404:
print(f"{Color.RED}Not found: {endpoint}{Color.RST}")
return None
elif e.code == 422:
print(f"{Color.RED}Invalid request (check workflow name/inputs){Color.RST}")
return None
elif e.code == 429:
if attempt < retries - 1:
delay = self._retry_delay
print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}")
time.sleep(delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
return None
else:
print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}")
return None
except urllib.error.URLError as e:
if attempt < retries - 1:
time.sleep(self._retry_delay)
continue
print(f"{Color.RED}Connection error: {e.reason}{Color.RST}")
return None
return None
def format_time(timestamp: str) -> str:
"""Format ISO timestamp to relative time."""
if not timestamp:
return ""
try:
dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
now = datetime.now(dt.tzinfo)
delta = now - dt
if delta.days > 0:
return f"{delta.days}d ago"
elif delta.seconds >= 3600:
return f"{delta.seconds // 3600}h ago"
elif delta.seconds >= 60:
return f"{delta.seconds // 60}m ago"
else:
return "just now"
except (ValueError, TypeError):
return timestamp[:16] if len(timestamp) > 16 else timestamp
def format_duration(start: str, end: str) -> str:
"""Format duration between two timestamps."""
if not start:
return ""
try:
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
if end:
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
else:
end_dt = datetime.now(start_dt.tzinfo)
delta = end_dt - start_dt
total_secs = int(delta.total_seconds())
if total_secs >= 3600:
return f"{total_secs // 3600}h{(total_secs % 3600) // 60:02d}m"
elif total_secs >= 60:
return f"{total_secs // 60}m{total_secs % 60:02d}s"
else:
return f"{total_secs}s"
except (ValueError, TypeError):
return ""
def status_display(status: str, conclusion: str = "") -> str:
"""Get colored status display."""
key = conclusion if conclusion else status
color, symbol = STATUS_STYLE.get(key.lower(), (Color.DIM, "?"))
return f"{color}{symbol}{Color.RST}"
def cmd_list(api: GiteaAPI, args: argparse.Namespace) -> int:
"""List recent workflow runs."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
# Full discovery mode - list all repos with recent runs
return cmd_discover(api, args)
# Get runs via API
runs = api.get_runs(owner, repo, limit=args.limit)
if not runs:
if not getattr(args, "compact", False):
print(f"{Color.DIM}No workflow runs found{Color.RST}")
return 0
# Apply filters
status_filter = getattr(args, "status_filter", "") or ""
branch_filter = getattr(args, "branch", "") or ""
workflow_filter = getattr(args, "workflow", "") or ""
if status_filter or branch_filter or workflow_filter:
runs = _filter_runs(runs, status_filter, branch_filter, workflow_filter)
if not runs:
if not getattr(args, "compact", False):
filters = [f for f in [status_filter, branch_filter, workflow_filter] if f]
print(f"{Color.DIM}No runs matching filters: {', '.join(filters)}{Color.RST}")
return 0
# JSON output
if hasattr(args, "json") and args.json:
return _output_json(runs, owner, repo)
# Compact output for scripting
if getattr(args, "compact", False):
for run in runs[:args.limit]:
_print_compact_run(run)
return 0
print(f"{Color.BLD}Workflow runs for {owner}/{repo}:{Color.RST}\n")
# Statistics
_print_repo_stats(runs)
for run in runs[:args.limit]:
_print_single_run(run)
return 0
def _filter_runs(runs: list, status_filter: str = "", branch: str = "", workflow: str = "") -> list:
"""Filter runs by status, branch, and/or workflow."""
filtered = runs
# Status filter
if status_filter == "failed":
filtered = [r for r in filtered if r.get("conclusion") == "failure"]
elif status_filter == "success":
filtered = [r for r in filtered if r.get("conclusion") == "success"]
elif status_filter == "running":
filtered = [r for r in filtered if r.get("status") in ("running", "waiting")]
elif status_filter == "completed":
filtered = [r for r in filtered if r.get("status") == "completed"]
# Branch filter
if branch:
filtered = [r for r in filtered if branch.lower() in r.get("head_branch", "").lower()]
# Workflow filter
if workflow:
workflow_lower = workflow.lower()
filtered = [r for r in filtered if (
workflow_lower in r.get("workflow", "").lower() or
workflow_lower in r.get("path", "").lower() or
workflow_lower in r.get("name", "").lower()
)]
return filtered
def _print_compact_run(run: dict, show_repo: bool = False) -> None:
"""Print a single run in compact single-line format."""
run_id = run.get("id", 0)
status = run.get("status", "")
conclusion = run.get("conclusion", "")
# Status symbol
if conclusion == "success":
sym = f"{Color.GRN}{Color.RST}"
elif conclusion == "failure":
sym = f"{Color.RED}{Color.RST}"
elif status in ("running", "waiting"):
sym = f"{Color.BLU}{Color.RST}"
else:
sym = f"{Color.DIM}{Color.RST}"
branch = run.get("head_branch", "")[:20]
title = run.get("display_title", "")[:40]
workflow = run.get("workflow", "")
repo = run.get("_repo", "")
duration = run.get("duration", "") or format_duration(
run.get("run_started_at", ""), run.get("updated_at", "")
)
# Build compact line
parts = [sym, f"#{run_id:>5}"]
if show_repo and repo:
parts.append(f"{Color.BLD}{repo:<20}{Color.RST}")
if workflow:
parts.append(f"{Color.DIM}{workflow:<15}{Color.RST}")
parts.append(f"{Color.MAG}{branch:<20}{Color.RST}")
parts.append(title)
if duration:
parts.append(f"{Color.DIM}({duration}){Color.RST}")
print(" ".join(parts))
def _print_repo_stats(runs: list) -> None:
"""Print statistics for a single repository."""
total = len(runs)
if total == 0:
return
success = len([r for r in runs if r.get("conclusion") == "success"])
failed = len([r for r in runs if r.get("conclusion") == "failure"])
running = len([r for r in runs if r.get("status") in ("running", "waiting")])
completed = total - running
if completed > 0:
rate = (success / completed) * 100
print(f"{Color.DIM}Stats: {Color.GRN}{success}{Color.RST}{Color.DIM} "
f"{Color.RED}{failed}{Color.RST}{Color.DIM} "
f"{Color.BLU}{running}{Color.RST}{Color.DIM} "
f"({rate:.0f}% success){Color.RST}\n")
def _print_single_run(run: dict) -> None:
"""Print a single run with full details."""
run_id = run.get("id", 0)
status = status_display(run.get("status", ""), run.get("conclusion", ""))
title = run.get("display_title", run.get("head_branch", ""))[:50]
branch = run.get("head_branch", "")
workflow = run.get("workflow", "")
when = format_time(run.get("created_at", ""))
duration = run.get("duration", "") or format_duration(run.get("run_started_at", ""), run.get("updated_at", ""))
if workflow:
print(f" {status} {Color.BLD}{workflow}{Color.RST}")
else:
print(f" {status} {Color.BLD}#{run_id}{Color.RST}")
print(f" {title}")
info_parts = []
if branch:
info_parts.append(f"{Color.MAG}{branch}{Color.RST}")
if when:
info_parts.append(when)
if duration:
info_parts.append(duration)
if info_parts:
print(f" {Color.DIM}{' · '.join(info_parts)}{Color.RST}")
print()
def _output_json(runs: list, owner: str, repo: str) -> int:
"""Output runs as JSON."""
import json as json_mod
runs_list: list[dict[str, Any]] = []
output: dict[str, Any] = {
"repository": f"{owner}/{repo}",
"total": len(runs),
"runs": runs_list
}
for run in runs:
runs_list.append({
"id": run.get("id"),
"status": run.get("status"),
"conclusion": run.get("conclusion"),
"title": run.get("display_title"),
"branch": run.get("head_branch"),
"workflow": run.get("workflow"),
"created_at": run.get("created_at"),
"duration": run.get("duration"),
})
print(json_mod.dumps(output, indent=2))
return 0
def cmd_discover(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Discover all repositories and their workflow runs."""
print(f"{Color.BLD}Discovering workflows on {api.config.url}...{Color.RST}\n")
# Get current user
user = api._request("/user")
if not user:
print(f"{Color.YLW}API token required for discovery.{Color.RST}")
print(f"{Color.DIM}Use: gitea-ci config --token YOUR_TOKEN{Color.RST}")
print(f"{Color.DIM}Or use gitea-scrape for public repos without auth{Color.RST}")
return 1
username = user.get("login", "")
print(f"{Color.DIM}User:{Color.RST} {username}\n")
# Get all repos (user's own + orgs)
repos = api.get_repos(limit=100)
# Also get repos from organizations
orgs: list[Any] = list(api._request("/user/orgs") or [])
for org in orgs:
org_repos = api._request(f"/orgs/{org.get('username', '')}/repos?limit=100")
if org_repos:
repos.extend(org_repos)
if not repos:
print(f"{Color.DIM}No repositories found{Color.RST}")
return 0
# Collect all recent runs
all_runs = []
repos_with_actions = 0
for i, r in enumerate(repos):
repo_owner = r.get("owner", {}).get("login", "")
repo_name = r.get("name", "")
full_name = r.get("full_name", f"{repo_owner}/{repo_name}")
# Progress indicator for large repos
if len(repos) > 10 and (i + 1) % 5 == 0:
print(f"{Color.DIM} Scanning {i + 1}/{len(repos)} repositories...{Color.RST}", end="\r", file=sys.stderr)
runs = api.get_runs(repo_owner, repo_name, limit=3)
if runs:
repos_with_actions += 1
for run in runs:
run["_repo"] = full_name
all_runs.append(run)
# Throttle to avoid rate limiting
api.throttle()
if not all_runs:
print(f"{Color.DIM}No workflow runs found across {len(repos)} repositories{Color.RST}")
return 0
# Sort by created_at descending
all_runs.sort(key=lambda x: x.get("created_at", ""), reverse=True)
_print_run_summary(all_runs, repos_with_actions)
return 0
def _print_run_summary(all_runs: list, repos_with_actions: int) -> None:
"""Print summary of workflow runs grouped by status."""
# Calculate statistics
total = len(all_runs)
success = len([r for r in all_runs if r.get("conclusion") == "success"])
failed = [r for r in all_runs if r.get("conclusion") == "failure"]
running = [r for r in all_runs if r.get("status") == "running" or r.get("status") == "waiting"]
cancelled = len([r for r in all_runs if r.get("conclusion") == "cancelled"])
# Print statistics header
print(f"{Color.BLD}Statistics:{Color.RST}")
completed = total - len(running)
if completed > 0:
success_rate = (success / completed) * 100
print(f" {Color.GRN}{success}{Color.RST} success "
f"{Color.RED}{len(failed)}{Color.RST} failed "
f"{Color.BLU}{len(running)}{Color.RST} running "
f"{Color.DIM}{cancelled} cancelled{Color.RST}")
print(f" {Color.DIM}Success rate: {success_rate:.0f}% ({success}/{completed} completed){Color.RST}")
else:
print(f" {Color.BLU}{len(running)}{Color.RST} running "
f"{Color.DIM}No completed runs yet{Color.RST}")
print(f" {Color.DIM}Repositories with actions: {repos_with_actions}{Color.RST}")
print()
# Show running first
if running:
print(f"{Color.BLU}{Color.BLD}Running/Waiting ({len(running)}):{Color.RST}\n")
for run in running[:5]:
_print_run_line(run)
if len(running) > 5:
print(f" {Color.DIM}... and {len(running) - 5} more{Color.RST}\n")
# Show recent failures
if failed:
print(f"{Diff.DEL}{Color.BLD} Recent Failures ({len(failed)}) {Color.RST}\n")
for run in failed[:5]:
_print_run_line(run)
if len(failed) > 5:
print(f" {Color.DIM}... and {len(failed) - 5} more{Color.RST}\n")
# Show recent activity
recent = all_runs[:10]
print(f"{Color.BLD}Recent Activity:{Color.RST}\n")
for run in recent:
_print_run_line(run)
def _print_run_line(run: dict) -> None:
"""Print a single run line with repo info."""
run_id = run.get("id", 0)
repo = run.get("_repo", "")
status = status_display(run.get("status", ""), run.get("conclusion", ""))
title = run.get("display_title", run.get("head_branch", ""))[:40]
branch = run.get("head_branch", "")
workflow = run.get("workflow", "")
# Get time - try different fields
when = format_time(run.get("created_at", ""))
# Get duration - try pre-computed or calculate
duration = run.get("duration", "")
if not duration:
duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", ""))
# Build info line
info_parts = []
if branch:
info_parts.append(f"{Color.MAG}{branch}{Color.RST}")
if when:
info_parts.append(when)
if duration:
info_parts.append(duration)
info_str = " · ".join(info_parts) if info_parts else ""
# Print with workflow name if available
if workflow:
print(f" {status} {Color.BLD}{repo}{Color.RST} {Color.DIM}{workflow}{Color.RST}")
else:
print(f" {status} {Color.BLD}{repo}{Color.RST} #{run_id}")
print(f" {title}")
if info_str:
print(f" {Color.DIM}{info_str}{Color.RST}")
print()
def cmd_status(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show detailed status of a workflow run."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required. Use -o OWNER -r REPO or set defaults{Color.RST}")
return 1
# Get run (latest or specific)
if args.run_id:
run = api.get_run(owner, repo, args.run_id)
else:
runs = api.get_runs(owner, repo, limit=1)
run = runs[0] if runs else None
if not run:
print(f"{Color.RED}Run not found{Color.RST}")
return 1
run_id = run.get("id", 0)
# Get jobs
jobs = api.get_jobs(owner, repo, run_id)
# JSON output
if getattr(args, "json", False):
output = {
"repository": f"{owner}/{repo}",
"run": {
"id": run_id,
"status": run.get("status"),
"conclusion": run.get("conclusion"),
"title": run.get("display_title"),
"branch": run.get("head_branch"),
"commit": run.get("head_sha"),
"created_at": run.get("created_at"),
"started_at": run.get("run_started_at"),
"updated_at": run.get("updated_at"),
},
"jobs": [
{
"id": j.get("id"),
"name": j.get("name"),
"status": j.get("status"),
"conclusion": j.get("conclusion"),
"started_at": j.get("started_at"),
"completed_at": j.get("completed_at"),
}
for j in jobs
] if jobs else []
}
print(json.dumps(output, indent=2))
return 0
status = status_display(run.get("status", ""), run.get("conclusion", ""))
title = run.get("display_title", "")
branch = run.get("head_branch", "")
commit = run.get("head_sha", "")[:8]
when = format_time(run.get("created_at", ""))
duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", ""))
print(f"{Color.BLD}Run #{run_id}{Color.RST} {status}")
print(f" {Color.DIM}Title:{Color.RST} {title}")
print(f" {Color.DIM}Branch:{Color.RST} {branch} ({commit})")
print(f" {Color.DIM}Started:{Color.RST} {when}")
print(f" {Color.DIM}Duration:{Color.RST} {duration}")
print()
if jobs:
print(f"{Color.BLD}Jobs:{Color.RST}\n")
pending_jobs = []
for job in jobs:
job_id = job.get("id", 0)
job_status_raw = job.get("status", "")
job_status = status_display(job_status_raw, job.get("conclusion", ""))
job_name = job.get("name", "unknown")
job_duration = format_duration(job.get("started_at", ""), job.get("completed_at", ""))
runs_on = job.get("labels", []) # May be empty, will try workflow
print(f" {job_status} {job_name} {Color.DIM}(ID: {job_id}, {job_duration}){Color.RST}")
# Collect pending jobs for diagnostics
if job_status_raw in ("pending", "waiting", "queued"):
pending_jobs.append({
"name": job_name,
"id": job_id,
"labels": runs_on,
"created": job.get("created_at", run.get("created_at", "")),
})
# Show pending diagnostics if there are pending jobs
if pending_jobs and not getattr(args, "json", False):
print(f"\n{Color.BLD}Pending Diagnostics:{Color.RST}\n")
# Get available runners
runners = api.get_runners("", "") # User-level runners first
if not runners:
runners = api.get_runners(owner, repo)
if not runners:
print(f" {Color.RED}{Color.RST} No runners available")
print(f" {Color.DIM}Register a runner: gitea-ci register-token{Color.RST}")
else:
online_runners = [r for r in runners if r.get("status") == "online"]
busy_runners = [r for r in runners if r.get("busy", False)]
runner_labels = set()
online_labels = set()
for r in runners:
labels = r.get("labels", [])
runner_labels.update(labels)
if r.get("status") == "online":
online_labels.update(labels)
print(f" Runners: {len(online_runners)} online, {len(busy_runners)} busy, {len(runners)} total")
# For each pending job, check why it might be stuck
for pj in pending_jobs:
created_str = pj.get("created", "")
if created_str:
try:
created_dt = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
pending_secs = (datetime.now(created_dt.tzinfo) - created_dt).total_seconds()
pending_time = f"{int(pending_secs // 60)}m {int(pending_secs % 60)}s" if pending_secs >= 60 else f"{int(pending_secs)}s"
except Exception:
pending_time = "?"
else:
pending_time = "?"
job_labels = pj.get("labels", [])
if not job_labels:
# Try to get from workflow - use default label
job_labels = ["linux"] # Common default
print(f"\n {Color.YLW}{Color.RST} {pj['name']} {Color.DIM}(pending {pending_time}){Color.RST}")
if job_labels:
missing = set(job_labels) - runner_labels
offline_labels = (set(job_labels) & runner_labels) - online_labels
if missing:
print(f" {Color.RED}Missing labels:{Color.RST} {', '.join(missing)}")
elif offline_labels:
print(f" {Color.YLW}Runners offline for:{Color.RST} {', '.join(offline_labels)}")
elif busy_runners and len(busy_runners) == len(online_runners):
print(f" {Color.YLW}All runners busy{Color.RST}")
else:
print(f" {Color.DIM}Waiting for runner...{Color.RST}")
return 0
def cmd_logs(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show logs for a job."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
job_id = args.job_id
# Get run_id from argument or latest
if args.run_id:
run_id = args.run_id
else:
runs = api.get_runs(owner, repo, limit=1)
if not runs:
print(f"{Color.RED}No runs found{Color.RST}")
return 1
run_id = runs[0].get("id")
# If no job_id, get jobs for the run
if not job_id:
jobs = api.get_jobs(owner, repo, run_id)
if not jobs:
print(f"{Color.RED}No jobs found{Color.RST}")
return 1
# Prefer failed job
for job in jobs:
if job.get("conclusion") == "failure":
job_id = job.get("id")
break
if not job_id:
job_id = jobs[0].get("id")
logs = api.get_job_logs(owner, repo, job_id)
if logs:
print(logs)
else:
print(f"{Color.RED}No logs available (may require authentication){Color.RST}")
return 1
return 0
def cmd_watch(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Watch a running workflow."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
run_id = args.run_id
interval = args.interval
notify = getattr(args, "notify", False)
timeout = getattr(args, "timeout", 0)
start_time = time.time()
timeout_msg = f", timeout: {timeout}s" if timeout else ""
print(f"{Color.BLD}Watching {owner}/{repo}...{Color.RST} (Ctrl+C to stop{timeout_msg})\n")
try:
while True:
# Check timeout
if timeout and (time.time() - start_time) >= timeout:
print(f"\n{Color.YLW}Timeout reached ({timeout}s){Color.RST}")
return 2
# Clear screen for refresh
if not args.no_clear:
print("\033[2J\033[H", end="")
# Get run status
if run_id:
run = api.get_run(owner, repo, run_id)
runs = [run] if run else []
else:
runs = api.get_runs(owner, repo, limit=5)
if not runs:
print(f"{Color.DIM}No active runs{Color.RST}")
else:
for run in runs:
rid = run.get("id", 0)
status = status_display(run.get("status", ""), run.get("conclusion", ""))
title = run.get("display_title", "")[:40]
duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", ""))
print(f"{status} {Color.BLD}#{rid}{Color.RST} {title} {Color.DIM}({duration}){Color.RST}")
# Show jobs for this run
jobs = api.get_jobs(owner, repo, rid)
for job in jobs:
job_status = status_display(job.get("status", ""), job.get("conclusion", ""))
job_name = job.get("name", "")
print(f" {job_status} {job_name}")
print()
# Stop watching if run is complete
if run.get("status") == "completed":
conclusion = run.get("conclusion", "")
if conclusion == "success":
print(f"{Diff.ADD} ✓ Run completed successfully {Color.RST}")
if notify:
send_notification(
"CI Passed",
f"{owner}/{repo} #{rid} succeeded",
"normal"
)
elif conclusion == "failure":
print(f"{Diff.DEL} ✗ Run failed {Color.RST}")
if notify:
send_notification(
"CI Failed",
f"{owner}/{repo} #{rid} failed",
"critical"
)
else:
print(f"{Diff.CHG} ⚠ Run {conclusion} {Color.RST}")
if notify:
send_notification(
f"CI {conclusion.title()}",
f"{owner}/{repo} #{rid} {conclusion}",
"normal"
)
if run_id: # Only exit if watching specific run
return 0 if conclusion == "success" else 1
time.sleep(interval)
except KeyboardInterrupt:
print(f"\n{Color.DIM}Stopped watching{Color.RST}")
return 0
def _check_token_permissions(api: GiteaAPI) -> int:
"""Check what permissions the current token has."""
config = api.config
if not config.token:
print(f"{Color.RED}No token configured{Color.RST}")
print(f"{Color.DIM}Set with: gitea-ci config token set <token>{Color.RST}")
return 1
print(f"{Color.BLD}Token Permissions{Color.RST}")
print("" * 35)
# Basic auth - GET /user
user = api._request("/user")
if user:
username = user.get("login", "unknown")
print(f" {Color.GRN}{Color.RST} Authenticated as: {username}")
else:
print(f" {Color.RED}{Color.RST} Authentication failed")
return 1
# Read repos - GET /user/repos
repos = api._request("/user/repos?limit=1")
if repos is not None:
print(f" {Color.GRN}{Color.RST} Read repositories")
else:
print(f" {Color.RED}{Color.RST} Read repositories")
# Organization access - GET /user/orgs
orgs = api._request("/user/orgs")
if orgs is not None:
org_count = len(orgs) if isinstance(orgs, list) else 0
print(f" {Color.GRN}{Color.RST} Organization access ({org_count} orgs)")
else:
print(f" {Color.RED}{Color.RST} Organization access")
# Admin access - GET /admin/runners
runners = api._request("/admin/runners")
if runners is not None:
print(f" {Color.GRN}{Color.RST} Admin access")
else:
print(f" {Color.DIM}{Color.RST} Admin access (not admin or scope missing)")
# User-level runners - GET /user/actions/runners/registration-token
# This tests if we can manage user-level runners
user_runners = api._request("/user/actions/runners")
if user_runners is not None:
print(f" {Color.GRN}{Color.RST} User-level runners")
else:
print(f" {Color.DIM}{Color.RST} User-level runners")
# Check if we can trigger workflows (requires repo scope)
print()
print(f"{Color.DIM}Token ends with: ...{config.token[-4:]}{Color.RST}")
print(f"{Color.DIM}Config file: {CONFIG_FILE}{Color.RST}")
return 0
def cmd_config(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Configure gitea-ci."""
config = api.config
# Handle token subcommand
action = getattr(args, "action", None)
if action == "token":
token_action = getattr(args, "token_action", None)
if token_action == "check":
return _check_token_permissions(api)
elif token_action == "set":
new_token = getattr(args, "token_value", None)
if not new_token:
print(f"{Color.RED}Token value required{Color.RST}")
return 1
config.token = new_token
config.save()
print(f"{Color.GRN}Token saved to {CONFIG_FILE}{Color.RST}")
return 0
else:
# Just show token info
if config.token:
print(f"{Color.BLD}Token:{Color.RST} {'*' * 8}...{config.token[-4:]}")
print(f"{Color.DIM}Use 'gitea-ci config token check' to test permissions{Color.RST}")
else:
print(f"{Color.YLW}No token configured{Color.RST}")
print(f"{Color.DIM}Set with: gitea-ci config token set <token>{Color.RST}")
return 0
if args.show:
print(f"{Color.BLD}Current configuration:{Color.RST}")
print(f" URL: {config.url}")
print(f" Token: {'***' if config.token else '(not set)'}")
print(f" Default owner: {config.default_owner or '(not set)'}")
print(f" Default repo: {config.default_repo or '(not set)'}")
print(f"\n{Color.DIM}Config file: {CONFIG_FILE}{Color.RST}")
return 0
if args.test:
print(f"{Color.BLD}Testing API connectivity...{Color.RST}\n")
print(f" URL: {config.url}")
print(f" Token: {'configured' if config.token else 'not set'}")
print()
# Test basic connectivity
try:
req = urllib.request.Request(
f"{config.url}/api/v1/version",
headers={"Accept": "application/json"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode())
version = data.get("version", "unknown")
print(f" {Color.GRN}{Color.RST} Server reachable (Gitea {version})")
except urllib.error.URLError as e:
print(f" {Color.RED}{Color.RST} Connection failed: {e.reason}")
return 1
# Test authentication if token set
if config.token:
user = api._request("/user")
if user:
username = user.get("login", "unknown")
print(f" {Color.GRN}{Color.RST} Authenticated as: {username}")
else:
print(f" {Color.RED}{Color.RST} Authentication failed (invalid token?)")
return 1
else:
print(f" {Color.YLW}{Color.RST} No token configured (read-only mode)")
# Test default repo if configured
if config.default_owner and config.default_repo:
runs = api.get_runs(config.default_owner, config.default_repo, limit=1)
if runs is not None:
print(f" {Color.GRN}{Color.RST} Default repo accessible: {config.default_owner}/{config.default_repo}")
else:
print(f" {Color.YLW}{Color.RST} Default repo not accessible (may not have actions)")
print(f"\n{Color.GRN}API connectivity OK{Color.RST}")
return 0
# Interactive config
print(f"{Color.BLD}Gitea CI Configuration{Color.RST}\n")
url = input(f"Gitea URL [{config.url}]: ").strip()
if url:
config.url = url
token = input(f"API Token [{'***' if config.token else 'none'}]: ").strip()
if token:
config.token = token
owner = input(f"Default owner [{config.default_owner}]: ").strip()
if owner:
config.default_owner = owner
repo = input(f"Default repo [{config.default_repo}]: ").strip()
if repo:
config.default_repo = repo
config.save()
print(f"\n{Color.GRN}Configuration saved to {CONFIG_FILE}{Color.RST}")
return 0
def cmd_repo(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Manage repositories."""
action = getattr(args, "action", "list")
if action == "list":
# List user's repositories
print(f"{Color.BLD}Repositories:{Color.RST}\n")
repos = api.get_repos(limit=100)
if not repos:
print(f"{Color.DIM}No repositories found{Color.RST}")
return 0
for r in repos:
name = r.get("full_name", "")
desc = r.get("description", "")[:40] if r.get("description") else ""
private = r.get("private", False)
visibility = f"{Color.YLW}private{Color.RST}" if private else f"{Color.GRN}public{Color.RST}"
has_actions = r.get("has_actions", False)
actions_icon = f" {Color.DIM}[actions]{Color.RST}" if has_actions else ""
print(f" {name} [{visibility}]{actions_icon}")
if desc:
print(f" {Color.DIM}{desc}{Color.RST}")
return 0
elif action == "create":
name = args.name
if not name:
print(f"{Color.RED}Repository name required{Color.RST}")
return 1
description = getattr(args, "description", "") or ""
private = getattr(args, "private", False)
# Check if already exists
user = api._request("/user")
if not user:
print(f"{Color.RED}Authentication required{Color.RST}")
return 1
username = user.get("login", "")
if api.repo_exists(username, name):
print(f"{Color.YLW}Repository {username}/{name} already exists{Color.RST}")
return 0
print(f"Creating repository {Color.BLD}{name}{Color.RST}...")
result = api.create_repo(name, description, private)
if result:
full_name = result.get("full_name", name)
clone_url = result.get("ssh_url", result.get("clone_url", ""))
print(f"{Color.GRN}{Color.RST} Created: {full_name}")
if clone_url:
print(f" {Color.DIM}Clone: {clone_url}{Color.RST}")
return 0
else:
print(f"{Color.RED}{Color.RST} Failed to create repository")
return 1
elif action == "exists":
owner = args.owner or api.config.default_owner
repo = args.repo or args.name
if not owner or not repo:
print(f"{Color.RED}Repository (owner/repo) required{Color.RST}")
return 1
if api.repo_exists(owner, repo):
print(f"{Color.GRN}{Color.RST} {owner}/{repo} exists")
return 0
else:
print(f"{Color.RED}{Color.RST} {owner}/{repo} does not exist")
return 1
elif action == "check":
owner = args.owner or api.config.default_owner
repo = args.repo or args.name or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository (owner/repo) required{Color.RST}")
return 1
print(f"{Color.BLD}Repository Check: {owner}/{repo}{Color.RST}\n")
# Check if repository exists and has actions enabled
repo_info = api._request(f"/repos/{owner}/{repo}")
if not repo_info:
print(f"{Color.RED}{Color.RST} Repository not found or not accessible")
return 1
has_actions = repo_info.get("has_actions", False)
if has_actions:
print(f"{Color.GRN}{Color.RST} Actions enabled")
else:
print(f"{Color.RED}{Color.RST} Actions not enabled")
print(f" {Color.DIM}Enable in repository Settings → Actions{Color.RST}")
return 1
# Get workflows
workflows = api.get_workflows(owner, repo)
if workflows:
print(f"{Color.GRN}{Color.RST} Found {len(workflows)} workflow(s)")
else:
print(f"{Color.YLW}{Color.RST} No workflows found")
print(f" {Color.DIM}Create .gitea/workflows/*.yml or .github/workflows/*.yml{Color.RST}")
return 0
# Get available runners (user scope first)
runners = api.get_runners("", "") # User-level runners
if not runners:
# Try repo scope
runners = api.get_runners(owner, repo)
runner_labels = set()
online_labels = set()
for r in runners:
labels = r.get("labels", [])
runner_labels.update(labels)
if r.get("status") == "online":
online_labels.update(labels)
if runners:
online_count = sum(1 for r in runners if r.get("status") == "online")
print(f"{Color.GRN}{Color.RST} Runners available: {online_count} online, {len(runners)} total")
else:
print(f"{Color.RED}{Color.RST} No runners found")
print(f" {Color.DIM}Register a runner with: gitea-ci register-token{Color.RST}")
return 1
# Analyze workflows for runs-on labels
print(f"\n{Color.BLD}Workflow Analysis:{Color.RST}\n")
issues = []
for wf in workflows:
wf_name = wf.get("name", "unnamed")
wf_path = wf.get("path", "")
wf_file = wf_path.split("/")[-1] if wf_path else "unknown"
# Get workflow contents and parse runs-on
content = api.get_workflow_contents(owner, repo, wf_file)
if not content:
print(f" {Color.DIM}{Color.RST} {wf_name} {Color.DIM}(cannot read){Color.RST}")
continue
# Parse runs-on labels from YAML content
runs_on_matches = re.findall(r'runs-on:\s*([^\n]+)', content)
required_labels = set()
for match in runs_on_matches:
# Handle array syntax: [label1, label2] or simple string
match = match.strip()
if match.startswith('['):
# Array syntax
labels = re.findall(r'[\w-]+', match)
required_labels.update(labels)
else:
# Simple string (may have quotes)
label = match.strip('\'"')
required_labels.add(label)
if not required_labels:
print(f" {Color.DIM}{Color.RST} {wf_name} {Color.DIM}(no runs-on found){Color.RST}")
continue
# Check if labels match available runners
missing = required_labels - runner_labels
offline = (required_labels & runner_labels) - online_labels
if missing:
print(f" {Color.RED}{Color.RST} {wf_name}")
print(f" {Color.DIM}requires:{Color.RST} {', '.join(required_labels)}")
print(f" {Color.RED}missing:{Color.RST} {', '.join(missing)}")
issues.append((wf_name, "missing labels", missing))
elif offline:
print(f" {Color.YLW}{Color.RST} {wf_name}")
print(f" {Color.DIM}requires:{Color.RST} {', '.join(required_labels)}")
print(f" {Color.YLW}offline:{Color.RST} {', '.join(offline)}")
issues.append((wf_name, "runners offline", offline))
else:
print(f" {Color.GRN}{Color.RST} {wf_name} {Color.DIM}({', '.join(required_labels)}){Color.RST}")
# Summary
if issues:
print(f"\n{Color.BLD}Issues Found:{Color.RST}\n")
all_missing = set()
for wf_name, issue_type, labels in issues:
if issue_type == "missing labels":
all_missing.update(labels)
if all_missing:
print(f" Missing runner labels: {', '.join(sorted(all_missing))}")
print(f" {Color.DIM}Register runners with these labels or update workflows{Color.RST}")
return 1
print(f"\n{Color.GRN}All workflows have matching runners{Color.RST}")
return 0
return 0
def cmd_trigger(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Trigger a workflow manually."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
workflow = args.workflow
ref = args.ref or "master"
# Parse inputs if provided
inputs = {}
if hasattr(args, "input") and args.input:
for inp in args.input:
if "=" in inp:
key, value = inp.split("=", 1)
inputs[key] = value
# If no workflow specified, list available ones
if not workflow:
workflows = api.get_workflows(owner, repo)
if not workflows:
print(f"{Color.DIM}No workflows found{Color.RST}")
return 1
print(f"{Color.BLD}Available workflows:{Color.RST}\n")
for wf in workflows:
name = wf.get("name", "")
path = wf.get("path", "")
state = wf.get("state", "")
print(f" {Color.BLD}{path}{Color.RST}")
if name:
print(f" Name: {name}")
if state:
print(f" State: {state}")
print()
print(f"{Color.DIM}Use: gitea-ci trigger -w <workflow.yml>{Color.RST}")
return 0
print(f"{Color.BLD}Triggering workflow...{Color.RST}")
print(f" Workflow: {workflow}")
print(f" Branch: {ref}")
if inputs:
print(f" Inputs: {inputs}")
if api.trigger_workflow(owner, repo, workflow, ref, inputs if inputs else None):
print(f"\n{Color.GRN}✓ Workflow triggered successfully{Color.RST}")
print(f"{Color.DIM}Use 'gitea-ci watch {owner}/{repo}' to monitor{Color.RST}")
return 0
else:
print(f"\n{Color.RED}✗ Failed to trigger workflow{Color.RST}")
return 1
def cmd_rerun(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Re-run a workflow or job."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
# Re-run specific job
if args.job_id:
print(f"{Color.BLD}Re-running job #{args.job_id}...{Color.RST}")
if api.rerun_job(owner, repo, args.job_id):
print(f"{Color.GRN}✓ Job re-run started{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to re-run job{Color.RST}")
return 1
# Get run ID
run_id = args.run_id
if not run_id:
# Get latest run
runs = api.get_runs(owner, repo, limit=1)
if not runs:
print(f"{Color.RED}No runs found{Color.RST}")
return 1
run_id = runs[0].get("id")
# Re-run failed jobs only or all jobs
if args.failed_only:
print(f"{Color.BLD}Re-running failed jobs in run #{run_id}...{Color.RST}")
if api.rerun_failed_jobs(owner, repo, run_id):
print(f"{Color.GRN}✓ Failed jobs re-run started{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to re-run jobs{Color.RST}")
return 1
else:
print(f"{Color.BLD}Re-running all jobs in run #{run_id}...{Color.RST}")
if api.rerun_workflow(owner, repo, run_id):
print(f"{Color.GRN}✓ Workflow re-run started{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to re-run workflow{Color.RST}")
return 1
def cmd_cancel(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Cancel a running workflow."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
run_id = args.run_id
if not run_id:
# Get latest running run
runs = api.get_runs(owner, repo, limit=10)
running = [r for r in runs if r.get("status") in ("running", "waiting", "queued")]
if not running:
print(f"{Color.DIM}No running workflows to cancel{Color.RST}")
return 0
run_id = running[0].get("id")
print(f"{Color.BLD}Cancelling run #{run_id}...{Color.RST}")
if api.cancel_run(owner, repo, run_id):
print(f"{Color.GRN}✓ Workflow cancelled{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to cancel workflow{Color.RST}")
return 1
def cmd_artifacts(api: GiteaAPI, args: argparse.Namespace) -> int:
"""List or download artifacts."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
# Download specific artifact
if args.download:
artifact_id = args.download
output = args.output or f"artifact-{artifact_id}.zip"
print(f"{Color.BLD}Downloading artifact #{artifact_id}...{Color.RST}")
if api.download_artifact(owner, repo, artifact_id, output):
size = Path(output).stat().st_size
print(f"{Color.GRN}✓ Downloaded to {output} ({size:,} bytes){Color.RST}")
return 0
return 1
# List artifacts
run_id = args.run_id
artifacts = api.get_artifacts(owner, repo, run_id)
if not artifacts:
print(f"{Color.DIM}No artifacts found{Color.RST}")
return 0
print(f"{Color.BLD}Artifacts:{Color.RST}\n")
for art in artifacts:
art_id = art.get("id", 0)
name = art.get("name", "unknown")
size = art.get("size_in_bytes", 0)
expired = art.get("expired", False)
created = format_time(art.get("created_at", ""))
status_icon = f"{Color.DIM}(expired){Color.RST}" if expired else ""
print(f" {Color.BLD}#{art_id}{Color.RST} {name} {status_icon}")
print(f" {Color.DIM}Size: {size:,} bytes · Created: {created}{Color.RST}")
print()
print(f"{Color.DIM}Download: gitea-ci artifacts -d <ID> [-O filename.zip]{Color.RST}")
return 0
def cmd_pr(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show CI status for a pull request."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
pr_number = args.pr_number
if not pr_number:
print(f"{Color.RED}PR number required (-p/--pr){Color.RST}")
return 1
data = api.get_pr_checks(owner, repo, pr_number)
if not data:
print(f"{Color.RED}PR #{pr_number} not found{Color.RST}")
return 1
pr = data["pr"]
sha = data["sha"][:8]
status = data.get("status", {})
# PR info
title = pr.get("title", "")
state = pr.get("state", "")
mergeable = pr.get("mergeable", False)
user = pr.get("user", {}).get("login", "")
print(f"{Color.BLD}PR #{pr_number}: {title}{Color.RST}")
print(f" {Color.DIM}Author:{Color.RST} {user}")
print(f" {Color.DIM}State:{Color.RST} {state}")
print(f" {Color.DIM}HEAD:{Color.RST} {sha}")
print(f" {Color.DIM}Mergeable:{Color.RST} {'Yes' if mergeable else 'No'}")
print()
# CI status
if status:
overall = status.get("state", "pending")
statuses = status.get("statuses", [])
# Map state to display
state_display = {
"success": f"{Color.GRN}✓ All checks passed{Color.RST}",
"pending": f"{Color.YLW}● Checks in progress{Color.RST}",
"failure": f"{Color.RED}✗ Some checks failed{Color.RST}",
"error": f"{Color.RED}✗ Check errors{Color.RST}",
}
print(f"{Color.BLD}CI Status:{Color.RST} {state_display.get(overall, overall)}\n")
if statuses:
for s in statuses:
ctx = s.get("context", "")
state = s.get("state", "")
desc = s.get("description", "")[:50]
icon = status_display(state, state)
print(f" {icon} {ctx}")
if desc:
print(f" {Color.DIM}{desc}{Color.RST}")
else:
# Try to get workflow runs for this commit
runs = api.get_runs(owner, repo, limit=5)
commit_runs = [r for r in runs if r.get("head_sha", "").startswith(sha)]
if commit_runs:
print(f" {Color.DIM}Workflow runs for this commit:{Color.RST}")
for run in commit_runs:
run_status = status_display(run.get("status", ""), run.get("conclusion", ""))
run_title = run.get("display_title", "")[:40]
print(f" {run_status} {run_title}")
else:
print(f"{Color.DIM}No CI status found{Color.RST}")
return 0
def cmd_compare(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Compare two workflow runs."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
run1_id = args.run1
run2_id = args.run2
# If only one run specified, compare with previous
if not run2_id:
runs = api.get_runs(owner, repo, limit=20)
found_idx = None
for i, r in enumerate(runs):
if r.get("id") == run1_id:
found_idx = i
break
if found_idx is not None and found_idx + 1 < len(runs):
run2_id = runs[found_idx + 1].get("id")
else:
print(f"{Color.RED}Cannot find previous run to compare{Color.RST}")
return 1
# Fetch both runs and their jobs
run1 = api.get_run(owner, repo, run1_id)
run2 = api.get_run(owner, repo, run2_id)
if not run1 or not run2:
print(f"{Color.RED}Could not fetch one or both runs{Color.RST}")
return 1
jobs1 = api.get_jobs(owner, repo, run1_id)
jobs2 = api.get_jobs(owner, repo, run2_id)
# JSON output
if getattr(args, "json", False):
job_changes: list[dict[str, Any]] = []
output: dict[str, Any] = {
"repository": f"{owner}/{repo}",
"run1": {"id": run1_id, "status": run1.get("status"), "conclusion": run1.get("conclusion")},
"run2": {"id": run2_id, "status": run2.get("status"), "conclusion": run2.get("conclusion")},
"job_changes": job_changes,
}
jobs1_map = {j.get("name"): j for j in jobs1}
jobs2_map = {j.get("name"): j for j in jobs2}
all_job_names = set(jobs1_map.keys()) | set(jobs2_map.keys())
for name in sorted(all_job_names):
j1 = jobs1_map.get(name, {})
j2 = jobs2_map.get(name, {})
job_changes.append({
"name": name,
"run1_conclusion": j1.get("conclusion"),
"run2_conclusion": j2.get("conclusion"),
"changed": j1.get("conclusion") != j2.get("conclusion"),
})
print(json.dumps(output, indent=2))
return 0
# Pretty output
print(f"{Color.BLD}Comparing runs:{Color.RST}")
print(f" Run #{run1_id}: {status_display(run1.get('status', ''), run1.get('conclusion', ''))} "
f"{run1.get('display_title', '')[:40]}")
print(f" Run #{run2_id}: {status_display(run2.get('status', ''), run2.get('conclusion', ''))} "
f"{run2.get('display_title', '')[:40]}")
print()
# Compare jobs
jobs1_map = {j.get("name"): j for j in jobs1}
jobs2_map = {j.get("name"): j for j in jobs2}
all_job_names_sorted = sorted(set(jobs1_map.keys()) | set(jobs2_map.keys()))
changes = []
unchanged = []
for name in all_job_names_sorted:
j1 = jobs1_map.get(name, {})
j2 = jobs2_map.get(name, {})
c1 = j1.get("conclusion", "missing")
c2 = j2.get("conclusion", "missing")
if c1 != c2:
changes.append((name, c1, c2))
else:
unchanged.append((name, c1))
if changes:
print(f"{Color.BLD}Changed jobs:{Color.RST}\n")
for name, c1, c2 in changes:
s1 = status_display("completed", c1)
s2 = status_display("completed", c2)
print(f" {name}")
print(f" {s1} #{run1_id}{s2} #{run2_id}")
print()
if unchanged and not changes:
print(f"{Color.DIM}All {len(unchanged)} jobs have same status{Color.RST}")
elif unchanged:
print(f"{Color.DIM}Unchanged: {len(unchanged)} jobs{Color.RST}")
return 0
def cmd_infra(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show infrastructure status: Gitea version, runners, workflows."""
owner = getattr(args, "owner", None) or api.config.default_owner
repo = getattr(args, "repo", None) or api.config.default_repo
print(f"{Color.BLD}Infrastructure Status{Color.RST}\n")
print(f"{Color.DIM}Instance:{Color.RST} {api.config.url}")
# Gitea version
version = api.get_version()
if version:
print(f"{Color.DIM}Version:{Color.RST} Gitea {version.get('version', 'unknown')}")
print()
# Try to get runners at different levels
runners_found = False
# Repository runners
if owner and repo:
print(f"{Color.BLD}Repository: {owner}/{repo}{Color.RST}")
runners = api.get_runners(owner, repo)
if runners:
runners_found = True
print(f"\n{Color.BLD}Repository Runners:{Color.RST}")
for r in runners:
status = "online" if r.get("status") == "online" else "offline"
color = Color.GRN if status == "online" else Color.RED
name = r.get("name", "unnamed")
labels = ", ".join(r.get("labels", [])[:3]) or "no labels"
print(f" {color}{Color.RST} {name} ({labels})")
# Workflows
workflows = api.get_workflows(owner, repo)
if workflows:
print(f"\n{Color.BLD}Workflows:{Color.RST}")
for wf in workflows:
name = wf.get("name", "")
path = wf.get("path", "").split("/")[-1]
state = wf.get("state", "")
if state == "active":
print(f" {Color.GRN}{Color.RST} {path} - {name}")
else:
print(f" {Color.DIM}{Color.RST} {path} - {name} ({state})")
# Recent runs summary
runs = api.get_runs(owner, repo, limit=10)
if runs:
success = sum(1 for r in runs if r.get("conclusion") == "success")
failed = sum(1 for r in runs if r.get("conclusion") == "failure")
running = sum(1 for r in runs if r.get("status") in ("running", "waiting", "queued"))
print(f"\n{Color.BLD}Recent Runs (last 10):{Color.RST}")
print(f" {Color.GRN}{Color.RST} {success} success {Color.RED}{Color.RST} {failed} failed {Color.BLU}{Color.RST} {running} active")
# Organization runners (if owner specified)
if owner and not repo:
print(f"{Color.BLD}Organization: {owner}{Color.RST}")
runners = api.get_runners(owner)
if runners:
runners_found = True
print(f"\n{Color.BLD}Organization Runners:{Color.RST}")
for r in runners:
status = "online" if r.get("status") == "online" else "offline"
color = Color.GRN if status == "online" else Color.RED
name = r.get("name", "unnamed")
print(f" {color}{Color.RST} {name}")
# Global runners (admin)
if not owner:
runners = api.get_runners()
if runners:
runners_found = True
print(f"{Color.BLD}Global Runners:{Color.RST}")
for r in runners:
status = r.get("status", "unknown")
color = Color.GRN if status == "online" else Color.RED
name = r.get("name", "unnamed")
labels = ", ".join(r.get("labels", [])[:3]) or "no labels"
busy = r.get("busy", False)
busy_str = f" {Color.YLW}[busy]{Color.RST}" if busy else ""
print(f" {color}{Color.RST} {name} ({labels}){busy_str}")
elif not runners_found:
print(f"{Color.DIM}No runners found (may require admin access){Color.RST}")
# Pending jobs
if getattr(args, "jobs", False):
jobs = api.get_runner_jobs(owner or "", repo or "")
if jobs:
print(f"\n{Color.BLD}Pending/Running Jobs:{Color.RST}")
for job in jobs[:10]:
status = job.get("status", "")
name = job.get("name", "")
repo_name = job.get("repository", {}).get("full_name", "")
print(f" {Color.BLU}{Color.RST} {name} ({repo_name})")
return 0
def cmd_runners(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Manage runners - list, delete, info, jobs."""
owner = getattr(args, "owner", None) or api.config.default_owner
repo = getattr(args, "repo", None) or api.config.default_repo
action = getattr(args, "action", "list")
runner_id = getattr(args, "runner_id", None)
# Determine scope string
if owner and repo:
scope = f"repository {owner}/{repo}"
elif owner:
scope = f"organization {owner}"
else:
scope = "user-level"
# Handle different actions
if action == "delete":
if not runner_id:
print(f"{Color.RED}Runner ID required: gitea-ci runners delete <id>{Color.RST}")
return 1
# Confirm unless --force
if not getattr(args, "force", False):
confirm = input(f"Delete runner #{runner_id}? [y/N]: ").strip().lower()
if confirm != "y":
print(f"{Color.DIM}Cancelled{Color.RST}")
return 0
print(f"{Color.BLD}Deleting runner #{runner_id}...{Color.RST}")
if api.delete_runner(runner_id, owner or "", repo or ""):
print(f"{Color.GRN}✓ Runner deleted{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to delete runner{Color.RST}")
print(f"{Color.DIM}(API may not be available in this Gitea version){Color.RST}")
return 1
elif action == "info":
if not runner_id:
print(f"{Color.RED}Runner ID required: gitea-ci runners info <id>{Color.RST}")
return 1
runner = api.get_runner(runner_id, owner or "", repo or "")
if not runner:
print(f"{Color.RED}Runner #{runner_id} not found{Color.RST}")
return 1
if getattr(args, "json", False):
print(json.dumps(runner, indent=2))
return 0
print(f"{Color.BLD}Runner #{runner_id}{Color.RST}")
print("" * 35)
print(f" {Color.BLD}Name:{Color.RST} {runner.get('name', 'unnamed')}")
status = runner.get("status", "unknown")
status_color = Color.GRN if status == "online" else Color.RED
print(f" {Color.BLD}Status:{Color.RST} {status_color}{status}{Color.RST}")
print(f" {Color.BLD}Busy:{Color.RST} {'Yes' if runner.get('busy') else 'No'}")
labels = runner.get("labels", [])
if labels:
print(f" {Color.BLD}Labels:{Color.RST} {', '.join(labels)}")
if runner.get("version"):
print(f" {Color.BLD}Version:{Color.RST} {runner.get('version')}")
return 0
elif action == "jobs":
jobs = api.get_runner_jobs(owner or "", repo or "")
if getattr(args, "json", False):
print(json.dumps({"scope": scope, "jobs": jobs}, indent=2))
return 0
print(f"{Color.BLD}Runner Jobs ({scope}):{Color.RST}\n")
if not jobs:
print(f"{Color.DIM}No running/pending jobs{Color.RST}")
return 0
for job in jobs:
job_id = job.get("id", 0)
name = job.get("name", "unnamed")
status = job.get("status", "unknown")
runs_on = job.get("runs_on", [])
color, symbol = STATUS_STYLE.get(status, (Color.DIM, "?"))
labels_str = f" [{', '.join(runs_on)}]" if runs_on else ""
print(f" {color}{symbol}{Color.RST} #{job_id} {name}{labels_str}")
return 0
else: # list (default)
if owner and repo:
runners = api.get_runners(owner, repo)
elif owner:
runners = api.get_runners(owner)
else:
runners = api.get_runners()
if getattr(args, "json", False):
print(json.dumps({"scope": scope, "runners": runners}, indent=2))
return 0
print(f"{Color.BLD}Runners ({scope}):{Color.RST}\n")
if not runners:
print(f"{Color.DIM}No runners found{Color.RST}")
print(f"{Color.DIM}(May require authentication or admin access){Color.RST}")
return 0
online = 0
offline = 0
for r in runners:
rid = r.get("id", 0)
name = r.get("name", "unnamed")
status = r.get("status", "unknown")
busy = r.get("busy", False)
labels = r.get("labels", [])
if status == "online":
online += 1
color = Color.GRN
symbol = ""
else:
offline += 1
color = Color.RED
symbol = ""
busy_str = f" {Color.YLW}[busy]{Color.RST}" if busy else ""
labels_str = f" ({', '.join(labels[:4])})" if labels else ""
print(f" {color}{symbol}{Color.RST} #{rid} {name}{labels_str}{busy_str}")
print(f"\n{Color.DIM}Total: {online} online, {offline} offline{Color.RST}")
return 0
def cmd_register_token(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Get runner registration token."""
# If --user flag is set, force user-level token
if getattr(args, "user", False):
owner = ""
repo = ""
else:
owner = getattr(args, "owner", None) or api.config.default_owner
repo = getattr(args, "repo", None) or api.config.default_repo
# Determine scope
if owner and repo:
scope = f"repository {owner}/{repo}"
elif owner:
scope = f"organization {owner}"
else:
scope = "user"
print(f"{Color.BLD}Getting registration token ({scope})...{Color.RST}")
token = api.get_registration_token(owner or "", repo or "")
if token:
print(f"\n{Color.GRN}Registration Token:{Color.RST}\n")
print(f" {token}")
print(f"\n{Color.DIM}Usage:{Color.RST}")
print(f" act_runner register --instance {api.config.url} --token {token}")
print()
return 0
else:
print(f"{Color.RED}Failed to get registration token{Color.RST}")
print(f"{Color.DIM}Make sure your token has the required scopes{Color.RST}")
return 1
def cmd_delete(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Delete a workflow run."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
run_id = args.run_id
if not run_id:
print(f"{Color.RED}Run ID required (-R/--run-id){Color.RST}")
return 1
# Confirm unless --force
if not getattr(args, "force", False):
confirm = input(f"Delete run #{run_id}? [y/N]: ").strip().lower()
if confirm != "y":
print(f"{Color.DIM}Cancelled{Color.RST}")
return 0
print(f"{Color.BLD}Deleting run #{run_id}...{Color.RST}")
if api.delete_run(owner, repo, run_id):
print(f"{Color.GRN}✓ Run deleted{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to delete run{Color.RST}")
return 1
def cmd_workflows(api: GiteaAPI, args: argparse.Namespace) -> int:
"""List workflows in a repository with details."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
workflows = api.get_workflows(owner, repo)
if getattr(args, "json", False):
print(json.dumps({"repository": f"{owner}/{repo}", "workflows": workflows}, indent=2))
return 0
print(f"{Color.BLD}Workflows for {owner}/{repo}:{Color.RST}\n")
if not workflows:
print(f"{Color.DIM}No workflows found{Color.RST}")
return 0
for wf in workflows:
name = wf.get("name", "unnamed")
path = wf.get("path", "")
state = wf.get("state", "unknown")
filename = path.split("/")[-1] if path else ""
if state == "active":
color = Color.GRN
symbol = ""
elif state == "disabled":
color = Color.YLW
symbol = ""
else:
color = Color.DIM
symbol = "?"
print(f" {color}{symbol}{Color.RST} {filename}")
print(f" {Color.DIM}Name:{Color.RST} {name}")
print(f" {Color.DIM}Path:{Color.RST} {path}")
# Get workflow triggers if verbose
if getattr(args, "verbose", False):
content = api.get_workflow_contents(owner, repo, filename)
if content:
# Parse YAML for triggers
triggers = []
in_on = False
for line in content.split("\n"):
stripped = line.strip()
if stripped.startswith("on:"):
in_on = True
# Single-line on: push
if ":" in stripped[3:]:
triggers.append(stripped[3:].strip())
elif in_on:
if stripped and not stripped.startswith("#"):
if line[0] not in (" ", "\t"):
break # End of on: block
if ":" in stripped or stripped.endswith(":"):
triggers.append(stripped.rstrip(":"))
if triggers:
print(f" {Color.DIM}Triggers:{Color.RST} {', '.join(triggers[:5])}")
print()
return 0
def cmd_validate(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Validate local workflow files."""
path_arg = getattr(args, "path", None)
check_runners = getattr(args, "check_runners", False)
strict = getattr(args, "strict", False)
# Find workflow files
if path_arg:
path = Path(path_arg)
if path.is_file():
workflows = [path]
elif path.is_dir():
workflows = list(path.glob("*.yml")) + list(path.glob("*.yaml"))
else:
print(f"{Color.RED}Path not found: {path_arg}{Color.RST}")
return 1
else:
# Look for workflow directories
for workflow_dir in [".gitea/workflows", ".github/workflows"]:
p = Path(workflow_dir)
if p.exists():
workflows = list(p.glob("*.yml")) + list(p.glob("*.yaml"))
break
else:
print(f"{Color.RED}No workflow directory found{Color.RST}")
print(f"{Color.DIM}Expected: .gitea/workflows/ or .github/workflows/{Color.RST}")
return 1
if not workflows:
print(f"{Color.YLW}No workflow files found{Color.RST}")
return 0
# Get available runner labels if checking runners
available_labels: set = set()
if check_runners:
owner = getattr(args, "owner", None) or api.config.default_owner
repo = getattr(args, "repo", None) or api.config.default_repo
runners = api.get_runners(owner or "", repo or "")
for r in runners:
for label in r.get("labels", []):
available_labels.add(label)
total_errors = 0
total_warnings = 0
for wf_path in sorted(workflows):
content = wf_path.read_text()
errors, warnings = _validate_workflow(content, wf_path.name, available_labels)
if errors or warnings:
print(f"{Color.BLD}{wf_path}{Color.RST}")
for level, line, msg in errors:
print(f" {Color.RED}error:{Color.RST} {msg}" + (f" (line {line})" if line else ""))
total_errors += 1
for level, line, msg in warnings:
print(f" {Color.YLW}warning:{Color.RST} {msg}" + (f" (line {line})" if line else ""))
total_warnings += 1
print()
else:
print(f"{Color.GRN}{Color.RST} {wf_path}")
# Summary
print()
if total_errors > 0:
print(f"{Color.RED}{total_errors} error(s){Color.RST}, {total_warnings} warning(s)")
return 1
elif total_warnings > 0 and strict:
print(f"{Color.YLW}{total_warnings} warning(s) (strict mode){Color.RST}")
return 1
elif total_warnings > 0:
print(f"{Color.GRN}OK{Color.RST} with {total_warnings} warning(s)")
return 0
else:
print(f"{Color.GRN}All {len(workflows)} workflow(s) valid{Color.RST}")
return 0
def _validate_workflow(content: str, filename: str, available_labels: set) -> tuple:
"""Validate workflow content without external YAML parser.
Returns (errors, warnings) where each is a list of (level, line, message).
"""
errors: list[tuple] = []
warnings: list[tuple] = []
lines = content.split("\n")
# Check for required top-level keys
has_name = False
has_on = False
has_jobs = False
for i, line in enumerate(lines, 1):
stripped = line.strip()
if stripped.startswith("#"):
continue
if re.match(r"^name\s*:", line):
has_name = True
if re.match(r"^on\s*:", line):
has_on = True
if re.match(r"^jobs\s*:", line):
has_jobs = True
if not has_name:
warnings.append(("warning", 0, "Missing 'name:' field (recommended)"))
if not has_on:
errors.append(("error", 0, "Missing 'on:' trigger definition"))
if not has_jobs:
errors.append(("error", 0, "Missing 'jobs:' section"))
# Check runs-on labels
runs_on_matches = re.finditer(r"runs-on\s*:\s*(.+)", content)
for match in runs_on_matches:
label = match.group(1).strip().strip("'\"")
# Find line number
pos = match.start()
line_num = content[:pos].count("\n") + 1
# Check if label is available (if we have runner info)
if available_labels and label not in available_labels:
warnings.append(("warning", line_num, f"runs-on label '{label}' not found in available runners"))
# Check for common issues
for i, line in enumerate(lines, 1):
stripped = line.strip()
# Check for tabs (YAML doesn't like tabs)
if "\t" in line and not stripped.startswith("#"):
errors.append(("error", i, "Tab character found (use spaces for indentation)"))
# Check for trailing colons without values (common mistake)
if re.match(r"^\s+-\s*$", line):
errors.append(("error", i, "Empty list item"))
# Check for uses without version
uses_match = re.match(r".*uses\s*:\s*([^@\s]+)$", stripped)
if uses_match and not stripped.startswith("#"):
action = uses_match.group(1)
if "/" in action and "@" not in action:
warnings.append(("warning", i, f"Action '{action}' should specify version (e.g., @v1)"))
return errors, warnings
def cmd_stats(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show comprehensive workflow statistics with visual graphs."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.BLD}Error:{Color.RST} Repository required")
return 1
# Fetch more runs for better statistics
limit = getattr(args, "limit", 50) or 50
runs = api.get_runs(owner, repo, limit=limit)
if not runs:
print(f"{Color.DIM}No workflow runs found{Color.RST}")
return 0
# JSON output
if getattr(args, "json", False):
return _output_stats_json(runs, owner, repo)
print(f"{Color.BLD}Workflow Statistics: {owner}/{repo}{Color.RST}")
print("" * 60)
# Calculate statistics
total = len(runs)
success = [r for r in runs if r.get("conclusion") == "success"]
failed = [r for r in runs if r.get("conclusion") == "failure"]
cancelled = [r for r in runs if r.get("conclusion") == "cancelled"]
running = [r for r in runs if r.get("status") in ("running", "waiting")]
completed = total - len(running)
# Summary section
print(f"{Color.BLD}Summary{Color.RST} {Color.DIM}(last {total} runs){Color.RST}")
print("" * 40)
success_rate = (len(success) / completed * 100) if completed > 0 else 0
print(f" {'Total:':<12} {total}")
print(f" {'Completed:':<12} {completed}")
print(f" {'✓ Success:':<12} {len(success)}")
print(f" {'✗ Failed:':<12} {len(failed)}")
print(f" {'○ Cancelled:':<12} {len(cancelled)}")
print(f" {'● Running:':<12} {len(running)}")
print()
# Success rate bar
print(f"{Color.BLD}Success Rate{Color.RST}")
print("" * 40)
_print_rate_bar("Overall", success_rate, 30)
print()
# Per-workflow breakdown
workflows: dict[str, dict] = {}
for run in runs:
wf = run.get("workflow") or run.get("name") or "unknown"
if wf not in workflows:
workflows[wf] = {"total": 0, "success": 0, "failed": 0, "durations": []}
workflows[wf]["total"] += 1
if run.get("conclusion") == "success":
workflows[wf]["success"] += 1
elif run.get("conclusion") == "failure":
workflows[wf]["failed"] += 1
# Track duration
duration = _parse_duration_seconds(run)
if duration > 0:
workflows[wf]["durations"].append(duration)
if len(workflows) > 1:
print(f"{Color.BLD}Per-Workflow Breakdown{Color.RST}")
print("" * 40)
for wf_name, stats in sorted(workflows.items()):
wf_total = stats["total"]
wf_success = stats["success"]
wf_rate = (wf_success / wf_total * 100) if wf_total > 0 else 0
# Truncate workflow name
display_name = wf_name[:20] if len(wf_name) > 20 else wf_name
_print_rate_bar(display_name, wf_rate, 20, show_count=f"{wf_success}/{wf_total}")
print()
# Duration statistics
all_durations = []
for run in runs:
duration = _parse_duration_seconds(run)
if duration > 0:
all_durations.append(duration)
if all_durations:
print(f"{Color.BLD}Duration Statistics{Color.RST}")
print("" * 40)
avg_duration = sum(all_durations) / len(all_durations)
min_duration = min(all_durations)
max_duration = max(all_durations)
print(f" {'Average:':<12} {_format_seconds(int(avg_duration))}")
print(f" {'Fastest:':<12} {_format_seconds(min_duration)}")
print(f" {'Slowest:':<12} {_format_seconds(max_duration)}")
print()
# Recent trend (last 10 runs)
recent = runs[:min(10, len(runs))]
if recent:
print(f"{Color.BLD}Recent Trend{Color.RST} {Color.DIM}(last {len(recent)} runs){Color.RST}")
print("" * 40)
trend_line = " "
for run in reversed(recent): # Oldest to newest
conclusion = run.get("conclusion", "")
if conclusion == "success":
trend_line += "" # Full block for success
elif conclusion == "failure":
trend_line += "" # Low block for failure (bar chart effect)
elif run.get("status") in ("running", "waiting"):
trend_line += "" # Medium shade for running
else:
trend_line += "" # Light shade for other
print(trend_line)
print(f" {Color.DIM}█=pass ▁=fail ▒=running ░=other{Color.RST}")
print(f" {Color.DIM}← older newer →{Color.RST}")
print()
# Job-level statistics (if detailed flag)
if getattr(args, "detailed", False):
_print_job_stats(api, runs[:20], owner, repo)
return 0
def _print_rate_bar(label: str, rate: float, width: int = 20, show_count: str = "") -> None:
"""Print a labeled progress bar for rates. No colors per STYLING.md."""
filled = int(rate / 100 * width)
empty = width - filled
# Use block characters only - no colors
bar = f"{'' * filled}{Color.DIM}{'' * empty}{Color.RST}"
count_str = f" {Color.DIM}{show_count}{Color.RST}" if show_count else ""
print(f" {label:<20} {bar} {rate:5.1f}%{count_str}")
def _parse_duration_seconds(run: dict) -> int:
"""Parse run duration to seconds."""
# Try direct duration field
duration_str = run.get("duration", "")
if duration_str:
# Parse formats like "1m30s", "45s", "2m"
total = 0
import re as re_mod
minutes = re_mod.search(r"(\d+)m", duration_str)
seconds = re_mod.search(r"(\d+)s", duration_str)
if minutes:
total += int(minutes.group(1)) * 60
if seconds:
total += int(seconds.group(1))
return total
# Calculate from timestamps
started = run.get("run_started_at", "")
ended = run.get("updated_at", "")
if started and ended and run.get("status") == "completed":
try:
from datetime import datetime
start_dt = datetime.fromisoformat(started.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00"))
return int((end_dt - start_dt).total_seconds())
except (ValueError, TypeError):
pass
return 0
def _format_seconds(seconds: int) -> str:
"""Format seconds to human-readable duration."""
if seconds < 60:
return f"{seconds}s"
minutes = seconds // 60
secs = seconds % 60
if minutes < 60:
return f"{minutes}m{secs}s" if secs else f"{minutes}m"
hours = minutes // 60
mins = minutes % 60
return f"{hours}h{mins}m"
def _print_job_stats(api: GiteaAPI, runs: list, owner: str, repo: str) -> None:
"""Print job-level statistics. No colors per STYLING.md."""
print(f"{Color.BLD}Job Statistics{Color.RST} {Color.DIM}(from {len(runs)} recent runs){Color.RST}")
print("" * 40)
job_stats: dict[str, dict] = {}
for run in runs:
if run.get("status") != "completed":
continue
run_id = run.get("id")
jobs = api.get_jobs(owner, repo, run_id)
for job in jobs:
name = job.get("name", "unknown")
if name not in job_stats:
job_stats[name] = {"success": 0, "failed": 0, "durations": []}
if job.get("conclusion") == "success":
job_stats[name]["success"] += 1
elif job.get("conclusion") == "failure":
job_stats[name]["failed"] += 1
# Parse job duration
started = job.get("started_at", "")
ended = job.get("completed_at", "")
if started and ended:
try:
from datetime import datetime
start_dt = datetime.fromisoformat(started.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00"))
duration = int((end_dt - start_dt).total_seconds())
# Sanity check: duration should be reasonable (< 24 hours)
if 0 < duration < 86400:
job_stats[name]["durations"].append(duration)
except (ValueError, TypeError):
pass
if not job_stats:
print(f" {Color.DIM}No job data available{Color.RST}")
return
# Print job table - use symbols, no colors
print(f" {'Job':<30} {'✓ Pass':<8} {'✗ Fail':<8} {'Avg':<8}")
print(f" {'-'*30} {'-'*8} {'-'*8} {'-'*8}")
for job_name, stats in sorted(job_stats.items()):
total = stats["success"] + stats["failed"]
if total == 0:
continue
name_display = job_name[:30] if len(job_name) > 30 else job_name
avg_dur = ""
if stats["durations"]:
avg = sum(stats["durations"]) / len(stats["durations"])
avg_dur = _format_seconds(int(avg))
# No colors - use plain text
fail_str = str(stats["failed"]) if stats["failed"] > 0 else f"{Color.DIM}0{Color.RST}"
print(f" {name_display:<30} {stats['success']:<8} {fail_str:<8} {avg_dur:<8}")
print()
def _output_stats_json(runs: list, owner: str, repo: str) -> int:
"""Output statistics as JSON."""
total = len(runs)
success = len([r for r in runs if r.get("conclusion") == "success"])
failed = len([r for r in runs if r.get("conclusion") == "failure"])
cancelled = len([r for r in runs if r.get("conclusion") == "cancelled"])
running = len([r for r in runs if r.get("status") in ("running", "waiting")])
completed = total - running
# Per-workflow stats
workflows: dict[str, dict] = {}
for run in runs:
wf = run.get("workflow") or "unknown"
if wf not in workflows:
workflows[wf] = {"total": 0, "success": 0, "failed": 0}
workflows[wf]["total"] += 1
if run.get("conclusion") == "success":
workflows[wf]["success"] += 1
elif run.get("conclusion") == "failure":
workflows[wf]["failed"] += 1
output = {
"repository": f"{owner}/{repo}",
"total_runs": total,
"completed": completed,
"success": success,
"failed": failed,
"cancelled": cancelled,
"running": running,
"success_rate": round(success / completed * 100, 1) if completed > 0 else 0,
"workflows": workflows,
}
print(json.dumps(output, indent=2))
return 0
def send_notification(title: str, message: str, urgency: str = "normal") -> None:
"""Send desktop notification via notify-send."""
if shutil.which("notify-send"):
try:
subprocess.run(
["notify-send", f"--urgency={urgency}", title, message],
capture_output=True,
timeout=5
)
except (subprocess.SubprocessError, OSError):
pass
def main() -> int:
"""Main entry point."""
# Quick check for direct run URL or instance URL as first arg
if len(sys.argv) > 1 and not sys.argv[1].startswith("-"):
arg = sys.argv[1]
# Check if it looks like a URL or hostname (not a command)
commands = ("list", "ls", "l", "status", "s", "logs", "log", "watch", "w", "config",
"trigger", "t", "rerun", "cancel", "artifacts", "art", "pr")
if "." in arg and arg not in commands:
# Check for direct run URL: git.mymx.me/user/repo/actions/runs/123
run_match = re.search(r"(?:https?://)?([^/]+)/([^/]+)/([^/]+)/actions/runs/(\d+)", arg)
if run_match:
host, owner, repo, run_id = run_match.groups()
url = f"https://{host}"
os.environ["GITEA_URL"] = url
# Rewrite args to: status -o owner -r repo -R run_id
sys.argv = [sys.argv[0], "status", "-o", owner, "-r", repo, "-R", run_id]
else:
# Check for repo URL: git.mymx.me/user/repo/actions
repo_match = re.search(r"(?:https?://)?([^/]+)/([^/]+)/([^/]+)(?:/actions)?$", arg)
if repo_match:
host, owner, repo = repo_match.groups()
url = f"https://{host}"
os.environ["GITEA_URL"] = url
sys.argv = [sys.argv[0], "list", f"{owner}/{repo}"]
else:
# Just an instance URL
url = arg if arg.startswith("http") else f"https://{arg}"
os.environ["GITEA_URL"] = url
sys.argv.pop(1) # Remove URL from args
parser = argparse.ArgumentParser(
description="Monitor and manage Gitea Actions workflows",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Commands:
list List recent workflow runs (default: discover all)
stats Show workflow statistics with visual graphs
status Show detailed run status with jobs
logs Show job logs
watch Watch running workflows in real-time
trigger Trigger a workflow manually (workflow_dispatch)
rerun Re-run a workflow or failed jobs
cancel Cancel a running workflow
delete Delete a workflow run
artifacts List or download build artifacts
pr Show CI status for a pull request
compare Compare two workflow runs (diff jobs)
runners List runners and their status
register-token Get runner registration token
workflows List workflows in a repository
infra Show infrastructure status
config Configure token and defaults
Environment:
GITEA_URL Gitea instance URL
GITEA_TOKEN API token for authentication (required for write operations)
Examples:
gitea-ci git.mymx.me Discover all workflows on instance
gitea-ci git.mymx.me/user/repo List runs for specific repo
gitea-ci list user/repo --failed Show only failed runs
gitea-ci list user/repo --json Output as JSON
gitea-ci stats user/repo Show statistics with graphs
gitea-ci stats user/repo --detailed Include per-job breakdown
gitea-ci status user/repo Show latest run status
gitea-ci status user/repo 123 Show specific run details
gitea-ci status user/repo 123 --json Output run details as JSON
gitea-ci watch user/repo Watch latest run
gitea-ci watch user/repo 123 -t 300 Watch specific run with timeout
gitea-ci logs user/repo 123 -j 456 Show job logs
gitea-ci trigger user/repo -w lint.yml Trigger workflow manually
gitea-ci rerun user/repo --failed Re-run only failed jobs
gitea-ci cancel user/repo Cancel latest running workflow
gitea-ci artifacts user/repo -d 42 Download artifact #42
gitea-ci pr user/repo -p 123 Show CI status for PR #123
gitea-ci compare user/repo 175 174 Compare two runs
gitea-ci delete user/repo -R 123 Delete workflow run #123
gitea-ci runners List runners (user-level)
gitea-ci runners -o org List organization runners
gitea-ci runners info 5 Show runner #5 details
gitea-ci runners delete 5 -f Delete runner #5 (no confirmation)
gitea-ci runners jobs Show running/pending jobs
gitea-ci register-token Get user-level registration token
gitea-ci register-token -o org -r repo Get repo-level registration token
gitea-ci workflows user/repo -v List workflows with triggers
gitea-ci config --test Test API connectivity
gitea-ci config token Show token info
gitea-ci config token check Check token permissions
gitea-ci config token set <token> Set API token
gitea-ci validate Validate local workflow files
gitea-ci validate --check-runners Cross-reference runs-on labels
gitea-ci validate --strict Treat warnings as errors
""",
)
parser.add_argument("-u", "--url", help="Gitea URL")
subparsers = parser.add_subparsers(dest="command", metavar="command")
# Common args for repo-based commands
def add_repo_args(p):
p.add_argument("-o", "--owner", help="Repository owner")
p.add_argument("-r", "--repo", help="Repository name")
p.add_argument("repo_path", nargs="?", help="Repository as owner/repo")
# list
list_p = subparsers.add_parser("list", aliases=["ls", "l"], help="List workflow runs")
add_repo_args(list_p)
list_p.add_argument("-n", "--limit", type=int, default=10, help="Number of runs")
list_p.add_argument("--failed", dest="status_filter", action="store_const", const="failed", help="Show only failed runs")
list_p.add_argument("--success", dest="status_filter", action="store_const", const="success", help="Show only successful runs")
list_p.add_argument("--running", dest="status_filter", action="store_const", const="running", help="Show only running/waiting")
list_p.add_argument("-b", "--branch", help="Filter by branch name")
list_p.add_argument("-w", "--workflow", help="Filter by workflow file")
list_p.add_argument("--json", action="store_true", help="Output as JSON")
list_p.add_argument("--compact", action="store_true", help="Compact single-line output")
# stats
stats_p = subparsers.add_parser("stats", help="Show workflow statistics with graphs")
add_repo_args(stats_p)
stats_p.add_argument("-n", "--limit", type=int, default=50, help="Number of runs to analyze")
stats_p.add_argument("--detailed", action="store_true", help="Include per-job statistics")
stats_p.add_argument("--json", action="store_true", help="Output as JSON")
# status (supports repo_path and/or run_id)
status_p = subparsers.add_parser("status", aliases=["s"], help="Show run status")
status_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]")
status_p.add_argument("-o", "--owner", help="Repository owner")
status_p.add_argument("-r", "--repo", help="Repository name")
status_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID")
status_p.add_argument("--json", action="store_true", help="Output as JSON")
# logs (supports repo_path and/or run_id)
logs_p = subparsers.add_parser("logs", aliases=["log"], help="Show job logs")
logs_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]")
logs_p.add_argument("-o", "--owner", help="Repository owner")
logs_p.add_argument("-r", "--repo", help="Repository name")
logs_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID")
logs_p.add_argument("-j", "--job", dest="job_id", type=int, help="Job ID")
# watch (supports repo_path and/or run_id)
watch_p = subparsers.add_parser("watch", aliases=["w"], help="Watch runs")
watch_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]")
watch_p.add_argument("-o", "--owner", help="Repository owner")
watch_p.add_argument("-r", "--repo", help="Repository name")
watch_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID to watch")
watch_p.add_argument("-i", "--interval", type=int, default=5, help="Refresh interval (seconds)")
watch_p.add_argument("-t", "--timeout", type=int, default=0, help="Auto-exit after N seconds (0=disabled)")
watch_p.add_argument("--no-clear", action="store_true", help="Don't clear screen")
watch_p.add_argument("--notify", action="store_true", help="Desktop notification on completion")
# config
config_p = subparsers.add_parser("config", help="Configure gitea-ci")
config_p.add_argument("action", nargs="?", choices=["token"], help="Subcommand")
config_p.add_argument("token_action", nargs="?", choices=["check", "set"], help="Token action")
config_p.add_argument("token_value", nargs="?", help="Token value (for 'set')")
config_p.add_argument("--show", action="store_true", help="Show current config")
config_p.add_argument("--test", action="store_true", help="Test API connectivity")
# repo
repo_p = subparsers.add_parser("repo", help="Manage repositories")
repo_p.add_argument("action", nargs="?", choices=["list", "create", "exists", "check"], default="list",
help="Action: list, create, exists, check")
repo_p.add_argument("name", nargs="?", help="Repository name (for create/exists)")
repo_p.add_argument("-d", "--description", help="Repository description (for create)")
repo_p.add_argument("-p", "--private", action="store_true", help="Make repository private")
add_repo_args(repo_p)
# trigger
trigger_p = subparsers.add_parser("trigger", aliases=["t"], help="Trigger workflow")
add_repo_args(trigger_p)
trigger_p.add_argument("-w", "--workflow", help="Workflow file (e.g., lint.yml)")
trigger_p.add_argument("-b", "--ref", help="Branch/tag/SHA to run on (default: master)")
trigger_p.add_argument("-i", "--input", action="append", help="Input as key=value (repeatable)")
# rerun
rerun_p = subparsers.add_parser("rerun", help="Re-run workflow/jobs")
add_repo_args(rerun_p)
rerun_p.add_argument("-R", "--run-id", type=int, help="Run ID to re-run")
rerun_p.add_argument("-j", "--job-id", type=int, help="Specific job ID to re-run")
rerun_p.add_argument("-f", "--failed-only", action="store_true", help="Re-run only failed jobs")
# cancel
cancel_p = subparsers.add_parser("cancel", help="Cancel running workflow")
add_repo_args(cancel_p)
cancel_p.add_argument("-R", "--run-id", type=int, help="Run ID to cancel")
# delete
delete_p = subparsers.add_parser("delete", aliases=["rm"], help="Delete a workflow run")
add_repo_args(delete_p)
delete_p.add_argument("-R", "--run-id", type=int, required=True, help="Run ID to delete")
delete_p.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
# artifacts
artifacts_p = subparsers.add_parser("artifacts", aliases=["art"], help="List/download artifacts")
add_repo_args(artifacts_p)
artifacts_p.add_argument("-R", "--run-id", type=int, help="Filter by run ID")
artifacts_p.add_argument("-d", "--download", type=int, metavar="ID", help="Download artifact by ID")
artifacts_p.add_argument("-O", "--output", help="Output filename for download")
# pr
pr_p = subparsers.add_parser("pr", help="Show PR CI status")
add_repo_args(pr_p)
pr_p.add_argument("-p", "--pr-number", type=int, help="Pull request number")
# compare
compare_p = subparsers.add_parser("compare", aliases=["diff"], help="Compare two runs")
add_repo_args(compare_p)
compare_p.add_argument("run1", type=int, help="First run ID")
compare_p.add_argument("run2", type=int, nargs="?", help="Second run ID (default: previous run)")
compare_p.add_argument("--json", action="store_true", help="Output as JSON")
# infra
infra_p = subparsers.add_parser("infra", help="Show infrastructure status")
add_repo_args(infra_p)
infra_p.add_argument("--jobs", action="store_true", help="Show pending/running jobs")
# runners
runners_p = subparsers.add_parser("runners", help="Manage runners")
runners_p.add_argument("action", nargs="?", default="list",
choices=["list", "delete", "info", "jobs"],
help="Action: list, delete, info, jobs")
runners_p.add_argument("runner_id", nargs="?", type=int, help="Runner ID (for delete/info)")
add_repo_args(runners_p)
runners_p.add_argument("--force", "-f", action="store_true", help="Skip confirmation for delete")
runners_p.add_argument("--json", action="store_true", help="Output as JSON")
# register-token
regtoken_p = subparsers.add_parser("register-token", aliases=["regtoken"], help="Get runner registration token")
add_repo_args(regtoken_p)
regtoken_p.add_argument("--user", action="store_true", help="Get user-level token (ignores repo defaults)")
# workflows
workflows_p = subparsers.add_parser("workflows", aliases=["wf"], help="List workflows")
add_repo_args(workflows_p)
workflows_p.add_argument("-v", "--verbose", action="store_true", help="Show workflow triggers and details")
# validate
validate_p = subparsers.add_parser("validate", aliases=["lint"], help="Validate local workflow files")
validate_p.add_argument("path", nargs="?", help="Path to workflow dir or file")
add_repo_args(validate_p)
validate_p.add_argument("--check-runners", action="store_true", help="Cross-reference runs-on with available runners")
validate_p.add_argument("--strict", action="store_true", help="Treat warnings as errors")
args = parser.parse_args()
# Load config
config = Config.load()
if args.url:
config.url = args.url
api = GiteaAPI(config)
# Set defaults for optional args
if not hasattr(args, "owner"):
args.owner = None
if not hasattr(args, "repo"):
args.repo = None
if not hasattr(args, "limit"):
args.limit = 10
# Parse repo_path (owner/repo format)
if hasattr(args, "repo_path") and args.repo_path and "/" in args.repo_path:
parts = args.repo_path.split("/", 1)
if not args.owner:
args.owner = parts[0]
if not args.repo:
args.repo = parts[1]
# Parse positional args for status/logs/watch: [owner/repo] [run_id]
if hasattr(args, "positional") and args.positional:
for pos in args.positional:
if "/" in pos:
# Looks like owner/repo
parts = pos.split("/", 1)
if not args.owner:
args.owner = parts[0]
if not args.repo:
args.repo = parts[1]
elif pos.isdigit():
# Looks like run_id
if not args.run_id:
args.run_id = int(pos)
# Dispatch commands
if args.command in (None, "list", "ls", "l"):
return cmd_list(api, args)
elif args.command == "stats":
return cmd_stats(api, args)
elif args.command in ("status", "s"):
return cmd_status(api, args)
elif args.command in ("logs", "log"):
return cmd_logs(api, args)
elif args.command in ("watch", "w"):
return cmd_watch(api, args)
elif args.command == "config":
return cmd_config(api, args)
elif args.command == "repo":
return cmd_repo(api, args)
elif args.command in ("trigger", "t"):
return cmd_trigger(api, args)
elif args.command == "rerun":
return cmd_rerun(api, args)
elif args.command == "cancel":
return cmd_cancel(api, args)
elif args.command in ("delete", "rm"):
return cmd_delete(api, args)
elif args.command in ("artifacts", "art"):
return cmd_artifacts(api, args)
elif args.command == "pr":
return cmd_pr(api, args)
elif args.command in ("compare", "diff"):
return cmd_compare(api, args)
elif args.command == "infra":
return cmd_infra(api, args)
elif args.command == "runners":
return cmd_runners(api, args)
elif args.command in ("register-token", "regtoken"):
return cmd_register_token(api, args)
elif args.command in ("workflows", "wf"):
return cmd_workflows(api, args)
elif args.command in ("validate", "lint"):
return cmd_validate(api, args)
return 0
if __name__ == "__main__":
sys.exit(main())