commit e87d327a8979307b6c243503362a22c6f0cf8dc2 Author: Username Date: Sun Jan 18 17:39:57 2026 +0100 initial commit: gitea-ci workflow manager diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c18dd8d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__pycache__/ diff --git a/gitea-ci.py b/gitea-ci.py new file mode 100755 index 0000000..be0eaad --- /dev/null +++ b/gitea-ci.py @@ -0,0 +1,3068 @@ +#!/usr/bin/env python3 +"""Gitea CI - Monitor and manage Gitea Actions workflows. + +Features: +- List recent workflow runs across repositories +- Show detailed job status with live updates +- Stream build logs in real-time +- Trigger workflows manually (workflow_dispatch) +- Re-run failed jobs, cancel running workflows +- List and download build artifacts +- View PR check status, compare workflow runs +- Desktop notifications on completion +- JSON output mode for scripting +""" + +import argparse +import json +import os +import re +import shutil +import subprocess +import sys +import time +import urllib.request +import urllib.error +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any + +# Add lib to path for style import +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "lib")) +from style import Color, Diff # noqa: E402 + +# Configuration +CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "gitea-ci" +CONFIG_FILE = CONFIG_DIR / "config.json" +DEFAULT_INSTANCE = os.environ.get("GITEA_URL", "https://git.mymx.me") + + +# Status colors and symbols +STATUS_STYLE = { + "success": (Color.GRN, "✓"), + "failure": (Color.RED, "✗"), + "cancelled": (Color.YLW, "⊘"), + "running": (Color.BLU, "●"), + "waiting": (Color.DIM, "○"), + "queued": (Color.DIM, "◌"), + "pending": (Color.DIM, "○"), + "skipped": (Color.DIM, "⊘"), +} + + +@dataclass +class Config: + """Gitea CI configuration.""" + + url: str = DEFAULT_INSTANCE + token: str = "" + default_owner: str = "" + default_repo: str = "" + + @classmethod + def load(cls) -> "Config": + """Load config from file or environment.""" + config = cls() + + # Environment overrides + config.url = os.environ.get("GITEA_URL", config.url) + config.token = os.environ.get("GITEA_TOKEN", "") + + # Load from file + if CONFIG_FILE.exists(): + try: + data = json.loads(CONFIG_FILE.read_text()) + config.url = data.get("url", config.url) + config.token = data.get("token", config.token) + config.default_owner = data.get("default_owner", "") + config.default_repo = data.get("default_repo", "") + except (json.JSONDecodeError, IOError): + pass + + return config + + def save(self) -> None: + """Save config to file.""" + CONFIG_DIR.mkdir(parents=True, exist_ok=True) + data = { + "url": self.url, + "token": self.token, + "default_owner": self.default_owner, + "default_repo": self.default_repo, + } + CONFIG_FILE.write_text(json.dumps(data, indent=2) + "\n") + CONFIG_FILE.chmod(0o600) + + +class GiteaAPI: + """Gitea API client with rate limiting support.""" + + def __init__(self, config: Config): + self.config = config + self.base_url = config.url.rstrip("/") + "/api/v1" + self._retry_delay = 1 # Initial retry delay in seconds + self._max_retry_delay = 60 # Max delay + self._rate_limited = False + + def _request(self, endpoint: str, method: str = "GET", retries: int = 3) -> dict | None: + """Make API request with rate limit handling.""" + url = f"{self.base_url}{endpoint}" + headers = {"Accept": "application/json"} + + if self.config.token: + headers["Authorization"] = f"token {self.config.token}" + + req = urllib.request.Request(url, headers=headers, method=method) + + for attempt in range(retries): + try: + with urllib.request.urlopen(req, timeout=30) as resp: + # Reset retry delay on success + self._retry_delay = 1 + self._rate_limited = False + return json.loads(resp.read().decode()) + + except urllib.error.HTTPError as e: + if e.code == 401: + print(f"{Color.RED}Authentication required. Set GITEA_TOKEN or run: gitea-ci config{Color.RST}") + return None + elif e.code == 404: + return None + elif e.code == 429 or e.code == 403: + # Rate limited - exponential backoff + self._rate_limited = True + retry_after = e.headers.get("Retry-After") + if retry_after and retry_after.isdigit(): + delay = int(retry_after) + else: + delay = self._retry_delay + + if attempt < retries - 1: + print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}", file=sys.stderr) + time.sleep(delay) + # Exponential backoff with max + self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) + continue + else: + print(f"{Color.RED}Rate limited. Try again later.{Color.RST}") + return None + elif e.code >= 500: + # Server error - retry with backoff + if attempt < retries - 1: + delay = self._retry_delay + print(f"{Color.YLW}Server error {e.code}. Retrying in {delay}s...{Color.RST}", file=sys.stderr) + time.sleep(delay) + self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) + continue + else: + print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}") + return None + else: + print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}") + return None + + except urllib.error.URLError as e: + if attempt < retries - 1: + delay = self._retry_delay + print(f"{Color.YLW}Connection error. Retrying in {delay}s...{Color.RST}", file=sys.stderr) + time.sleep(delay) + self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) + continue + else: + print(f"{Color.RED}Connection error: {e.reason}{Color.RST}") + return None + + return None + + def _request_text(self, endpoint: str, retries: int = 3) -> str | None: + """Make API request returning text with rate limit handling.""" + url = f"{self.base_url}{endpoint}" + headers = {} + + if self.config.token: + headers["Authorization"] = f"token {self.config.token}" + + req = urllib.request.Request(url, headers=headers) + + for attempt in range(retries): + try: + with urllib.request.urlopen(req, timeout=30) as resp: + self._retry_delay = 1 + return resp.read().decode() + + except urllib.error.HTTPError as e: + if e.code == 429 or e.code == 403: + retry_after = e.headers.get("Retry-After") + delay = int(retry_after) if retry_after and retry_after.isdigit() else self._retry_delay + + if attempt < retries - 1: + print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}", file=sys.stderr) + time.sleep(delay) + self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) + continue + return None + + except urllib.error.URLError: + if attempt < retries - 1: + time.sleep(self._retry_delay) + self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) + continue + return None + + return None + + def throttle(self) -> None: + """Add small delay between requests to avoid rate limiting.""" + if self._rate_limited: + time.sleep(0.5) + else: + time.sleep(0.1) # Small delay to be nice to the server + + def get_repos(self, limit: int = 20) -> list[Any]: + """Get repositories for current user.""" + data = self._request(f"/user/repos?limit={limit}") + return list(data) if data else [] + + def create_repo(self, name: str, description: str = "", private: bool = False, + auto_init: bool = True) -> dict | None: + """Create a new repository.""" + data = { + "name": name, + "description": description, + "private": private, + "auto_init": auto_init, + } + return self._post("/user/repos", data) + + def get_repo(self, owner: str, repo: str) -> dict | None: + """Get repository details.""" + return self._request(f"/repos/{owner}/{repo}") + + def repo_exists(self, owner: str, repo: str) -> bool: + """Check if a repository exists.""" + return self.get_repo(owner, repo) is not None + + def get_runs(self, owner: str, repo: str, limit: int = 10) -> list: + """Get workflow runs for a repository.""" + data = self._request(f"/repos/{owner}/{repo}/actions/runs?limit={limit}") + if data and "workflow_runs" in data: + return data["workflow_runs"] + return data if isinstance(data, list) else [] + + def get_run(self, owner: str, repo: str, run_id: int) -> dict | None: + """Get specific workflow run.""" + return self._request(f"/repos/{owner}/{repo}/actions/runs/{run_id}") + + def get_jobs(self, owner: str, repo: str, run_id: int) -> list: + """Get jobs for a workflow run.""" + data = self._request(f"/repos/{owner}/{repo}/actions/runs/{run_id}/jobs") + if data and "jobs" in data: + return data["jobs"] + return data if isinstance(data, list) else [] + + def get_job_logs(self, owner: str, repo: str, job_id: int) -> str | None: + """Get logs for a job.""" + return self._request_text(f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs") + + def trigger_workflow(self, owner: str, repo: str, workflow: str, ref: str = "master", + inputs: dict | None = None) -> bool: + """Trigger a workflow_dispatch event.""" + endpoint = f"/repos/{owner}/{repo}/actions/workflows/{workflow}/dispatches" + data: dict[str, Any] = {"ref": ref} + if inputs: + data["inputs"] = inputs + return self._post(endpoint, data) is not None + + def rerun_job(self, owner: str, repo: str, job_id: int) -> bool: + """Re-run a specific job.""" + return self._post(f"/repos/{owner}/{repo}/actions/jobs/{job_id}/rerun", {}) is not None + + def rerun_workflow(self, owner: str, repo: str, run_id: int) -> bool: + """Re-run all jobs in a workflow run.""" + return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/rerun", {}) is not None + + def rerun_failed_jobs(self, owner: str, repo: str, run_id: int) -> bool: + """Re-run only failed jobs in a workflow run.""" + return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/rerun-failed-jobs", {}) is not None + + def cancel_run(self, owner: str, repo: str, run_id: int) -> bool: + """Cancel a workflow run.""" + return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/cancel", {}) is not None + + def get_artifacts(self, owner: str, repo: str, run_id: int | None = None) -> list: + """Get artifacts for a repository or specific run.""" + if run_id: + endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}/artifacts" + else: + endpoint = f"/repos/{owner}/{repo}/actions/artifacts" + data = self._request(endpoint) + if data and "artifacts" in data: + return data["artifacts"] + return data if isinstance(data, list) else [] + + def download_artifact(self, owner: str, repo: str, artifact_id: int, output_path: str) -> bool: + """Download an artifact to a file.""" + endpoint = f"/repos/{owner}/{repo}/actions/artifacts/{artifact_id}/zip" + url = f"{self.base_url}{endpoint}" + headers = {} + if self.config.token: + headers["Authorization"] = f"token {self.config.token}" + + req = urllib.request.Request(url, headers=headers) + try: + with urllib.request.urlopen(req, timeout=120) as resp: + with open(output_path, "wb") as f: + f.write(resp.read()) + return True + except (urllib.error.URLError, IOError) as e: + print(f"{Color.RED}Download failed: {e}{Color.RST}") + return False + + def get_workflows(self, owner: str, repo: str) -> list: + """Get list of workflows in a repository.""" + data = self._request(f"/repos/{owner}/{repo}/actions/workflows") + if data and "workflows" in data: + return data["workflows"] + return data if isinstance(data, list) else [] + + def get_pr_checks(self, owner: str, repo: str, pr_number: int) -> dict | None: + """Get CI check status for a pull request.""" + # Get PR details first to find head SHA + pr = self._request(f"/repos/{owner}/{repo}/pulls/{pr_number}") + if not pr: + return None + + head_sha = pr.get("head", {}).get("sha", "") + if not head_sha: + return None + + # Get commit status + status = self._request(f"/repos/{owner}/{repo}/commits/{head_sha}/status") + return { + "pr": pr, + "sha": head_sha, + "status": status, + } + + def get_version(self) -> dict | None: + """Get Gitea server version info.""" + return self._request("/version") + + def get_registration_token(self, owner: str = "", repo: str = "") -> str | None: + """Get runner registration token for repo, org, or user level. + + Args: + owner: Repository owner or organization (empty for user-level) + repo: Repository name (empty for org/user-level) + + Returns: + Registration token string or None on failure. + """ + if owner and repo: + # Repository-level registration + endpoint = f"/repos/{owner}/{repo}/actions/runners/registration-token" + elif owner: + # Organization-level registration + endpoint = f"/orgs/{owner}/actions/runners/registration-token" + else: + # User-level registration + endpoint = "/user/actions/runners/registration-token" + + result = self._post(endpoint, {}) + if result and "token" in result: + return result["token"] + return None + + def get_runners(self, owner: str = "", repo: str = "") -> list: + """Get runners for repo, org, or global (admin). + + Args: + owner: Repository owner or organization + repo: Repository name (if empty, gets org/global runners) + """ + if owner and repo: + # Repository runners + data = self._request(f"/repos/{owner}/{repo}/actions/runners") + elif owner: + # Organization runners + data = self._request(f"/orgs/{owner}/actions/runners") + else: + # Global runners (admin only) + data = self._request("/admin/runners") + if data and "runners" in data: + return data["runners"] + return data if isinstance(data, list) else [] + + def get_runner_jobs(self, owner: str = "", repo: str = "") -> list: + """Get running/pending jobs for runners.""" + if owner and repo: + endpoint = f"/repos/{owner}/{repo}/actions/runners/jobs" + elif owner: + endpoint = f"/orgs/{owner}/actions/runners/jobs" + else: + endpoint = "/admin/runners/jobs" + data = self._request(endpoint) + if data and "jobs" in data: + return data["jobs"] + return data if isinstance(data, list) else [] + + def get_runner(self, runner_id: int, owner: str = "", repo: str = "") -> dict | None: + """Get a specific runner by ID.""" + if owner and repo: + endpoint = f"/repos/{owner}/{repo}/actions/runners/{runner_id}" + elif owner: + endpoint = f"/orgs/{owner}/actions/runners/{runner_id}" + else: + endpoint = f"/admin/runners/{runner_id}" + return self._request(endpoint) + + def delete_runner(self, runner_id: int, owner: str = "", repo: str = "") -> bool: + """Delete a runner by ID.""" + if owner and repo: + endpoint = f"/repos/{owner}/{repo}/actions/runners/{runner_id}" + elif owner: + endpoint = f"/orgs/{owner}/actions/runners/{runner_id}" + else: + endpoint = f"/admin/runners/{runner_id}" + return self._delete(endpoint) + + def get_workflow_contents(self, owner: str, repo: str, workflow: str) -> str | None: + """Get workflow file contents.""" + # Try .gitea/workflows first, then .github/workflows + for prefix in [".gitea/workflows", ".github/workflows"]: + path = f"{prefix}/{workflow}" + data = self._request(f"/repos/{owner}/{repo}/contents/{path}") + if data and "content" in data: + import base64 + try: + return base64.b64decode(data["content"]).decode("utf-8") + except Exception: + pass + return None + + def delete_run(self, owner: str, repo: str, run_id: int) -> bool: + """Delete a workflow run (for queued runs that can't be cancelled).""" + endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}" + return self._delete(endpoint) + + def _delete(self, endpoint: str) -> bool: + """Make DELETE request.""" + url = f"{self.base_url}{endpoint}" + headers = {"Accept": "application/json"} + if self.config.token: + headers["Authorization"] = f"token {self.config.token}" + else: + print(f"{Color.RED}Authentication required for this operation{Color.RST}") + return False + + req = urllib.request.Request(url, headers=headers, method="DELETE") + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.status in (200, 204) + except urllib.error.HTTPError as e: + if e.code == 404: + print(f"{Color.RED}Not found: {endpoint}{Color.RST}") + elif e.code == 403: + print(f"{Color.RED}Permission denied{Color.RST}") + else: + print(f"{Color.RED}Delete failed: {e.code}{Color.RST}") + return False + except urllib.error.URLError as e: + print(f"{Color.RED}Connection error: {e.reason}{Color.RST}") + return False + + def _post(self, endpoint: str, data: dict, retries: int = 3) -> dict | None: + """Make POST request with JSON body.""" + url = f"{self.base_url}{endpoint}" + headers = { + "Accept": "application/json", + "Content-Type": "application/json", + } + + if self.config.token: + headers["Authorization"] = f"token {self.config.token}" + else: + print(f"{Color.RED}Authentication required for this operation{Color.RST}") + return None + + body = json.dumps(data).encode("utf-8") + req = urllib.request.Request(url, data=body, headers=headers, method="POST") + + for attempt in range(retries): + try: + with urllib.request.urlopen(req, timeout=30) as resp: + self._retry_delay = 1 + # Handle 204 No Content + if resp.status == 204: + return {} + content = resp.read().decode() + return json.loads(content) if content else {} + + except urllib.error.HTTPError as e: + if e.code == 401: + print(f"{Color.RED}Authentication failed. Check your token.{Color.RST}") + return None + elif e.code == 403: + print(f"{Color.RED}Permission denied. Token may lack required scopes.{Color.RST}") + return None + elif e.code == 404: + print(f"{Color.RED}Not found: {endpoint}{Color.RST}") + return None + elif e.code == 422: + print(f"{Color.RED}Invalid request (check workflow name/inputs){Color.RST}") + return None + elif e.code == 429: + if attempt < retries - 1: + delay = self._retry_delay + print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}") + time.sleep(delay) + self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) + continue + return None + else: + print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}") + return None + + except urllib.error.URLError as e: + if attempt < retries - 1: + time.sleep(self._retry_delay) + continue + print(f"{Color.RED}Connection error: {e.reason}{Color.RST}") + return None + + return None + + + +def format_time(timestamp: str) -> str: + """Format ISO timestamp to relative time.""" + if not timestamp: + return "" + try: + dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) + now = datetime.now(dt.tzinfo) + delta = now - dt + + if delta.days > 0: + return f"{delta.days}d ago" + elif delta.seconds >= 3600: + return f"{delta.seconds // 3600}h ago" + elif delta.seconds >= 60: + return f"{delta.seconds // 60}m ago" + else: + return "just now" + except (ValueError, TypeError): + return timestamp[:16] if len(timestamp) > 16 else timestamp + + +def format_duration(start: str, end: str) -> str: + """Format duration between two timestamps.""" + if not start: + return "" + try: + start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) + if end: + end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) + else: + end_dt = datetime.now(start_dt.tzinfo) + + delta = end_dt - start_dt + total_secs = int(delta.total_seconds()) + + if total_secs >= 3600: + return f"{total_secs // 3600}h{(total_secs % 3600) // 60:02d}m" + elif total_secs >= 60: + return f"{total_secs // 60}m{total_secs % 60:02d}s" + else: + return f"{total_secs}s" + except (ValueError, TypeError): + return "" + + +def status_display(status: str, conclusion: str = "") -> str: + """Get colored status display.""" + key = conclusion if conclusion else status + color, symbol = STATUS_STYLE.get(key.lower(), (Color.DIM, "?")) + return f"{color}{symbol}{Color.RST}" + + +def cmd_list(api: GiteaAPI, args: argparse.Namespace) -> int: + """List recent workflow runs.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + # Full discovery mode - list all repos with recent runs + return cmd_discover(api, args) + + # Get runs via API + runs = api.get_runs(owner, repo, limit=args.limit) + + if not runs: + if not getattr(args, "compact", False): + print(f"{Color.DIM}No workflow runs found{Color.RST}") + return 0 + + # Apply filters + status_filter = getattr(args, "status_filter", "") or "" + branch_filter = getattr(args, "branch", "") or "" + workflow_filter = getattr(args, "workflow", "") or "" + + if status_filter or branch_filter or workflow_filter: + runs = _filter_runs(runs, status_filter, branch_filter, workflow_filter) + if not runs: + if not getattr(args, "compact", False): + filters = [f for f in [status_filter, branch_filter, workflow_filter] if f] + print(f"{Color.DIM}No runs matching filters: {', '.join(filters)}{Color.RST}") + return 0 + + # JSON output + if hasattr(args, "json") and args.json: + return _output_json(runs, owner, repo) + + # Compact output for scripting + if getattr(args, "compact", False): + for run in runs[:args.limit]: + _print_compact_run(run) + return 0 + + print(f"{Color.BLD}Workflow runs for {owner}/{repo}:{Color.RST}\n") + + # Statistics + _print_repo_stats(runs) + + for run in runs[:args.limit]: + _print_single_run(run) + + return 0 + + +def _filter_runs(runs: list, status_filter: str = "", branch: str = "", workflow: str = "") -> list: + """Filter runs by status, branch, and/or workflow.""" + filtered = runs + + # Status filter + if status_filter == "failed": + filtered = [r for r in filtered if r.get("conclusion") == "failure"] + elif status_filter == "success": + filtered = [r for r in filtered if r.get("conclusion") == "success"] + elif status_filter == "running": + filtered = [r for r in filtered if r.get("status") in ("running", "waiting")] + elif status_filter == "completed": + filtered = [r for r in filtered if r.get("status") == "completed"] + + # Branch filter + if branch: + filtered = [r for r in filtered if branch.lower() in r.get("head_branch", "").lower()] + + # Workflow filter + if workflow: + workflow_lower = workflow.lower() + filtered = [r for r in filtered if ( + workflow_lower in r.get("workflow", "").lower() or + workflow_lower in r.get("path", "").lower() or + workflow_lower in r.get("name", "").lower() + )] + + return filtered + + +def _print_compact_run(run: dict, show_repo: bool = False) -> None: + """Print a single run in compact single-line format.""" + run_id = run.get("id", 0) + status = run.get("status", "") + conclusion = run.get("conclusion", "") + + # Status symbol + if conclusion == "success": + sym = f"{Color.GRN}✓{Color.RST}" + elif conclusion == "failure": + sym = f"{Color.RED}✗{Color.RST}" + elif status in ("running", "waiting"): + sym = f"{Color.BLU}●{Color.RST}" + else: + sym = f"{Color.DIM}○{Color.RST}" + + branch = run.get("head_branch", "")[:20] + title = run.get("display_title", "")[:40] + workflow = run.get("workflow", "") + repo = run.get("_repo", "") + duration = run.get("duration", "") or format_duration( + run.get("run_started_at", ""), run.get("updated_at", "") + ) + + # Build compact line + parts = [sym, f"#{run_id:>5}"] + if show_repo and repo: + parts.append(f"{Color.BLD}{repo:<20}{Color.RST}") + if workflow: + parts.append(f"{Color.DIM}{workflow:<15}{Color.RST}") + parts.append(f"{Color.MAG}{branch:<20}{Color.RST}") + parts.append(title) + if duration: + parts.append(f"{Color.DIM}({duration}){Color.RST}") + + print(" ".join(parts)) + + +def _print_repo_stats(runs: list) -> None: + """Print statistics for a single repository.""" + total = len(runs) + if total == 0: + return + + success = len([r for r in runs if r.get("conclusion") == "success"]) + failed = len([r for r in runs if r.get("conclusion") == "failure"]) + running = len([r for r in runs if r.get("status") in ("running", "waiting")]) + completed = total - running + + if completed > 0: + rate = (success / completed) * 100 + print(f"{Color.DIM}Stats: {Color.GRN}✓{success}{Color.RST}{Color.DIM} " + f"{Color.RED}✗{failed}{Color.RST}{Color.DIM} " + f"{Color.BLU}●{running}{Color.RST}{Color.DIM} " + f"({rate:.0f}% success){Color.RST}\n") + + +def _print_single_run(run: dict) -> None: + """Print a single run with full details.""" + run_id = run.get("id", 0) + status = status_display(run.get("status", ""), run.get("conclusion", "")) + title = run.get("display_title", run.get("head_branch", ""))[:50] + branch = run.get("head_branch", "") + workflow = run.get("workflow", "") + when = format_time(run.get("created_at", "")) + duration = run.get("duration", "") or format_duration(run.get("run_started_at", ""), run.get("updated_at", "")) + + if workflow: + print(f" {status} {Color.BLD}{workflow}{Color.RST}") + else: + print(f" {status} {Color.BLD}#{run_id}{Color.RST}") + print(f" {title}") + + info_parts = [] + if branch: + info_parts.append(f"{Color.MAG}{branch}{Color.RST}") + if when: + info_parts.append(when) + if duration: + info_parts.append(duration) + if info_parts: + print(f" {Color.DIM}{' · '.join(info_parts)}{Color.RST}") + print() + + +def _output_json(runs: list, owner: str, repo: str) -> int: + """Output runs as JSON.""" + import json as json_mod + runs_list: list[dict[str, Any]] = [] + output: dict[str, Any] = { + "repository": f"{owner}/{repo}", + "total": len(runs), + "runs": runs_list + } + for run in runs: + runs_list.append({ + "id": run.get("id"), + "status": run.get("status"), + "conclusion": run.get("conclusion"), + "title": run.get("display_title"), + "branch": run.get("head_branch"), + "workflow": run.get("workflow"), + "created_at": run.get("created_at"), + "duration": run.get("duration"), + }) + print(json_mod.dumps(output, indent=2)) + return 0 + + +def cmd_discover(api: GiteaAPI, args: argparse.Namespace) -> int: + """Discover all repositories and their workflow runs.""" + print(f"{Color.BLD}Discovering workflows on {api.config.url}...{Color.RST}\n") + + # Get current user + user = api._request("/user") + if not user: + print(f"{Color.YLW}API token required for discovery.{Color.RST}") + print(f"{Color.DIM}Use: gitea-ci config --token YOUR_TOKEN{Color.RST}") + print(f"{Color.DIM}Or use gitea-scrape for public repos without auth{Color.RST}") + return 1 + + username = user.get("login", "") + print(f"{Color.DIM}User:{Color.RST} {username}\n") + + # Get all repos (user's own + orgs) + repos = api.get_repos(limit=100) + + # Also get repos from organizations + orgs: list[Any] = list(api._request("/user/orgs") or []) + for org in orgs: + org_repos = api._request(f"/orgs/{org.get('username', '')}/repos?limit=100") + if org_repos: + repos.extend(org_repos) + + if not repos: + print(f"{Color.DIM}No repositories found{Color.RST}") + return 0 + + # Collect all recent runs + all_runs = [] + repos_with_actions = 0 + + for i, r in enumerate(repos): + repo_owner = r.get("owner", {}).get("login", "") + repo_name = r.get("name", "") + full_name = r.get("full_name", f"{repo_owner}/{repo_name}") + + # Progress indicator for large repos + if len(repos) > 10 and (i + 1) % 5 == 0: + print(f"{Color.DIM} Scanning {i + 1}/{len(repos)} repositories...{Color.RST}", end="\r", file=sys.stderr) + + runs = api.get_runs(repo_owner, repo_name, limit=3) + if runs: + repos_with_actions += 1 + for run in runs: + run["_repo"] = full_name + all_runs.append(run) + + # Throttle to avoid rate limiting + api.throttle() + + if not all_runs: + print(f"{Color.DIM}No workflow runs found across {len(repos)} repositories{Color.RST}") + return 0 + + # Sort by created_at descending + all_runs.sort(key=lambda x: x.get("created_at", ""), reverse=True) + + _print_run_summary(all_runs, repos_with_actions) + return 0 + + +def _print_run_summary(all_runs: list, repos_with_actions: int) -> None: + """Print summary of workflow runs grouped by status.""" + # Calculate statistics + total = len(all_runs) + success = len([r for r in all_runs if r.get("conclusion") == "success"]) + failed = [r for r in all_runs if r.get("conclusion") == "failure"] + running = [r for r in all_runs if r.get("status") == "running" or r.get("status") == "waiting"] + cancelled = len([r for r in all_runs if r.get("conclusion") == "cancelled"]) + + # Print statistics header + print(f"{Color.BLD}Statistics:{Color.RST}") + completed = total - len(running) + if completed > 0: + success_rate = (success / completed) * 100 + print(f" {Color.GRN}✓ {success}{Color.RST} success " + f"{Color.RED}✗ {len(failed)}{Color.RST} failed " + f"{Color.BLU}● {len(running)}{Color.RST} running " + f"{Color.DIM}⊘ {cancelled} cancelled{Color.RST}") + print(f" {Color.DIM}Success rate: {success_rate:.0f}% ({success}/{completed} completed){Color.RST}") + else: + print(f" {Color.BLU}● {len(running)}{Color.RST} running " + f"{Color.DIM}No completed runs yet{Color.RST}") + print(f" {Color.DIM}Repositories with actions: {repos_with_actions}{Color.RST}") + print() + + # Show running first + if running: + print(f"{Color.BLU}{Color.BLD}Running/Waiting ({len(running)}):{Color.RST}\n") + for run in running[:5]: + _print_run_line(run) + if len(running) > 5: + print(f" {Color.DIM}... and {len(running) - 5} more{Color.RST}\n") + + # Show recent failures + if failed: + print(f"{Diff.DEL}{Color.BLD} Recent Failures ({len(failed)}) {Color.RST}\n") + for run in failed[:5]: + _print_run_line(run) + if len(failed) > 5: + print(f" {Color.DIM}... and {len(failed) - 5} more{Color.RST}\n") + + # Show recent activity + recent = all_runs[:10] + print(f"{Color.BLD}Recent Activity:{Color.RST}\n") + for run in recent: + _print_run_line(run) + + +def _print_run_line(run: dict) -> None: + """Print a single run line with repo info.""" + run_id = run.get("id", 0) + repo = run.get("_repo", "") + status = status_display(run.get("status", ""), run.get("conclusion", "")) + title = run.get("display_title", run.get("head_branch", ""))[:40] + branch = run.get("head_branch", "") + workflow = run.get("workflow", "") + + # Get time - try different fields + when = format_time(run.get("created_at", "")) + + # Get duration - try pre-computed or calculate + duration = run.get("duration", "") + if not duration: + duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", "")) + + # Build info line + info_parts = [] + if branch: + info_parts.append(f"{Color.MAG}{branch}{Color.RST}") + if when: + info_parts.append(when) + if duration: + info_parts.append(duration) + info_str = " · ".join(info_parts) if info_parts else "" + + # Print with workflow name if available + if workflow: + print(f" {status} {Color.BLD}{repo}{Color.RST} {Color.DIM}{workflow}{Color.RST}") + else: + print(f" {status} {Color.BLD}{repo}{Color.RST} #{run_id}") + print(f" {title}") + if info_str: + print(f" {Color.DIM}{info_str}{Color.RST}") + print() + + +def cmd_status(api: GiteaAPI, args: argparse.Namespace) -> int: + """Show detailed status of a workflow run.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required. Use -o OWNER -r REPO or set defaults{Color.RST}") + return 1 + + # Get run (latest or specific) + if args.run_id: + run = api.get_run(owner, repo, args.run_id) + else: + runs = api.get_runs(owner, repo, limit=1) + run = runs[0] if runs else None + + if not run: + print(f"{Color.RED}Run not found{Color.RST}") + return 1 + + run_id = run.get("id", 0) + + # Get jobs + jobs = api.get_jobs(owner, repo, run_id) + + # JSON output + if getattr(args, "json", False): + output = { + "repository": f"{owner}/{repo}", + "run": { + "id": run_id, + "status": run.get("status"), + "conclusion": run.get("conclusion"), + "title": run.get("display_title"), + "branch": run.get("head_branch"), + "commit": run.get("head_sha"), + "created_at": run.get("created_at"), + "started_at": run.get("run_started_at"), + "updated_at": run.get("updated_at"), + }, + "jobs": [ + { + "id": j.get("id"), + "name": j.get("name"), + "status": j.get("status"), + "conclusion": j.get("conclusion"), + "started_at": j.get("started_at"), + "completed_at": j.get("completed_at"), + } + for j in jobs + ] if jobs else [] + } + print(json.dumps(output, indent=2)) + return 0 + + status = status_display(run.get("status", ""), run.get("conclusion", "")) + title = run.get("display_title", "") + branch = run.get("head_branch", "") + commit = run.get("head_sha", "")[:8] + when = format_time(run.get("created_at", "")) + duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", "")) + + print(f"{Color.BLD}Run #{run_id}{Color.RST} {status}") + print(f" {Color.DIM}Title:{Color.RST} {title}") + print(f" {Color.DIM}Branch:{Color.RST} {branch} ({commit})") + print(f" {Color.DIM}Started:{Color.RST} {when}") + print(f" {Color.DIM}Duration:{Color.RST} {duration}") + print() + + if jobs: + print(f"{Color.BLD}Jobs:{Color.RST}\n") + pending_jobs = [] + for job in jobs: + job_id = job.get("id", 0) + job_status_raw = job.get("status", "") + job_status = status_display(job_status_raw, job.get("conclusion", "")) + job_name = job.get("name", "unknown") + job_duration = format_duration(job.get("started_at", ""), job.get("completed_at", "")) + runs_on = job.get("labels", []) # May be empty, will try workflow + + print(f" {job_status} {job_name} {Color.DIM}(ID: {job_id}, {job_duration}){Color.RST}") + + # Collect pending jobs for diagnostics + if job_status_raw in ("pending", "waiting", "queued"): + pending_jobs.append({ + "name": job_name, + "id": job_id, + "labels": runs_on, + "created": job.get("created_at", run.get("created_at", "")), + }) + + # Show pending diagnostics if there are pending jobs + if pending_jobs and not getattr(args, "json", False): + print(f"\n{Color.BLD}Pending Diagnostics:{Color.RST}\n") + + # Get available runners + runners = api.get_runners("", "") # User-level runners first + if not runners: + runners = api.get_runners(owner, repo) + + if not runners: + print(f" {Color.RED}✗{Color.RST} No runners available") + print(f" {Color.DIM}Register a runner: gitea-ci register-token{Color.RST}") + else: + online_runners = [r for r in runners if r.get("status") == "online"] + busy_runners = [r for r in runners if r.get("busy", False)] + runner_labels = set() + online_labels = set() + for r in runners: + labels = r.get("labels", []) + runner_labels.update(labels) + if r.get("status") == "online": + online_labels.update(labels) + + print(f" Runners: {len(online_runners)} online, {len(busy_runners)} busy, {len(runners)} total") + + # For each pending job, check why it might be stuck + for pj in pending_jobs: + created_str = pj.get("created", "") + if created_str: + try: + created_dt = datetime.fromisoformat(created_str.replace("Z", "+00:00")) + pending_secs = (datetime.now(created_dt.tzinfo) - created_dt).total_seconds() + pending_time = f"{int(pending_secs // 60)}m {int(pending_secs % 60)}s" if pending_secs >= 60 else f"{int(pending_secs)}s" + except Exception: + pending_time = "?" + else: + pending_time = "?" + + job_labels = pj.get("labels", []) + if not job_labels: + # Try to get from workflow - use default label + job_labels = ["linux"] # Common default + + print(f"\n {Color.YLW}○{Color.RST} {pj['name']} {Color.DIM}(pending {pending_time}){Color.RST}") + + if job_labels: + missing = set(job_labels) - runner_labels + offline_labels = (set(job_labels) & runner_labels) - online_labels + + if missing: + print(f" {Color.RED}Missing labels:{Color.RST} {', '.join(missing)}") + elif offline_labels: + print(f" {Color.YLW}Runners offline for:{Color.RST} {', '.join(offline_labels)}") + elif busy_runners and len(busy_runners) == len(online_runners): + print(f" {Color.YLW}All runners busy{Color.RST}") + else: + print(f" {Color.DIM}Waiting for runner...{Color.RST}") + + return 0 + + +def cmd_logs(api: GiteaAPI, args: argparse.Namespace) -> int: + """Show logs for a job.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + job_id = args.job_id + + # Get run_id from argument or latest + if args.run_id: + run_id = args.run_id + else: + runs = api.get_runs(owner, repo, limit=1) + if not runs: + print(f"{Color.RED}No runs found{Color.RST}") + return 1 + run_id = runs[0].get("id") + + # If no job_id, get jobs for the run + if not job_id: + jobs = api.get_jobs(owner, repo, run_id) + + if not jobs: + print(f"{Color.RED}No jobs found{Color.RST}") + return 1 + + # Prefer failed job + for job in jobs: + if job.get("conclusion") == "failure": + job_id = job.get("id") + break + + if not job_id: + job_id = jobs[0].get("id") + + logs = api.get_job_logs(owner, repo, job_id) + + if logs: + print(logs) + else: + print(f"{Color.RED}No logs available (may require authentication){Color.RST}") + return 1 + + return 0 + + +def cmd_watch(api: GiteaAPI, args: argparse.Namespace) -> int: + """Watch a running workflow.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + run_id = args.run_id + interval = args.interval + notify = getattr(args, "notify", False) + timeout = getattr(args, "timeout", 0) + start_time = time.time() + + timeout_msg = f", timeout: {timeout}s" if timeout else "" + print(f"{Color.BLD}Watching {owner}/{repo}...{Color.RST} (Ctrl+C to stop{timeout_msg})\n") + + try: + while True: + # Check timeout + if timeout and (time.time() - start_time) >= timeout: + print(f"\n{Color.YLW}Timeout reached ({timeout}s){Color.RST}") + return 2 + # Clear screen for refresh + if not args.no_clear: + print("\033[2J\033[H", end="") + + # Get run status + if run_id: + run = api.get_run(owner, repo, run_id) + runs = [run] if run else [] + else: + runs = api.get_runs(owner, repo, limit=5) + + if not runs: + print(f"{Color.DIM}No active runs{Color.RST}") + else: + for run in runs: + rid = run.get("id", 0) + status = status_display(run.get("status", ""), run.get("conclusion", "")) + title = run.get("display_title", "")[:40] + duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", "")) + + print(f"{status} {Color.BLD}#{rid}{Color.RST} {title} {Color.DIM}({duration}){Color.RST}") + + # Show jobs for this run + jobs = api.get_jobs(owner, repo, rid) + for job in jobs: + job_status = status_display(job.get("status", ""), job.get("conclusion", "")) + job_name = job.get("name", "") + print(f" {job_status} {job_name}") + + print() + + # Stop watching if run is complete + if run.get("status") == "completed": + conclusion = run.get("conclusion", "") + if conclusion == "success": + print(f"{Diff.ADD} ✓ Run completed successfully {Color.RST}") + if notify: + send_notification( + "CI Passed", + f"{owner}/{repo} #{rid} succeeded", + "normal" + ) + elif conclusion == "failure": + print(f"{Diff.DEL} ✗ Run failed {Color.RST}") + if notify: + send_notification( + "CI Failed", + f"{owner}/{repo} #{rid} failed", + "critical" + ) + else: + print(f"{Diff.CHG} ⚠ Run {conclusion} {Color.RST}") + if notify: + send_notification( + f"CI {conclusion.title()}", + f"{owner}/{repo} #{rid} {conclusion}", + "normal" + ) + + if run_id: # Only exit if watching specific run + return 0 if conclusion == "success" else 1 + + time.sleep(interval) + + except KeyboardInterrupt: + print(f"\n{Color.DIM}Stopped watching{Color.RST}") + return 0 + + +def _check_token_permissions(api: GiteaAPI) -> int: + """Check what permissions the current token has.""" + config = api.config + + if not config.token: + print(f"{Color.RED}No token configured{Color.RST}") + print(f"{Color.DIM}Set with: gitea-ci config token set {Color.RST}") + return 1 + + print(f"{Color.BLD}Token Permissions{Color.RST}") + print("─" * 35) + + # Basic auth - GET /user + user = api._request("/user") + if user: + username = user.get("login", "unknown") + print(f" {Color.GRN}✓{Color.RST} Authenticated as: {username}") + else: + print(f" {Color.RED}✗{Color.RST} Authentication failed") + return 1 + + # Read repos - GET /user/repos + repos = api._request("/user/repos?limit=1") + if repos is not None: + print(f" {Color.GRN}✓{Color.RST} Read repositories") + else: + print(f" {Color.RED}✗{Color.RST} Read repositories") + + # Organization access - GET /user/orgs + orgs = api._request("/user/orgs") + if orgs is not None: + org_count = len(orgs) if isinstance(orgs, list) else 0 + print(f" {Color.GRN}✓{Color.RST} Organization access ({org_count} orgs)") + else: + print(f" {Color.RED}✗{Color.RST} Organization access") + + # Admin access - GET /admin/runners + runners = api._request("/admin/runners") + if runners is not None: + print(f" {Color.GRN}✓{Color.RST} Admin access") + else: + print(f" {Color.DIM}○{Color.RST} Admin access (not admin or scope missing)") + + # User-level runners - GET /user/actions/runners/registration-token + # This tests if we can manage user-level runners + user_runners = api._request("/user/actions/runners") + if user_runners is not None: + print(f" {Color.GRN}✓{Color.RST} User-level runners") + else: + print(f" {Color.DIM}○{Color.RST} User-level runners") + + # Check if we can trigger workflows (requires repo scope) + print() + print(f"{Color.DIM}Token ends with: ...{config.token[-4:]}{Color.RST}") + print(f"{Color.DIM}Config file: {CONFIG_FILE}{Color.RST}") + + return 0 + + +def cmd_config(api: GiteaAPI, args: argparse.Namespace) -> int: + """Configure gitea-ci.""" + config = api.config + + # Handle token subcommand + action = getattr(args, "action", None) + if action == "token": + token_action = getattr(args, "token_action", None) + if token_action == "check": + return _check_token_permissions(api) + elif token_action == "set": + new_token = getattr(args, "token_value", None) + if not new_token: + print(f"{Color.RED}Token value required{Color.RST}") + return 1 + config.token = new_token + config.save() + print(f"{Color.GRN}Token saved to {CONFIG_FILE}{Color.RST}") + return 0 + else: + # Just show token info + if config.token: + print(f"{Color.BLD}Token:{Color.RST} {'*' * 8}...{config.token[-4:]}") + print(f"{Color.DIM}Use 'gitea-ci config token check' to test permissions{Color.RST}") + else: + print(f"{Color.YLW}No token configured{Color.RST}") + print(f"{Color.DIM}Set with: gitea-ci config token set {Color.RST}") + return 0 + + if args.show: + print(f"{Color.BLD}Current configuration:{Color.RST}") + print(f" URL: {config.url}") + print(f" Token: {'***' if config.token else '(not set)'}") + print(f" Default owner: {config.default_owner or '(not set)'}") + print(f" Default repo: {config.default_repo or '(not set)'}") + print(f"\n{Color.DIM}Config file: {CONFIG_FILE}{Color.RST}") + return 0 + + if args.test: + print(f"{Color.BLD}Testing API connectivity...{Color.RST}\n") + print(f" URL: {config.url}") + print(f" Token: {'configured' if config.token else 'not set'}") + print() + + # Test basic connectivity + try: + req = urllib.request.Request( + f"{config.url}/api/v1/version", + headers={"Accept": "application/json"} + ) + with urllib.request.urlopen(req, timeout=10) as resp: + data = json.loads(resp.read().decode()) + version = data.get("version", "unknown") + print(f" {Color.GRN}✓{Color.RST} Server reachable (Gitea {version})") + except urllib.error.URLError as e: + print(f" {Color.RED}✗{Color.RST} Connection failed: {e.reason}") + return 1 + + # Test authentication if token set + if config.token: + user = api._request("/user") + if user: + username = user.get("login", "unknown") + print(f" {Color.GRN}✓{Color.RST} Authenticated as: {username}") + else: + print(f" {Color.RED}✗{Color.RST} Authentication failed (invalid token?)") + return 1 + else: + print(f" {Color.YLW}○{Color.RST} No token configured (read-only mode)") + + # Test default repo if configured + if config.default_owner and config.default_repo: + runs = api.get_runs(config.default_owner, config.default_repo, limit=1) + if runs is not None: + print(f" {Color.GRN}✓{Color.RST} Default repo accessible: {config.default_owner}/{config.default_repo}") + else: + print(f" {Color.YLW}○{Color.RST} Default repo not accessible (may not have actions)") + + print(f"\n{Color.GRN}API connectivity OK{Color.RST}") + return 0 + + # Interactive config + print(f"{Color.BLD}Gitea CI Configuration{Color.RST}\n") + + url = input(f"Gitea URL [{config.url}]: ").strip() + if url: + config.url = url + + token = input(f"API Token [{'***' if config.token else 'none'}]: ").strip() + if token: + config.token = token + + owner = input(f"Default owner [{config.default_owner}]: ").strip() + if owner: + config.default_owner = owner + + repo = input(f"Default repo [{config.default_repo}]: ").strip() + if repo: + config.default_repo = repo + + config.save() + print(f"\n{Color.GRN}Configuration saved to {CONFIG_FILE}{Color.RST}") + + return 0 + + +def cmd_repo(api: GiteaAPI, args: argparse.Namespace) -> int: + """Manage repositories.""" + action = getattr(args, "action", "list") + + if action == "list": + # List user's repositories + print(f"{Color.BLD}Repositories:{Color.RST}\n") + repos = api.get_repos(limit=100) + if not repos: + print(f"{Color.DIM}No repositories found{Color.RST}") + return 0 + + for r in repos: + name = r.get("full_name", "") + desc = r.get("description", "")[:40] if r.get("description") else "" + private = r.get("private", False) + visibility = f"{Color.YLW}private{Color.RST}" if private else f"{Color.GRN}public{Color.RST}" + has_actions = r.get("has_actions", False) + actions_icon = f" {Color.DIM}[actions]{Color.RST}" if has_actions else "" + + print(f" {name} [{visibility}]{actions_icon}") + if desc: + print(f" {Color.DIM}{desc}{Color.RST}") + return 0 + + elif action == "create": + name = args.name + if not name: + print(f"{Color.RED}Repository name required{Color.RST}") + return 1 + + description = getattr(args, "description", "") or "" + private = getattr(args, "private", False) + + # Check if already exists + user = api._request("/user") + if not user: + print(f"{Color.RED}Authentication required{Color.RST}") + return 1 + username = user.get("login", "") + + if api.repo_exists(username, name): + print(f"{Color.YLW}Repository {username}/{name} already exists{Color.RST}") + return 0 + + print(f"Creating repository {Color.BLD}{name}{Color.RST}...") + result = api.create_repo(name, description, private) + if result: + full_name = result.get("full_name", name) + clone_url = result.get("ssh_url", result.get("clone_url", "")) + print(f"{Color.GRN}✓{Color.RST} Created: {full_name}") + if clone_url: + print(f" {Color.DIM}Clone: {clone_url}{Color.RST}") + return 0 + else: + print(f"{Color.RED}✗{Color.RST} Failed to create repository") + return 1 + + elif action == "exists": + owner = args.owner or api.config.default_owner + repo = args.repo or args.name + + if not owner or not repo: + print(f"{Color.RED}Repository (owner/repo) required{Color.RST}") + return 1 + + if api.repo_exists(owner, repo): + print(f"{Color.GRN}✓{Color.RST} {owner}/{repo} exists") + return 0 + else: + print(f"{Color.RED}✗{Color.RST} {owner}/{repo} does not exist") + return 1 + + elif action == "check": + owner = args.owner or api.config.default_owner + repo = args.repo or args.name or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository (owner/repo) required{Color.RST}") + return 1 + + print(f"{Color.BLD}Repository Check: {owner}/{repo}{Color.RST}\n") + + # Check if repository exists and has actions enabled + repo_info = api._request(f"/repos/{owner}/{repo}") + if not repo_info: + print(f"{Color.RED}✗{Color.RST} Repository not found or not accessible") + return 1 + + has_actions = repo_info.get("has_actions", False) + if has_actions: + print(f"{Color.GRN}✓{Color.RST} Actions enabled") + else: + print(f"{Color.RED}✗{Color.RST} Actions not enabled") + print(f" {Color.DIM}Enable in repository Settings → Actions{Color.RST}") + return 1 + + # Get workflows + workflows = api.get_workflows(owner, repo) + if workflows: + print(f"{Color.GRN}✓{Color.RST} Found {len(workflows)} workflow(s)") + else: + print(f"{Color.YLW}○{Color.RST} No workflows found") + print(f" {Color.DIM}Create .gitea/workflows/*.yml or .github/workflows/*.yml{Color.RST}") + return 0 + + # Get available runners (user scope first) + runners = api.get_runners("", "") # User-level runners + if not runners: + # Try repo scope + runners = api.get_runners(owner, repo) + + runner_labels = set() + online_labels = set() + for r in runners: + labels = r.get("labels", []) + runner_labels.update(labels) + if r.get("status") == "online": + online_labels.update(labels) + + if runners: + online_count = sum(1 for r in runners if r.get("status") == "online") + print(f"{Color.GRN}✓{Color.RST} Runners available: {online_count} online, {len(runners)} total") + else: + print(f"{Color.RED}✗{Color.RST} No runners found") + print(f" {Color.DIM}Register a runner with: gitea-ci register-token{Color.RST}") + return 1 + + # Analyze workflows for runs-on labels + print(f"\n{Color.BLD}Workflow Analysis:{Color.RST}\n") + issues = [] + + for wf in workflows: + wf_name = wf.get("name", "unnamed") + wf_path = wf.get("path", "") + wf_file = wf_path.split("/")[-1] if wf_path else "unknown" + + # Get workflow contents and parse runs-on + content = api.get_workflow_contents(owner, repo, wf_file) + if not content: + print(f" {Color.DIM}○{Color.RST} {wf_name} {Color.DIM}(cannot read){Color.RST}") + continue + + # Parse runs-on labels from YAML content + runs_on_matches = re.findall(r'runs-on:\s*([^\n]+)', content) + required_labels = set() + for match in runs_on_matches: + # Handle array syntax: [label1, label2] or simple string + match = match.strip() + if match.startswith('['): + # Array syntax + labels = re.findall(r'[\w-]+', match) + required_labels.update(labels) + else: + # Simple string (may have quotes) + label = match.strip('\'"') + required_labels.add(label) + + if not required_labels: + print(f" {Color.DIM}○{Color.RST} {wf_name} {Color.DIM}(no runs-on found){Color.RST}") + continue + + # Check if labels match available runners + missing = required_labels - runner_labels + offline = (required_labels & runner_labels) - online_labels + + if missing: + print(f" {Color.RED}✗{Color.RST} {wf_name}") + print(f" {Color.DIM}requires:{Color.RST} {', '.join(required_labels)}") + print(f" {Color.RED}missing:{Color.RST} {', '.join(missing)}") + issues.append((wf_name, "missing labels", missing)) + elif offline: + print(f" {Color.YLW}○{Color.RST} {wf_name}") + print(f" {Color.DIM}requires:{Color.RST} {', '.join(required_labels)}") + print(f" {Color.YLW}offline:{Color.RST} {', '.join(offline)}") + issues.append((wf_name, "runners offline", offline)) + else: + print(f" {Color.GRN}✓{Color.RST} {wf_name} {Color.DIM}({', '.join(required_labels)}){Color.RST}") + + # Summary + if issues: + print(f"\n{Color.BLD}Issues Found:{Color.RST}\n") + all_missing = set() + for wf_name, issue_type, labels in issues: + if issue_type == "missing labels": + all_missing.update(labels) + + if all_missing: + print(f" Missing runner labels: {', '.join(sorted(all_missing))}") + print(f" {Color.DIM}Register runners with these labels or update workflows{Color.RST}") + return 1 + + print(f"\n{Color.GRN}All workflows have matching runners{Color.RST}") + return 0 + + return 0 + + +def cmd_trigger(api: GiteaAPI, args: argparse.Namespace) -> int: + """Trigger a workflow manually.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + workflow = args.workflow + ref = args.ref or "master" + + # Parse inputs if provided + inputs = {} + if hasattr(args, "input") and args.input: + for inp in args.input: + if "=" in inp: + key, value = inp.split("=", 1) + inputs[key] = value + + # If no workflow specified, list available ones + if not workflow: + workflows = api.get_workflows(owner, repo) + if not workflows: + print(f"{Color.DIM}No workflows found{Color.RST}") + return 1 + + print(f"{Color.BLD}Available workflows:{Color.RST}\n") + for wf in workflows: + name = wf.get("name", "") + path = wf.get("path", "") + state = wf.get("state", "") + print(f" {Color.BLD}{path}{Color.RST}") + if name: + print(f" Name: {name}") + if state: + print(f" State: {state}") + print() + print(f"{Color.DIM}Use: gitea-ci trigger -w {Color.RST}") + return 0 + + print(f"{Color.BLD}Triggering workflow...{Color.RST}") + print(f" Workflow: {workflow}") + print(f" Branch: {ref}") + if inputs: + print(f" Inputs: {inputs}") + + if api.trigger_workflow(owner, repo, workflow, ref, inputs if inputs else None): + print(f"\n{Color.GRN}✓ Workflow triggered successfully{Color.RST}") + print(f"{Color.DIM}Use 'gitea-ci watch {owner}/{repo}' to monitor{Color.RST}") + return 0 + else: + print(f"\n{Color.RED}✗ Failed to trigger workflow{Color.RST}") + return 1 + + +def cmd_rerun(api: GiteaAPI, args: argparse.Namespace) -> int: + """Re-run a workflow or job.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + # Re-run specific job + if args.job_id: + print(f"{Color.BLD}Re-running job #{args.job_id}...{Color.RST}") + if api.rerun_job(owner, repo, args.job_id): + print(f"{Color.GRN}✓ Job re-run started{Color.RST}") + return 0 + else: + print(f"{Color.RED}✗ Failed to re-run job{Color.RST}") + return 1 + + # Get run ID + run_id = args.run_id + if not run_id: + # Get latest run + runs = api.get_runs(owner, repo, limit=1) + if not runs: + print(f"{Color.RED}No runs found{Color.RST}") + return 1 + run_id = runs[0].get("id") + + # Re-run failed jobs only or all jobs + if args.failed_only: + print(f"{Color.BLD}Re-running failed jobs in run #{run_id}...{Color.RST}") + if api.rerun_failed_jobs(owner, repo, run_id): + print(f"{Color.GRN}✓ Failed jobs re-run started{Color.RST}") + return 0 + else: + print(f"{Color.RED}✗ Failed to re-run jobs{Color.RST}") + return 1 + else: + print(f"{Color.BLD}Re-running all jobs in run #{run_id}...{Color.RST}") + if api.rerun_workflow(owner, repo, run_id): + print(f"{Color.GRN}✓ Workflow re-run started{Color.RST}") + return 0 + else: + print(f"{Color.RED}✗ Failed to re-run workflow{Color.RST}") + return 1 + + +def cmd_cancel(api: GiteaAPI, args: argparse.Namespace) -> int: + """Cancel a running workflow.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + run_id = args.run_id + if not run_id: + # Get latest running run + runs = api.get_runs(owner, repo, limit=10) + running = [r for r in runs if r.get("status") in ("running", "waiting", "queued")] + if not running: + print(f"{Color.DIM}No running workflows to cancel{Color.RST}") + return 0 + run_id = running[0].get("id") + + print(f"{Color.BLD}Cancelling run #{run_id}...{Color.RST}") + + if api.cancel_run(owner, repo, run_id): + print(f"{Color.GRN}✓ Workflow cancelled{Color.RST}") + return 0 + else: + print(f"{Color.RED}✗ Failed to cancel workflow{Color.RST}") + return 1 + + +def cmd_artifacts(api: GiteaAPI, args: argparse.Namespace) -> int: + """List or download artifacts.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + # Download specific artifact + if args.download: + artifact_id = args.download + output = args.output or f"artifact-{artifact_id}.zip" + + print(f"{Color.BLD}Downloading artifact #{artifact_id}...{Color.RST}") + if api.download_artifact(owner, repo, artifact_id, output): + size = Path(output).stat().st_size + print(f"{Color.GRN}✓ Downloaded to {output} ({size:,} bytes){Color.RST}") + return 0 + return 1 + + # List artifacts + run_id = args.run_id + artifacts = api.get_artifacts(owner, repo, run_id) + + if not artifacts: + print(f"{Color.DIM}No artifacts found{Color.RST}") + return 0 + + print(f"{Color.BLD}Artifacts:{Color.RST}\n") + for art in artifacts: + art_id = art.get("id", 0) + name = art.get("name", "unknown") + size = art.get("size_in_bytes", 0) + expired = art.get("expired", False) + created = format_time(art.get("created_at", "")) + + status_icon = f"{Color.DIM}(expired){Color.RST}" if expired else "" + print(f" {Color.BLD}#{art_id}{Color.RST} {name} {status_icon}") + print(f" {Color.DIM}Size: {size:,} bytes · Created: {created}{Color.RST}") + print() + + print(f"{Color.DIM}Download: gitea-ci artifacts -d [-O filename.zip]{Color.RST}") + return 0 + + +def cmd_pr(api: GiteaAPI, args: argparse.Namespace) -> int: + """Show CI status for a pull request.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + pr_number = args.pr_number + if not pr_number: + print(f"{Color.RED}PR number required (-p/--pr){Color.RST}") + return 1 + + data = api.get_pr_checks(owner, repo, pr_number) + if not data: + print(f"{Color.RED}PR #{pr_number} not found{Color.RST}") + return 1 + + pr = data["pr"] + sha = data["sha"][:8] + status = data.get("status", {}) + + # PR info + title = pr.get("title", "") + state = pr.get("state", "") + mergeable = pr.get("mergeable", False) + user = pr.get("user", {}).get("login", "") + + print(f"{Color.BLD}PR #{pr_number}: {title}{Color.RST}") + print(f" {Color.DIM}Author:{Color.RST} {user}") + print(f" {Color.DIM}State:{Color.RST} {state}") + print(f" {Color.DIM}HEAD:{Color.RST} {sha}") + print(f" {Color.DIM}Mergeable:{Color.RST} {'Yes' if mergeable else 'No'}") + print() + + # CI status + if status: + overall = status.get("state", "pending") + statuses = status.get("statuses", []) + + # Map state to display + state_display = { + "success": f"{Color.GRN}✓ All checks passed{Color.RST}", + "pending": f"{Color.YLW}● Checks in progress{Color.RST}", + "failure": f"{Color.RED}✗ Some checks failed{Color.RST}", + "error": f"{Color.RED}✗ Check errors{Color.RST}", + } + print(f"{Color.BLD}CI Status:{Color.RST} {state_display.get(overall, overall)}\n") + + if statuses: + for s in statuses: + ctx = s.get("context", "") + state = s.get("state", "") + desc = s.get("description", "")[:50] + icon = status_display(state, state) + print(f" {icon} {ctx}") + if desc: + print(f" {Color.DIM}{desc}{Color.RST}") + else: + # Try to get workflow runs for this commit + runs = api.get_runs(owner, repo, limit=5) + commit_runs = [r for r in runs if r.get("head_sha", "").startswith(sha)] + if commit_runs: + print(f" {Color.DIM}Workflow runs for this commit:{Color.RST}") + for run in commit_runs: + run_status = status_display(run.get("status", ""), run.get("conclusion", "")) + run_title = run.get("display_title", "")[:40] + print(f" {run_status} {run_title}") + else: + print(f"{Color.DIM}No CI status found{Color.RST}") + + return 0 + + +def cmd_compare(api: GiteaAPI, args: argparse.Namespace) -> int: + """Compare two workflow runs.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + run1_id = args.run1 + run2_id = args.run2 + + # If only one run specified, compare with previous + if not run2_id: + runs = api.get_runs(owner, repo, limit=20) + found_idx = None + for i, r in enumerate(runs): + if r.get("id") == run1_id: + found_idx = i + break + if found_idx is not None and found_idx + 1 < len(runs): + run2_id = runs[found_idx + 1].get("id") + else: + print(f"{Color.RED}Cannot find previous run to compare{Color.RST}") + return 1 + + # Fetch both runs and their jobs + run1 = api.get_run(owner, repo, run1_id) + run2 = api.get_run(owner, repo, run2_id) + + if not run1 or not run2: + print(f"{Color.RED}Could not fetch one or both runs{Color.RST}") + return 1 + + jobs1 = api.get_jobs(owner, repo, run1_id) + jobs2 = api.get_jobs(owner, repo, run2_id) + + # JSON output + if getattr(args, "json", False): + job_changes: list[dict[str, Any]] = [] + output: dict[str, Any] = { + "repository": f"{owner}/{repo}", + "run1": {"id": run1_id, "status": run1.get("status"), "conclusion": run1.get("conclusion")}, + "run2": {"id": run2_id, "status": run2.get("status"), "conclusion": run2.get("conclusion")}, + "job_changes": job_changes, + } + jobs1_map = {j.get("name"): j for j in jobs1} + jobs2_map = {j.get("name"): j for j in jobs2} + all_job_names = set(jobs1_map.keys()) | set(jobs2_map.keys()) + for name in sorted(all_job_names): + j1 = jobs1_map.get(name, {}) + j2 = jobs2_map.get(name, {}) + job_changes.append({ + "name": name, + "run1_conclusion": j1.get("conclusion"), + "run2_conclusion": j2.get("conclusion"), + "changed": j1.get("conclusion") != j2.get("conclusion"), + }) + print(json.dumps(output, indent=2)) + return 0 + + # Pretty output + print(f"{Color.BLD}Comparing runs:{Color.RST}") + print(f" Run #{run1_id}: {status_display(run1.get('status', ''), run1.get('conclusion', ''))} " + f"{run1.get('display_title', '')[:40]}") + print(f" Run #{run2_id}: {status_display(run2.get('status', ''), run2.get('conclusion', ''))} " + f"{run2.get('display_title', '')[:40]}") + print() + + # Compare jobs + jobs1_map = {j.get("name"): j for j in jobs1} + jobs2_map = {j.get("name"): j for j in jobs2} + all_job_names_sorted = sorted(set(jobs1_map.keys()) | set(jobs2_map.keys())) + + changes = [] + unchanged = [] + + for name in all_job_names_sorted: + j1 = jobs1_map.get(name, {}) + j2 = jobs2_map.get(name, {}) + c1 = j1.get("conclusion", "missing") + c2 = j2.get("conclusion", "missing") + + if c1 != c2: + changes.append((name, c1, c2)) + else: + unchanged.append((name, c1)) + + if changes: + print(f"{Color.BLD}Changed jobs:{Color.RST}\n") + for name, c1, c2 in changes: + s1 = status_display("completed", c1) + s2 = status_display("completed", c2) + print(f" {name}") + print(f" {s1} #{run1_id} → {s2} #{run2_id}") + print() + + if unchanged and not changes: + print(f"{Color.DIM}All {len(unchanged)} jobs have same status{Color.RST}") + elif unchanged: + print(f"{Color.DIM}Unchanged: {len(unchanged)} jobs{Color.RST}") + + return 0 + + +def cmd_infra(api: GiteaAPI, args: argparse.Namespace) -> int: + """Show infrastructure status: Gitea version, runners, workflows.""" + owner = getattr(args, "owner", None) or api.config.default_owner + repo = getattr(args, "repo", None) or api.config.default_repo + + print(f"{Color.BLD}Infrastructure Status{Color.RST}\n") + print(f"{Color.DIM}Instance:{Color.RST} {api.config.url}") + + # Gitea version + version = api.get_version() + if version: + print(f"{Color.DIM}Version:{Color.RST} Gitea {version.get('version', 'unknown')}") + print() + + # Try to get runners at different levels + runners_found = False + + # Repository runners + if owner and repo: + print(f"{Color.BLD}Repository: {owner}/{repo}{Color.RST}") + runners = api.get_runners(owner, repo) + if runners: + runners_found = True + print(f"\n{Color.BLD}Repository Runners:{Color.RST}") + for r in runners: + status = "online" if r.get("status") == "online" else "offline" + color = Color.GRN if status == "online" else Color.RED + name = r.get("name", "unnamed") + labels = ", ".join(r.get("labels", [])[:3]) or "no labels" + print(f" {color}●{Color.RST} {name} ({labels})") + + # Workflows + workflows = api.get_workflows(owner, repo) + if workflows: + print(f"\n{Color.BLD}Workflows:{Color.RST}") + for wf in workflows: + name = wf.get("name", "") + path = wf.get("path", "").split("/")[-1] + state = wf.get("state", "") + if state == "active": + print(f" {Color.GRN}✓{Color.RST} {path} - {name}") + else: + print(f" {Color.DIM}○{Color.RST} {path} - {name} ({state})") + + # Recent runs summary + runs = api.get_runs(owner, repo, limit=10) + if runs: + success = sum(1 for r in runs if r.get("conclusion") == "success") + failed = sum(1 for r in runs if r.get("conclusion") == "failure") + running = sum(1 for r in runs if r.get("status") in ("running", "waiting", "queued")) + print(f"\n{Color.BLD}Recent Runs (last 10):{Color.RST}") + print(f" {Color.GRN}✓{Color.RST} {success} success {Color.RED}✗{Color.RST} {failed} failed {Color.BLU}●{Color.RST} {running} active") + + # Organization runners (if owner specified) + if owner and not repo: + print(f"{Color.BLD}Organization: {owner}{Color.RST}") + runners = api.get_runners(owner) + if runners: + runners_found = True + print(f"\n{Color.BLD}Organization Runners:{Color.RST}") + for r in runners: + status = "online" if r.get("status") == "online" else "offline" + color = Color.GRN if status == "online" else Color.RED + name = r.get("name", "unnamed") + print(f" {color}●{Color.RST} {name}") + + # Global runners (admin) + if not owner: + runners = api.get_runners() + if runners: + runners_found = True + print(f"{Color.BLD}Global Runners:{Color.RST}") + for r in runners: + status = r.get("status", "unknown") + color = Color.GRN if status == "online" else Color.RED + name = r.get("name", "unnamed") + labels = ", ".join(r.get("labels", [])[:3]) or "no labels" + busy = r.get("busy", False) + busy_str = f" {Color.YLW}[busy]{Color.RST}" if busy else "" + print(f" {color}●{Color.RST} {name} ({labels}){busy_str}") + elif not runners_found: + print(f"{Color.DIM}No runners found (may require admin access){Color.RST}") + + # Pending jobs + if getattr(args, "jobs", False): + jobs = api.get_runner_jobs(owner or "", repo or "") + if jobs: + print(f"\n{Color.BLD}Pending/Running Jobs:{Color.RST}") + for job in jobs[:10]: + status = job.get("status", "") + name = job.get("name", "") + repo_name = job.get("repository", {}).get("full_name", "") + print(f" {Color.BLU}●{Color.RST} {name} ({repo_name})") + + return 0 + + +def cmd_runners(api: GiteaAPI, args: argparse.Namespace) -> int: + """Manage runners - list, delete, info, jobs.""" + owner = getattr(args, "owner", None) or api.config.default_owner + repo = getattr(args, "repo", None) or api.config.default_repo + action = getattr(args, "action", "list") + runner_id = getattr(args, "runner_id", None) + + # Determine scope string + if owner and repo: + scope = f"repository {owner}/{repo}" + elif owner: + scope = f"organization {owner}" + else: + scope = "user-level" + + # Handle different actions + if action == "delete": + if not runner_id: + print(f"{Color.RED}Runner ID required: gitea-ci runners delete {Color.RST}") + return 1 + + # Confirm unless --force + if not getattr(args, "force", False): + confirm = input(f"Delete runner #{runner_id}? [y/N]: ").strip().lower() + if confirm != "y": + print(f"{Color.DIM}Cancelled{Color.RST}") + return 0 + + print(f"{Color.BLD}Deleting runner #{runner_id}...{Color.RST}") + if api.delete_runner(runner_id, owner or "", repo or ""): + print(f"{Color.GRN}✓ Runner deleted{Color.RST}") + return 0 + else: + print(f"{Color.RED}✗ Failed to delete runner{Color.RST}") + print(f"{Color.DIM}(API may not be available in this Gitea version){Color.RST}") + return 1 + + elif action == "info": + if not runner_id: + print(f"{Color.RED}Runner ID required: gitea-ci runners info {Color.RST}") + return 1 + + runner = api.get_runner(runner_id, owner or "", repo or "") + if not runner: + print(f"{Color.RED}Runner #{runner_id} not found{Color.RST}") + return 1 + + if getattr(args, "json", False): + print(json.dumps(runner, indent=2)) + return 0 + + print(f"{Color.BLD}Runner #{runner_id}{Color.RST}") + print("─" * 35) + print(f" {Color.BLD}Name:{Color.RST} {runner.get('name', 'unnamed')}") + status = runner.get("status", "unknown") + status_color = Color.GRN if status == "online" else Color.RED + print(f" {Color.BLD}Status:{Color.RST} {status_color}{status}{Color.RST}") + print(f" {Color.BLD}Busy:{Color.RST} {'Yes' if runner.get('busy') else 'No'}") + labels = runner.get("labels", []) + if labels: + print(f" {Color.BLD}Labels:{Color.RST} {', '.join(labels)}") + if runner.get("version"): + print(f" {Color.BLD}Version:{Color.RST} {runner.get('version')}") + return 0 + + elif action == "jobs": + jobs = api.get_runner_jobs(owner or "", repo or "") + + if getattr(args, "json", False): + print(json.dumps({"scope": scope, "jobs": jobs}, indent=2)) + return 0 + + print(f"{Color.BLD}Runner Jobs ({scope}):{Color.RST}\n") + + if not jobs: + print(f"{Color.DIM}No running/pending jobs{Color.RST}") + return 0 + + for job in jobs: + job_id = job.get("id", 0) + name = job.get("name", "unnamed") + status = job.get("status", "unknown") + runs_on = job.get("runs_on", []) + + color, symbol = STATUS_STYLE.get(status, (Color.DIM, "?")) + labels_str = f" [{', '.join(runs_on)}]" if runs_on else "" + print(f" {color}{symbol}{Color.RST} #{job_id} {name}{labels_str}") + + return 0 + + else: # list (default) + if owner and repo: + runners = api.get_runners(owner, repo) + elif owner: + runners = api.get_runners(owner) + else: + runners = api.get_runners() + + if getattr(args, "json", False): + print(json.dumps({"scope": scope, "runners": runners}, indent=2)) + return 0 + + print(f"{Color.BLD}Runners ({scope}):{Color.RST}\n") + + if not runners: + print(f"{Color.DIM}No runners found{Color.RST}") + print(f"{Color.DIM}(May require authentication or admin access){Color.RST}") + return 0 + + online = 0 + offline = 0 + + for r in runners: + rid = r.get("id", 0) + name = r.get("name", "unnamed") + status = r.get("status", "unknown") + busy = r.get("busy", False) + labels = r.get("labels", []) + + if status == "online": + online += 1 + color = Color.GRN + symbol = "●" + else: + offline += 1 + color = Color.RED + symbol = "○" + + busy_str = f" {Color.YLW}[busy]{Color.RST}" if busy else "" + labels_str = f" ({', '.join(labels[:4])})" if labels else "" + + print(f" {color}{symbol}{Color.RST} #{rid} {name}{labels_str}{busy_str}") + + print(f"\n{Color.DIM}Total: {online} online, {offline} offline{Color.RST}") + return 0 + + +def cmd_register_token(api: GiteaAPI, args: argparse.Namespace) -> int: + """Get runner registration token.""" + # If --user flag is set, force user-level token + if getattr(args, "user", False): + owner = "" + repo = "" + else: + owner = getattr(args, "owner", None) or api.config.default_owner + repo = getattr(args, "repo", None) or api.config.default_repo + + # Determine scope + if owner and repo: + scope = f"repository {owner}/{repo}" + elif owner: + scope = f"organization {owner}" + else: + scope = "user" + + print(f"{Color.BLD}Getting registration token ({scope})...{Color.RST}") + + token = api.get_registration_token(owner or "", repo or "") + + if token: + print(f"\n{Color.GRN}Registration Token:{Color.RST}\n") + print(f" {token}") + print(f"\n{Color.DIM}Usage:{Color.RST}") + print(f" act_runner register --instance {api.config.url} --token {token}") + print() + return 0 + else: + print(f"{Color.RED}Failed to get registration token{Color.RST}") + print(f"{Color.DIM}Make sure your token has the required scopes{Color.RST}") + return 1 + + +def cmd_delete(api: GiteaAPI, args: argparse.Namespace) -> int: + """Delete a workflow run.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + run_id = args.run_id + if not run_id: + print(f"{Color.RED}Run ID required (-R/--run-id){Color.RST}") + return 1 + + # Confirm unless --force + if not getattr(args, "force", False): + confirm = input(f"Delete run #{run_id}? [y/N]: ").strip().lower() + if confirm != "y": + print(f"{Color.DIM}Cancelled{Color.RST}") + return 0 + + print(f"{Color.BLD}Deleting run #{run_id}...{Color.RST}") + if api.delete_run(owner, repo, run_id): + print(f"{Color.GRN}✓ Run deleted{Color.RST}") + return 0 + else: + print(f"{Color.RED}✗ Failed to delete run{Color.RST}") + return 1 + + +def cmd_workflows(api: GiteaAPI, args: argparse.Namespace) -> int: + """List workflows in a repository with details.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.RED}Repository required{Color.RST}") + return 1 + + workflows = api.get_workflows(owner, repo) + + if getattr(args, "json", False): + print(json.dumps({"repository": f"{owner}/{repo}", "workflows": workflows}, indent=2)) + return 0 + + print(f"{Color.BLD}Workflows for {owner}/{repo}:{Color.RST}\n") + + if not workflows: + print(f"{Color.DIM}No workflows found{Color.RST}") + return 0 + + for wf in workflows: + name = wf.get("name", "unnamed") + path = wf.get("path", "") + state = wf.get("state", "unknown") + filename = path.split("/")[-1] if path else "" + + if state == "active": + color = Color.GRN + symbol = "✓" + elif state == "disabled": + color = Color.YLW + symbol = "○" + else: + color = Color.DIM + symbol = "?" + + print(f" {color}{symbol}{Color.RST} {filename}") + print(f" {Color.DIM}Name:{Color.RST} {name}") + print(f" {Color.DIM}Path:{Color.RST} {path}") + + # Get workflow triggers if verbose + if getattr(args, "verbose", False): + content = api.get_workflow_contents(owner, repo, filename) + if content: + # Parse YAML for triggers + triggers = [] + in_on = False + for line in content.split("\n"): + stripped = line.strip() + if stripped.startswith("on:"): + in_on = True + # Single-line on: push + if ":" in stripped[3:]: + triggers.append(stripped[3:].strip()) + elif in_on: + if stripped and not stripped.startswith("#"): + if line[0] not in (" ", "\t"): + break # End of on: block + if ":" in stripped or stripped.endswith(":"): + triggers.append(stripped.rstrip(":")) + if triggers: + print(f" {Color.DIM}Triggers:{Color.RST} {', '.join(triggers[:5])}") + print() + + return 0 + + +def cmd_validate(api: GiteaAPI, args: argparse.Namespace) -> int: + """Validate local workflow files.""" + path_arg = getattr(args, "path", None) + check_runners = getattr(args, "check_runners", False) + strict = getattr(args, "strict", False) + + # Find workflow files + if path_arg: + path = Path(path_arg) + if path.is_file(): + workflows = [path] + elif path.is_dir(): + workflows = list(path.glob("*.yml")) + list(path.glob("*.yaml")) + else: + print(f"{Color.RED}Path not found: {path_arg}{Color.RST}") + return 1 + else: + # Look for workflow directories + for workflow_dir in [".gitea/workflows", ".github/workflows"]: + p = Path(workflow_dir) + if p.exists(): + workflows = list(p.glob("*.yml")) + list(p.glob("*.yaml")) + break + else: + print(f"{Color.RED}No workflow directory found{Color.RST}") + print(f"{Color.DIM}Expected: .gitea/workflows/ or .github/workflows/{Color.RST}") + return 1 + + if not workflows: + print(f"{Color.YLW}No workflow files found{Color.RST}") + return 0 + + # Get available runner labels if checking runners + available_labels: set = set() + if check_runners: + owner = getattr(args, "owner", None) or api.config.default_owner + repo = getattr(args, "repo", None) or api.config.default_repo + runners = api.get_runners(owner or "", repo or "") + for r in runners: + for label in r.get("labels", []): + available_labels.add(label) + + total_errors = 0 + total_warnings = 0 + + for wf_path in sorted(workflows): + content = wf_path.read_text() + errors, warnings = _validate_workflow(content, wf_path.name, available_labels) + + if errors or warnings: + print(f"{Color.BLD}{wf_path}{Color.RST}") + for level, line, msg in errors: + print(f" {Color.RED}error:{Color.RST} {msg}" + (f" (line {line})" if line else "")) + total_errors += 1 + for level, line, msg in warnings: + print(f" {Color.YLW}warning:{Color.RST} {msg}" + (f" (line {line})" if line else "")) + total_warnings += 1 + print() + else: + print(f"{Color.GRN}✓{Color.RST} {wf_path}") + + # Summary + print() + if total_errors > 0: + print(f"{Color.RED}{total_errors} error(s){Color.RST}, {total_warnings} warning(s)") + return 1 + elif total_warnings > 0 and strict: + print(f"{Color.YLW}{total_warnings} warning(s) (strict mode){Color.RST}") + return 1 + elif total_warnings > 0: + print(f"{Color.GRN}OK{Color.RST} with {total_warnings} warning(s)") + return 0 + else: + print(f"{Color.GRN}All {len(workflows)} workflow(s) valid{Color.RST}") + return 0 + + +def _validate_workflow(content: str, filename: str, available_labels: set) -> tuple: + """Validate workflow content without external YAML parser. + + Returns (errors, warnings) where each is a list of (level, line, message). + """ + errors: list[tuple] = [] + warnings: list[tuple] = [] + lines = content.split("\n") + + # Check for required top-level keys + has_name = False + has_on = False + has_jobs = False + + for i, line in enumerate(lines, 1): + stripped = line.strip() + if stripped.startswith("#"): + continue + if re.match(r"^name\s*:", line): + has_name = True + if re.match(r"^on\s*:", line): + has_on = True + if re.match(r"^jobs\s*:", line): + has_jobs = True + + if not has_name: + warnings.append(("warning", 0, "Missing 'name:' field (recommended)")) + if not has_on: + errors.append(("error", 0, "Missing 'on:' trigger definition")) + if not has_jobs: + errors.append(("error", 0, "Missing 'jobs:' section")) + + # Check runs-on labels + runs_on_matches = re.finditer(r"runs-on\s*:\s*(.+)", content) + for match in runs_on_matches: + label = match.group(1).strip().strip("'\"") + # Find line number + pos = match.start() + line_num = content[:pos].count("\n") + 1 + + # Check if label is available (if we have runner info) + if available_labels and label not in available_labels: + warnings.append(("warning", line_num, f"runs-on label '{label}' not found in available runners")) + + # Check for common issues + for i, line in enumerate(lines, 1): + stripped = line.strip() + + # Check for tabs (YAML doesn't like tabs) + if "\t" in line and not stripped.startswith("#"): + errors.append(("error", i, "Tab character found (use spaces for indentation)")) + + # Check for trailing colons without values (common mistake) + if re.match(r"^\s+-\s*$", line): + errors.append(("error", i, "Empty list item")) + + # Check for uses without version + uses_match = re.match(r".*uses\s*:\s*([^@\s]+)$", stripped) + if uses_match and not stripped.startswith("#"): + action = uses_match.group(1) + if "/" in action and "@" not in action: + warnings.append(("warning", i, f"Action '{action}' should specify version (e.g., @v1)")) + + return errors, warnings + + +def cmd_stats(api: GiteaAPI, args: argparse.Namespace) -> int: + """Show comprehensive workflow statistics with visual graphs.""" + owner = args.owner or api.config.default_owner + repo = args.repo or api.config.default_repo + + if not owner or not repo: + print(f"{Color.BLD}Error:{Color.RST} Repository required") + return 1 + + # Fetch more runs for better statistics + limit = getattr(args, "limit", 50) or 50 + runs = api.get_runs(owner, repo, limit=limit) + + if not runs: + print(f"{Color.DIM}No workflow runs found{Color.RST}") + return 0 + + # JSON output + if getattr(args, "json", False): + return _output_stats_json(runs, owner, repo) + + print(f"{Color.BLD}Workflow Statistics: {owner}/{repo}{Color.RST}") + print("═" * 60) + + # Calculate statistics + total = len(runs) + success = [r for r in runs if r.get("conclusion") == "success"] + failed = [r for r in runs if r.get("conclusion") == "failure"] + cancelled = [r for r in runs if r.get("conclusion") == "cancelled"] + running = [r for r in runs if r.get("status") in ("running", "waiting")] + completed = total - len(running) + + # Summary section + print(f"{Color.BLD}Summary{Color.RST} {Color.DIM}(last {total} runs){Color.RST}") + print("─" * 40) + success_rate = (len(success) / completed * 100) if completed > 0 else 0 + print(f" {'Total:':<12} {total}") + print(f" {'Completed:':<12} {completed}") + print(f" {'✓ Success:':<12} {len(success)}") + print(f" {'✗ Failed:':<12} {len(failed)}") + print(f" {'○ Cancelled:':<12} {len(cancelled)}") + print(f" {'● Running:':<12} {len(running)}") + print() + + # Success rate bar + print(f"{Color.BLD}Success Rate{Color.RST}") + print("─" * 40) + _print_rate_bar("Overall", success_rate, 30) + print() + + # Per-workflow breakdown + workflows: dict[str, dict] = {} + for run in runs: + wf = run.get("workflow") or run.get("name") or "unknown" + if wf not in workflows: + workflows[wf] = {"total": 0, "success": 0, "failed": 0, "durations": []} + workflows[wf]["total"] += 1 + if run.get("conclusion") == "success": + workflows[wf]["success"] += 1 + elif run.get("conclusion") == "failure": + workflows[wf]["failed"] += 1 + # Track duration + duration = _parse_duration_seconds(run) + if duration > 0: + workflows[wf]["durations"].append(duration) + + if len(workflows) > 1: + print(f"{Color.BLD}Per-Workflow Breakdown{Color.RST}") + print("─" * 40) + for wf_name, stats in sorted(workflows.items()): + wf_total = stats["total"] + wf_success = stats["success"] + wf_rate = (wf_success / wf_total * 100) if wf_total > 0 else 0 + # Truncate workflow name + display_name = wf_name[:20] if len(wf_name) > 20 else wf_name + _print_rate_bar(display_name, wf_rate, 20, show_count=f"{wf_success}/{wf_total}") + print() + + # Duration statistics + all_durations = [] + for run in runs: + duration = _parse_duration_seconds(run) + if duration > 0: + all_durations.append(duration) + + if all_durations: + print(f"{Color.BLD}Duration Statistics{Color.RST}") + print("─" * 40) + avg_duration = sum(all_durations) / len(all_durations) + min_duration = min(all_durations) + max_duration = max(all_durations) + print(f" {'Average:':<12} {_format_seconds(int(avg_duration))}") + print(f" {'Fastest:':<12} {_format_seconds(min_duration)}") + print(f" {'Slowest:':<12} {_format_seconds(max_duration)}") + print() + + # Recent trend (last 10 runs) + recent = runs[:min(10, len(runs))] + if recent: + print(f"{Color.BLD}Recent Trend{Color.RST} {Color.DIM}(last {len(recent)} runs){Color.RST}") + print("─" * 40) + trend_line = " " + for run in reversed(recent): # Oldest to newest + conclusion = run.get("conclusion", "") + if conclusion == "success": + trend_line += "█" # Full block for success + elif conclusion == "failure": + trend_line += "▁" # Low block for failure (bar chart effect) + elif run.get("status") in ("running", "waiting"): + trend_line += "▒" # Medium shade for running + else: + trend_line += "░" # Light shade for other + print(trend_line) + print(f" {Color.DIM}█=pass ▁=fail ▒=running ░=other{Color.RST}") + print(f" {Color.DIM}← older newer →{Color.RST}") + print() + + # Job-level statistics (if detailed flag) + if getattr(args, "detailed", False): + _print_job_stats(api, runs[:20], owner, repo) + + return 0 + + +def _print_rate_bar(label: str, rate: float, width: int = 20, show_count: str = "") -> None: + """Print a labeled progress bar for rates. No colors per STYLING.md.""" + filled = int(rate / 100 * width) + empty = width - filled + + # Use block characters only - no colors + bar = f"{'█' * filled}{Color.DIM}{'░' * empty}{Color.RST}" + count_str = f" {Color.DIM}{show_count}{Color.RST}" if show_count else "" + print(f" {label:<20} {bar} {rate:5.1f}%{count_str}") + + +def _parse_duration_seconds(run: dict) -> int: + """Parse run duration to seconds.""" + # Try direct duration field + duration_str = run.get("duration", "") + if duration_str: + # Parse formats like "1m30s", "45s", "2m" + total = 0 + import re as re_mod + minutes = re_mod.search(r"(\d+)m", duration_str) + seconds = re_mod.search(r"(\d+)s", duration_str) + if minutes: + total += int(minutes.group(1)) * 60 + if seconds: + total += int(seconds.group(1)) + return total + + # Calculate from timestamps + started = run.get("run_started_at", "") + ended = run.get("updated_at", "") + if started and ended and run.get("status") == "completed": + try: + from datetime import datetime + start_dt = datetime.fromisoformat(started.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00")) + return int((end_dt - start_dt).total_seconds()) + except (ValueError, TypeError): + pass + return 0 + + +def _format_seconds(seconds: int) -> str: + """Format seconds to human-readable duration.""" + if seconds < 60: + return f"{seconds}s" + minutes = seconds // 60 + secs = seconds % 60 + if minutes < 60: + return f"{minutes}m{secs}s" if secs else f"{minutes}m" + hours = minutes // 60 + mins = minutes % 60 + return f"{hours}h{mins}m" + + +def _print_job_stats(api: GiteaAPI, runs: list, owner: str, repo: str) -> None: + """Print job-level statistics. No colors per STYLING.md.""" + print(f"{Color.BLD}Job Statistics{Color.RST} {Color.DIM}(from {len(runs)} recent runs){Color.RST}") + print("─" * 40) + + job_stats: dict[str, dict] = {} + for run in runs: + if run.get("status") != "completed": + continue + run_id = run.get("id") + jobs = api.get_jobs(owner, repo, run_id) + for job in jobs: + name = job.get("name", "unknown") + if name not in job_stats: + job_stats[name] = {"success": 0, "failed": 0, "durations": []} + if job.get("conclusion") == "success": + job_stats[name]["success"] += 1 + elif job.get("conclusion") == "failure": + job_stats[name]["failed"] += 1 + # Parse job duration + started = job.get("started_at", "") + ended = job.get("completed_at", "") + if started and ended: + try: + from datetime import datetime + start_dt = datetime.fromisoformat(started.replace("Z", "+00:00")) + end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00")) + duration = int((end_dt - start_dt).total_seconds()) + # Sanity check: duration should be reasonable (< 24 hours) + if 0 < duration < 86400: + job_stats[name]["durations"].append(duration) + except (ValueError, TypeError): + pass + + if not job_stats: + print(f" {Color.DIM}No job data available{Color.RST}") + return + + # Print job table - use symbols, no colors + print(f" {'Job':<30} {'✓ Pass':<8} {'✗ Fail':<8} {'Avg':<8}") + print(f" {'-'*30} {'-'*8} {'-'*8} {'-'*8}") + for job_name, stats in sorted(job_stats.items()): + total = stats["success"] + stats["failed"] + if total == 0: + continue + name_display = job_name[:30] if len(job_name) > 30 else job_name + avg_dur = "" + if stats["durations"]: + avg = sum(stats["durations"]) / len(stats["durations"]) + avg_dur = _format_seconds(int(avg)) + + # No colors - use plain text + fail_str = str(stats["failed"]) if stats["failed"] > 0 else f"{Color.DIM}0{Color.RST}" + print(f" {name_display:<30} {stats['success']:<8} {fail_str:<8} {avg_dur:<8}") + print() + + +def _output_stats_json(runs: list, owner: str, repo: str) -> int: + """Output statistics as JSON.""" + total = len(runs) + success = len([r for r in runs if r.get("conclusion") == "success"]) + failed = len([r for r in runs if r.get("conclusion") == "failure"]) + cancelled = len([r for r in runs if r.get("conclusion") == "cancelled"]) + running = len([r for r in runs if r.get("status") in ("running", "waiting")]) + completed = total - running + + # Per-workflow stats + workflows: dict[str, dict] = {} + for run in runs: + wf = run.get("workflow") or "unknown" + if wf not in workflows: + workflows[wf] = {"total": 0, "success": 0, "failed": 0} + workflows[wf]["total"] += 1 + if run.get("conclusion") == "success": + workflows[wf]["success"] += 1 + elif run.get("conclusion") == "failure": + workflows[wf]["failed"] += 1 + + output = { + "repository": f"{owner}/{repo}", + "total_runs": total, + "completed": completed, + "success": success, + "failed": failed, + "cancelled": cancelled, + "running": running, + "success_rate": round(success / completed * 100, 1) if completed > 0 else 0, + "workflows": workflows, + } + print(json.dumps(output, indent=2)) + return 0 + + +def send_notification(title: str, message: str, urgency: str = "normal") -> None: + """Send desktop notification via notify-send.""" + if shutil.which("notify-send"): + try: + subprocess.run( + ["notify-send", f"--urgency={urgency}", title, message], + capture_output=True, + timeout=5 + ) + except (subprocess.SubprocessError, OSError): + pass + + +def main() -> int: + """Main entry point.""" + # Quick check for direct run URL or instance URL as first arg + if len(sys.argv) > 1 and not sys.argv[1].startswith("-"): + arg = sys.argv[1] + # Check if it looks like a URL or hostname (not a command) + commands = ("list", "ls", "l", "status", "s", "logs", "log", "watch", "w", "config", + "trigger", "t", "rerun", "cancel", "artifacts", "art", "pr") + if "." in arg and arg not in commands: + # Check for direct run URL: git.mymx.me/user/repo/actions/runs/123 + run_match = re.search(r"(?:https?://)?([^/]+)/([^/]+)/([^/]+)/actions/runs/(\d+)", arg) + if run_match: + host, owner, repo, run_id = run_match.groups() + url = f"https://{host}" + os.environ["GITEA_URL"] = url + # Rewrite args to: status -o owner -r repo -R run_id + sys.argv = [sys.argv[0], "status", "-o", owner, "-r", repo, "-R", run_id] + else: + # Check for repo URL: git.mymx.me/user/repo/actions + repo_match = re.search(r"(?:https?://)?([^/]+)/([^/]+)/([^/]+)(?:/actions)?$", arg) + if repo_match: + host, owner, repo = repo_match.groups() + url = f"https://{host}" + os.environ["GITEA_URL"] = url + sys.argv = [sys.argv[0], "list", f"{owner}/{repo}"] + else: + # Just an instance URL + url = arg if arg.startswith("http") else f"https://{arg}" + os.environ["GITEA_URL"] = url + sys.argv.pop(1) # Remove URL from args + + parser = argparse.ArgumentParser( + description="Monitor and manage Gitea Actions workflows", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Commands: + list List recent workflow runs (default: discover all) + stats Show workflow statistics with visual graphs + status Show detailed run status with jobs + logs Show job logs + watch Watch running workflows in real-time + trigger Trigger a workflow manually (workflow_dispatch) + rerun Re-run a workflow or failed jobs + cancel Cancel a running workflow + delete Delete a workflow run + artifacts List or download build artifacts + pr Show CI status for a pull request + compare Compare two workflow runs (diff jobs) + runners List runners and their status + register-token Get runner registration token + workflows List workflows in a repository + infra Show infrastructure status + config Configure token and defaults + +Environment: + GITEA_URL Gitea instance URL + GITEA_TOKEN API token for authentication (required for write operations) + +Examples: + gitea-ci git.mymx.me Discover all workflows on instance + gitea-ci git.mymx.me/user/repo List runs for specific repo + gitea-ci list user/repo --failed Show only failed runs + gitea-ci list user/repo --json Output as JSON + gitea-ci stats user/repo Show statistics with graphs + gitea-ci stats user/repo --detailed Include per-job breakdown + gitea-ci status user/repo Show latest run status + gitea-ci status user/repo 123 Show specific run details + gitea-ci status user/repo 123 --json Output run details as JSON + gitea-ci watch user/repo Watch latest run + gitea-ci watch user/repo 123 -t 300 Watch specific run with timeout + gitea-ci logs user/repo 123 -j 456 Show job logs + gitea-ci trigger user/repo -w lint.yml Trigger workflow manually + gitea-ci rerun user/repo --failed Re-run only failed jobs + gitea-ci cancel user/repo Cancel latest running workflow + gitea-ci artifacts user/repo -d 42 Download artifact #42 + gitea-ci pr user/repo -p 123 Show CI status for PR #123 + gitea-ci compare user/repo 175 174 Compare two runs + gitea-ci delete user/repo -R 123 Delete workflow run #123 + gitea-ci runners List runners (user-level) + gitea-ci runners -o org List organization runners + gitea-ci runners info 5 Show runner #5 details + gitea-ci runners delete 5 -f Delete runner #5 (no confirmation) + gitea-ci runners jobs Show running/pending jobs + gitea-ci register-token Get user-level registration token + gitea-ci register-token -o org -r repo Get repo-level registration token + gitea-ci workflows user/repo -v List workflows with triggers + gitea-ci config --test Test API connectivity + gitea-ci config token Show token info + gitea-ci config token check Check token permissions + gitea-ci config token set Set API token + gitea-ci validate Validate local workflow files + gitea-ci validate --check-runners Cross-reference runs-on labels + gitea-ci validate --strict Treat warnings as errors +""", + ) + + parser.add_argument("-u", "--url", help="Gitea URL") + + subparsers = parser.add_subparsers(dest="command", metavar="command") + + # Common args for repo-based commands + def add_repo_args(p): + p.add_argument("-o", "--owner", help="Repository owner") + p.add_argument("-r", "--repo", help="Repository name") + p.add_argument("repo_path", nargs="?", help="Repository as owner/repo") + + # list + list_p = subparsers.add_parser("list", aliases=["ls", "l"], help="List workflow runs") + add_repo_args(list_p) + list_p.add_argument("-n", "--limit", type=int, default=10, help="Number of runs") + list_p.add_argument("--failed", dest="status_filter", action="store_const", const="failed", help="Show only failed runs") + list_p.add_argument("--success", dest="status_filter", action="store_const", const="success", help="Show only successful runs") + list_p.add_argument("--running", dest="status_filter", action="store_const", const="running", help="Show only running/waiting") + list_p.add_argument("-b", "--branch", help="Filter by branch name") + list_p.add_argument("-w", "--workflow", help="Filter by workflow file") + list_p.add_argument("--json", action="store_true", help="Output as JSON") + list_p.add_argument("--compact", action="store_true", help="Compact single-line output") + + # stats + stats_p = subparsers.add_parser("stats", help="Show workflow statistics with graphs") + add_repo_args(stats_p) + stats_p.add_argument("-n", "--limit", type=int, default=50, help="Number of runs to analyze") + stats_p.add_argument("--detailed", action="store_true", help="Include per-job statistics") + stats_p.add_argument("--json", action="store_true", help="Output as JSON") + + # status (supports repo_path and/or run_id) + status_p = subparsers.add_parser("status", aliases=["s"], help="Show run status") + status_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]") + status_p.add_argument("-o", "--owner", help="Repository owner") + status_p.add_argument("-r", "--repo", help="Repository name") + status_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID") + status_p.add_argument("--json", action="store_true", help="Output as JSON") + + # logs (supports repo_path and/or run_id) + logs_p = subparsers.add_parser("logs", aliases=["log"], help="Show job logs") + logs_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]") + logs_p.add_argument("-o", "--owner", help="Repository owner") + logs_p.add_argument("-r", "--repo", help="Repository name") + logs_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID") + logs_p.add_argument("-j", "--job", dest="job_id", type=int, help="Job ID") + + # watch (supports repo_path and/or run_id) + watch_p = subparsers.add_parser("watch", aliases=["w"], help="Watch runs") + watch_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]") + watch_p.add_argument("-o", "--owner", help="Repository owner") + watch_p.add_argument("-r", "--repo", help="Repository name") + watch_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID to watch") + watch_p.add_argument("-i", "--interval", type=int, default=5, help="Refresh interval (seconds)") + watch_p.add_argument("-t", "--timeout", type=int, default=0, help="Auto-exit after N seconds (0=disabled)") + watch_p.add_argument("--no-clear", action="store_true", help="Don't clear screen") + watch_p.add_argument("--notify", action="store_true", help="Desktop notification on completion") + + # config + config_p = subparsers.add_parser("config", help="Configure gitea-ci") + config_p.add_argument("action", nargs="?", choices=["token"], help="Subcommand") + config_p.add_argument("token_action", nargs="?", choices=["check", "set"], help="Token action") + config_p.add_argument("token_value", nargs="?", help="Token value (for 'set')") + config_p.add_argument("--show", action="store_true", help="Show current config") + config_p.add_argument("--test", action="store_true", help="Test API connectivity") + + # repo + repo_p = subparsers.add_parser("repo", help="Manage repositories") + repo_p.add_argument("action", nargs="?", choices=["list", "create", "exists", "check"], default="list", + help="Action: list, create, exists, check") + repo_p.add_argument("name", nargs="?", help="Repository name (for create/exists)") + repo_p.add_argument("-d", "--description", help="Repository description (for create)") + repo_p.add_argument("-p", "--private", action="store_true", help="Make repository private") + add_repo_args(repo_p) + + # trigger + trigger_p = subparsers.add_parser("trigger", aliases=["t"], help="Trigger workflow") + add_repo_args(trigger_p) + trigger_p.add_argument("-w", "--workflow", help="Workflow file (e.g., lint.yml)") + trigger_p.add_argument("-b", "--ref", help="Branch/tag/SHA to run on (default: master)") + trigger_p.add_argument("-i", "--input", action="append", help="Input as key=value (repeatable)") + + # rerun + rerun_p = subparsers.add_parser("rerun", help="Re-run workflow/jobs") + add_repo_args(rerun_p) + rerun_p.add_argument("-R", "--run-id", type=int, help="Run ID to re-run") + rerun_p.add_argument("-j", "--job-id", type=int, help="Specific job ID to re-run") + rerun_p.add_argument("-f", "--failed-only", action="store_true", help="Re-run only failed jobs") + + # cancel + cancel_p = subparsers.add_parser("cancel", help="Cancel running workflow") + add_repo_args(cancel_p) + cancel_p.add_argument("-R", "--run-id", type=int, help="Run ID to cancel") + + # delete + delete_p = subparsers.add_parser("delete", aliases=["rm"], help="Delete a workflow run") + add_repo_args(delete_p) + delete_p.add_argument("-R", "--run-id", type=int, required=True, help="Run ID to delete") + delete_p.add_argument("-f", "--force", action="store_true", help="Skip confirmation") + + # artifacts + artifacts_p = subparsers.add_parser("artifacts", aliases=["art"], help="List/download artifacts") + add_repo_args(artifacts_p) + artifacts_p.add_argument("-R", "--run-id", type=int, help="Filter by run ID") + artifacts_p.add_argument("-d", "--download", type=int, metavar="ID", help="Download artifact by ID") + artifacts_p.add_argument("-O", "--output", help="Output filename for download") + + # pr + pr_p = subparsers.add_parser("pr", help="Show PR CI status") + add_repo_args(pr_p) + pr_p.add_argument("-p", "--pr-number", type=int, help="Pull request number") + + # compare + compare_p = subparsers.add_parser("compare", aliases=["diff"], help="Compare two runs") + add_repo_args(compare_p) + compare_p.add_argument("run1", type=int, help="First run ID") + compare_p.add_argument("run2", type=int, nargs="?", help="Second run ID (default: previous run)") + compare_p.add_argument("--json", action="store_true", help="Output as JSON") + + # infra + infra_p = subparsers.add_parser("infra", help="Show infrastructure status") + add_repo_args(infra_p) + infra_p.add_argument("--jobs", action="store_true", help="Show pending/running jobs") + + # runners + runners_p = subparsers.add_parser("runners", help="Manage runners") + runners_p.add_argument("action", nargs="?", default="list", + choices=["list", "delete", "info", "jobs"], + help="Action: list, delete, info, jobs") + runners_p.add_argument("runner_id", nargs="?", type=int, help="Runner ID (for delete/info)") + add_repo_args(runners_p) + runners_p.add_argument("--force", "-f", action="store_true", help="Skip confirmation for delete") + runners_p.add_argument("--json", action="store_true", help="Output as JSON") + + # register-token + regtoken_p = subparsers.add_parser("register-token", aliases=["regtoken"], help="Get runner registration token") + add_repo_args(regtoken_p) + regtoken_p.add_argument("--user", action="store_true", help="Get user-level token (ignores repo defaults)") + + # workflows + workflows_p = subparsers.add_parser("workflows", aliases=["wf"], help="List workflows") + add_repo_args(workflows_p) + workflows_p.add_argument("-v", "--verbose", action="store_true", help="Show workflow triggers and details") + + # validate + validate_p = subparsers.add_parser("validate", aliases=["lint"], help="Validate local workflow files") + validate_p.add_argument("path", nargs="?", help="Path to workflow dir or file") + add_repo_args(validate_p) + validate_p.add_argument("--check-runners", action="store_true", help="Cross-reference runs-on with available runners") + validate_p.add_argument("--strict", action="store_true", help="Treat warnings as errors") + + args = parser.parse_args() + + # Load config + config = Config.load() + if args.url: + config.url = args.url + + api = GiteaAPI(config) + + # Set defaults for optional args + if not hasattr(args, "owner"): + args.owner = None + if not hasattr(args, "repo"): + args.repo = None + if not hasattr(args, "limit"): + args.limit = 10 + + # Parse repo_path (owner/repo format) + if hasattr(args, "repo_path") and args.repo_path and "/" in args.repo_path: + parts = args.repo_path.split("/", 1) + if not args.owner: + args.owner = parts[0] + if not args.repo: + args.repo = parts[1] + + # Parse positional args for status/logs/watch: [owner/repo] [run_id] + if hasattr(args, "positional") and args.positional: + for pos in args.positional: + if "/" in pos: + # Looks like owner/repo + parts = pos.split("/", 1) + if not args.owner: + args.owner = parts[0] + if not args.repo: + args.repo = parts[1] + elif pos.isdigit(): + # Looks like run_id + if not args.run_id: + args.run_id = int(pos) + + # Dispatch commands + if args.command in (None, "list", "ls", "l"): + return cmd_list(api, args) + elif args.command == "stats": + return cmd_stats(api, args) + elif args.command in ("status", "s"): + return cmd_status(api, args) + elif args.command in ("logs", "log"): + return cmd_logs(api, args) + elif args.command in ("watch", "w"): + return cmd_watch(api, args) + elif args.command == "config": + return cmd_config(api, args) + elif args.command == "repo": + return cmd_repo(api, args) + elif args.command in ("trigger", "t"): + return cmd_trigger(api, args) + elif args.command == "rerun": + return cmd_rerun(api, args) + elif args.command == "cancel": + return cmd_cancel(api, args) + elif args.command in ("delete", "rm"): + return cmd_delete(api, args) + elif args.command in ("artifacts", "art"): + return cmd_artifacts(api, args) + elif args.command == "pr": + return cmd_pr(api, args) + elif args.command in ("compare", "diff"): + return cmd_compare(api, args) + elif args.command == "infra": + return cmd_infra(api, args) + elif args.command == "runners": + return cmd_runners(api, args) + elif args.command in ("register-token", "regtoken"): + return cmd_register_token(api, args) + elif args.command in ("workflows", "wf"): + return cmd_workflows(api, args) + elif args.command in ("validate", "lint"): + return cmd_validate(api, args) + + return 0 + + +if __name__ == "__main__": + sys.exit(main())