#!/usr/bin/env python3 """Gitea CI - Monitor and manage Gitea Actions workflows. Features: - List recent workflow runs across repositories - Show detailed job status with live updates - Stream build logs in real-time - Trigger workflows manually (workflow_dispatch) - Re-run failed jobs, cancel running workflows - List and download build artifacts - View PR check status, compare workflow runs - Desktop notifications on completion - JSON output mode for scripting """ import argparse import json import os import re import shutil import subprocess import sys import time import urllib.request import urllib.error from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any # Add lib to path for style import sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "lib")) from style import Color, Diff # noqa: E402 # Configuration CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "gitea-ci" CONFIG_FILE = CONFIG_DIR / "config.json" DEFAULT_INSTANCE = os.environ.get("GITEA_URL", "https://git.mymx.me") # Status colors and symbols STATUS_STYLE = { "success": (Color.GRN, "✓"), "failure": (Color.RED, "✗"), "cancelled": (Color.YLW, "⊘"), "running": (Color.BLU, "●"), "waiting": (Color.DIM, "○"), "queued": (Color.DIM, "◌"), "pending": (Color.DIM, "○"), "skipped": (Color.DIM, "⊘"), } @dataclass class Config: """Gitea CI configuration.""" url: str = DEFAULT_INSTANCE token: str = "" default_owner: str = "" default_repo: str = "" @classmethod def load(cls) -> "Config": """Load config from file or environment.""" config = cls() # Environment overrides config.url = os.environ.get("GITEA_URL", config.url) config.token = os.environ.get("GITEA_TOKEN", "") # Load from file if CONFIG_FILE.exists(): try: data = json.loads(CONFIG_FILE.read_text()) config.url = data.get("url", config.url) config.token = data.get("token", config.token) config.default_owner = data.get("default_owner", "") config.default_repo = data.get("default_repo", "") except (json.JSONDecodeError, IOError): pass return config def save(self) -> None: """Save config to file.""" CONFIG_DIR.mkdir(parents=True, exist_ok=True) data = { "url": self.url, "token": self.token, "default_owner": self.default_owner, "default_repo": self.default_repo, } CONFIG_FILE.write_text(json.dumps(data, indent=2) + "\n") CONFIG_FILE.chmod(0o600) class GiteaAPI: """Gitea API client with rate limiting support.""" def __init__(self, config: Config): self.config = config self.base_url = config.url.rstrip("/") + "/api/v1" self._retry_delay = 1 # Initial retry delay in seconds self._max_retry_delay = 60 # Max delay self._rate_limited = False def _request(self, endpoint: str, method: str = "GET", retries: int = 3) -> dict | None: """Make API request with rate limit handling.""" url = f"{self.base_url}{endpoint}" headers = {"Accept": "application/json"} if self.config.token: headers["Authorization"] = f"token {self.config.token}" req = urllib.request.Request(url, headers=headers, method=method) for attempt in range(retries): try: with urllib.request.urlopen(req, timeout=30) as resp: # Reset retry delay on success self._retry_delay = 1 self._rate_limited = False return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: if e.code == 401: print(f"{Color.RED}Authentication required. Set GITEA_TOKEN or run: gitea-ci config{Color.RST}") return None elif e.code == 404: return None elif e.code == 429 or e.code == 403: # Rate limited - exponential backoff self._rate_limited = True retry_after = e.headers.get("Retry-After") if retry_after and retry_after.isdigit(): delay = int(retry_after) else: delay = self._retry_delay if attempt < retries - 1: print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}", file=sys.stderr) time.sleep(delay) # Exponential backoff with max self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) continue else: print(f"{Color.RED}Rate limited. Try again later.{Color.RST}") return None elif e.code >= 500: # Server error - retry with backoff if attempt < retries - 1: delay = self._retry_delay print(f"{Color.YLW}Server error {e.code}. Retrying in {delay}s...{Color.RST}", file=sys.stderr) time.sleep(delay) self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) continue else: print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}") return None else: print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}") return None except urllib.error.URLError as e: if attempt < retries - 1: delay = self._retry_delay print(f"{Color.YLW}Connection error. Retrying in {delay}s...{Color.RST}", file=sys.stderr) time.sleep(delay) self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) continue else: print(f"{Color.RED}Connection error: {e.reason}{Color.RST}") return None return None def _request_text(self, endpoint: str, retries: int = 3) -> str | None: """Make API request returning text with rate limit handling.""" url = f"{self.base_url}{endpoint}" headers = {} if self.config.token: headers["Authorization"] = f"token {self.config.token}" req = urllib.request.Request(url, headers=headers) for attempt in range(retries): try: with urllib.request.urlopen(req, timeout=30) as resp: self._retry_delay = 1 return resp.read().decode() except urllib.error.HTTPError as e: if e.code == 429 or e.code == 403: retry_after = e.headers.get("Retry-After") delay = int(retry_after) if retry_after and retry_after.isdigit() else self._retry_delay if attempt < retries - 1: print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}", file=sys.stderr) time.sleep(delay) self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) continue return None except urllib.error.URLError: if attempt < retries - 1: time.sleep(self._retry_delay) self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) continue return None return None def throttle(self) -> None: """Add small delay between requests to avoid rate limiting.""" if self._rate_limited: time.sleep(0.5) else: time.sleep(0.1) # Small delay to be nice to the server def get_repos(self, limit: int = 20) -> list[Any]: """Get repositories for current user.""" data = self._request(f"/user/repos?limit={limit}") return list(data) if data else [] def create_repo(self, name: str, description: str = "", private: bool = False, auto_init: bool = True) -> dict | None: """Create a new repository.""" data = { "name": name, "description": description, "private": private, "auto_init": auto_init, } return self._post("/user/repos", data) def get_repo(self, owner: str, repo: str) -> dict | None: """Get repository details.""" return self._request(f"/repos/{owner}/{repo}") def repo_exists(self, owner: str, repo: str) -> bool: """Check if a repository exists.""" return self.get_repo(owner, repo) is not None def get_runs(self, owner: str, repo: str, limit: int = 10) -> list: """Get workflow runs for a repository.""" data = self._request(f"/repos/{owner}/{repo}/actions/runs?limit={limit}") if data and "workflow_runs" in data: return data["workflow_runs"] return data if isinstance(data, list) else [] def get_run(self, owner: str, repo: str, run_id: int) -> dict | None: """Get specific workflow run.""" return self._request(f"/repos/{owner}/{repo}/actions/runs/{run_id}") def get_jobs(self, owner: str, repo: str, run_id: int) -> list: """Get jobs for a workflow run.""" data = self._request(f"/repos/{owner}/{repo}/actions/runs/{run_id}/jobs") if data and "jobs" in data: return data["jobs"] return data if isinstance(data, list) else [] def get_job_logs(self, owner: str, repo: str, job_id: int) -> str | None: """Get logs for a job.""" return self._request_text(f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs") def trigger_workflow(self, owner: str, repo: str, workflow: str, ref: str = "master", inputs: dict | None = None) -> bool: """Trigger a workflow_dispatch event.""" endpoint = f"/repos/{owner}/{repo}/actions/workflows/{workflow}/dispatches" data: dict[str, Any] = {"ref": ref} if inputs: data["inputs"] = inputs return self._post(endpoint, data) is not None def rerun_job(self, owner: str, repo: str, job_id: int) -> bool: """Re-run a specific job.""" return self._post(f"/repos/{owner}/{repo}/actions/jobs/{job_id}/rerun", {}) is not None def rerun_workflow(self, owner: str, repo: str, run_id: int) -> bool: """Re-run all jobs in a workflow run.""" return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/rerun", {}) is not None def rerun_failed_jobs(self, owner: str, repo: str, run_id: int) -> bool: """Re-run only failed jobs in a workflow run.""" return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/rerun-failed-jobs", {}) is not None def cancel_run(self, owner: str, repo: str, run_id: int) -> bool: """Cancel a workflow run.""" return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/cancel", {}) is not None def get_artifacts(self, owner: str, repo: str, run_id: int | None = None) -> list: """Get artifacts for a repository or specific run.""" if run_id: endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}/artifacts" else: endpoint = f"/repos/{owner}/{repo}/actions/artifacts" data = self._request(endpoint) if data and "artifacts" in data: return data["artifacts"] return data if isinstance(data, list) else [] def download_artifact(self, owner: str, repo: str, artifact_id: int, output_path: str) -> bool: """Download an artifact to a file.""" endpoint = f"/repos/{owner}/{repo}/actions/artifacts/{artifact_id}/zip" url = f"{self.base_url}{endpoint}" headers = {} if self.config.token: headers["Authorization"] = f"token {self.config.token}" req = urllib.request.Request(url, headers=headers) try: with urllib.request.urlopen(req, timeout=120) as resp: with open(output_path, "wb") as f: f.write(resp.read()) return True except (urllib.error.URLError, IOError) as e: print(f"{Color.RED}Download failed: {e}{Color.RST}") return False def get_workflows(self, owner: str, repo: str) -> list: """Get list of workflows in a repository.""" data = self._request(f"/repos/{owner}/{repo}/actions/workflows") if data and "workflows" in data: return data["workflows"] return data if isinstance(data, list) else [] def get_pr_checks(self, owner: str, repo: str, pr_number: int) -> dict | None: """Get CI check status for a pull request.""" # Get PR details first to find head SHA pr = self._request(f"/repos/{owner}/{repo}/pulls/{pr_number}") if not pr: return None head_sha = pr.get("head", {}).get("sha", "") if not head_sha: return None # Get commit status status = self._request(f"/repos/{owner}/{repo}/commits/{head_sha}/status") return { "pr": pr, "sha": head_sha, "status": status, } def get_version(self) -> dict | None: """Get Gitea server version info.""" return self._request("/version") def get_registration_token(self, owner: str = "", repo: str = "") -> str | None: """Get runner registration token for repo, org, or user level. Args: owner: Repository owner or organization (empty for user-level) repo: Repository name (empty for org/user-level) Returns: Registration token string or None on failure. """ if owner and repo: # Repository-level registration endpoint = f"/repos/{owner}/{repo}/actions/runners/registration-token" elif owner: # Organization-level registration endpoint = f"/orgs/{owner}/actions/runners/registration-token" else: # User-level registration endpoint = "/user/actions/runners/registration-token" result = self._post(endpoint, {}) if result and "token" in result: return result["token"] return None def get_runners(self, owner: str = "", repo: str = "") -> list: """Get runners for repo, org, or global (admin). Args: owner: Repository owner or organization repo: Repository name (if empty, gets org/global runners) """ if owner and repo: # Repository runners data = self._request(f"/repos/{owner}/{repo}/actions/runners") elif owner: # Organization runners data = self._request(f"/orgs/{owner}/actions/runners") else: # Global runners (admin only) data = self._request("/admin/runners") if data and "runners" in data: return data["runners"] return data if isinstance(data, list) else [] def get_runner_jobs(self, owner: str = "", repo: str = "") -> list: """Get running/pending jobs for runners.""" if owner and repo: endpoint = f"/repos/{owner}/{repo}/actions/runners/jobs" elif owner: endpoint = f"/orgs/{owner}/actions/runners/jobs" else: endpoint = "/admin/runners/jobs" data = self._request(endpoint) if data and "jobs" in data: return data["jobs"] return data if isinstance(data, list) else [] def get_runner(self, runner_id: int, owner: str = "", repo: str = "") -> dict | None: """Get a specific runner by ID.""" if owner and repo: endpoint = f"/repos/{owner}/{repo}/actions/runners/{runner_id}" elif owner: endpoint = f"/orgs/{owner}/actions/runners/{runner_id}" else: endpoint = f"/admin/runners/{runner_id}" return self._request(endpoint) def delete_runner(self, runner_id: int, owner: str = "", repo: str = "") -> bool: """Delete a runner by ID.""" if owner and repo: endpoint = f"/repos/{owner}/{repo}/actions/runners/{runner_id}" elif owner: endpoint = f"/orgs/{owner}/actions/runners/{runner_id}" else: endpoint = f"/admin/runners/{runner_id}" return self._delete(endpoint) def get_workflow_contents(self, owner: str, repo: str, workflow: str) -> str | None: """Get workflow file contents.""" # Try .gitea/workflows first, then .github/workflows for prefix in [".gitea/workflows", ".github/workflows"]: path = f"{prefix}/{workflow}" data = self._request(f"/repos/{owner}/{repo}/contents/{path}") if data and "content" in data: import base64 try: return base64.b64decode(data["content"]).decode("utf-8") except Exception: pass return None def delete_run(self, owner: str, repo: str, run_id: int) -> bool: """Delete a workflow run (for queued runs that can't be cancelled).""" endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}" return self._delete(endpoint) def _delete(self, endpoint: str) -> bool: """Make DELETE request.""" url = f"{self.base_url}{endpoint}" headers = {"Accept": "application/json"} if self.config.token: headers["Authorization"] = f"token {self.config.token}" else: print(f"{Color.RED}Authentication required for this operation{Color.RST}") return False req = urllib.request.Request(url, headers=headers, method="DELETE") try: with urllib.request.urlopen(req, timeout=30) as resp: return resp.status in (200, 204) except urllib.error.HTTPError as e: if e.code == 404: print(f"{Color.RED}Not found: {endpoint}{Color.RST}") elif e.code == 403: print(f"{Color.RED}Permission denied{Color.RST}") else: print(f"{Color.RED}Delete failed: {e.code}{Color.RST}") return False except urllib.error.URLError as e: print(f"{Color.RED}Connection error: {e.reason}{Color.RST}") return False def _post(self, endpoint: str, data: dict, retries: int = 3) -> dict | None: """Make POST request with JSON body.""" url = f"{self.base_url}{endpoint}" headers = { "Accept": "application/json", "Content-Type": "application/json", } if self.config.token: headers["Authorization"] = f"token {self.config.token}" else: print(f"{Color.RED}Authentication required for this operation{Color.RST}") return None body = json.dumps(data).encode("utf-8") req = urllib.request.Request(url, data=body, headers=headers, method="POST") for attempt in range(retries): try: with urllib.request.urlopen(req, timeout=30) as resp: self._retry_delay = 1 # Handle 204 No Content if resp.status == 204: return {} content = resp.read().decode() return json.loads(content) if content else {} except urllib.error.HTTPError as e: if e.code == 401: print(f"{Color.RED}Authentication failed. Check your token.{Color.RST}") return None elif e.code == 403: print(f"{Color.RED}Permission denied. Token may lack required scopes.{Color.RST}") return None elif e.code == 404: print(f"{Color.RED}Not found: {endpoint}{Color.RST}") return None elif e.code == 422: print(f"{Color.RED}Invalid request (check workflow name/inputs){Color.RST}") return None elif e.code == 429: if attempt < retries - 1: delay = self._retry_delay print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}") time.sleep(delay) self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay) continue return None else: print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}") return None except urllib.error.URLError as e: if attempt < retries - 1: time.sleep(self._retry_delay) continue print(f"{Color.RED}Connection error: {e.reason}{Color.RST}") return None return None def format_time(timestamp: str) -> str: """Format ISO timestamp to relative time.""" if not timestamp: return "" try: dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) now = datetime.now(dt.tzinfo) delta = now - dt if delta.days > 0: return f"{delta.days}d ago" elif delta.seconds >= 3600: return f"{delta.seconds // 3600}h ago" elif delta.seconds >= 60: return f"{delta.seconds // 60}m ago" else: return "just now" except (ValueError, TypeError): return timestamp[:16] if len(timestamp) > 16 else timestamp def format_duration(start: str, end: str) -> str: """Format duration between two timestamps.""" if not start: return "" try: start_dt = datetime.fromisoformat(start.replace("Z", "+00:00")) if end: end_dt = datetime.fromisoformat(end.replace("Z", "+00:00")) else: end_dt = datetime.now(start_dt.tzinfo) delta = end_dt - start_dt total_secs = int(delta.total_seconds()) if total_secs >= 3600: return f"{total_secs // 3600}h{(total_secs % 3600) // 60:02d}m" elif total_secs >= 60: return f"{total_secs // 60}m{total_secs % 60:02d}s" else: return f"{total_secs}s" except (ValueError, TypeError): return "" def status_display(status: str, conclusion: str = "") -> str: """Get colored status display.""" key = conclusion if conclusion else status color, symbol = STATUS_STYLE.get(key.lower(), (Color.DIM, "?")) return f"{color}{symbol}{Color.RST}" def cmd_list(api: GiteaAPI, args: argparse.Namespace) -> int: """List recent workflow runs.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: # Full discovery mode - list all repos with recent runs return cmd_discover(api, args) # Get runs via API runs = api.get_runs(owner, repo, limit=args.limit) if not runs: if not getattr(args, "compact", False): print(f"{Color.DIM}No workflow runs found{Color.RST}") return 0 # Apply filters status_filter = getattr(args, "status_filter", "") or "" branch_filter = getattr(args, "branch", "") or "" workflow_filter = getattr(args, "workflow", "") or "" if status_filter or branch_filter or workflow_filter: runs = _filter_runs(runs, status_filter, branch_filter, workflow_filter) if not runs: if not getattr(args, "compact", False): filters = [f for f in [status_filter, branch_filter, workflow_filter] if f] print(f"{Color.DIM}No runs matching filters: {', '.join(filters)}{Color.RST}") return 0 # JSON output if hasattr(args, "json") and args.json: return _output_json(runs, owner, repo) # Compact output for scripting if getattr(args, "compact", False): for run in runs[:args.limit]: _print_compact_run(run) return 0 print(f"{Color.BLD}Workflow runs for {owner}/{repo}:{Color.RST}\n") # Statistics _print_repo_stats(runs) for run in runs[:args.limit]: _print_single_run(run) return 0 def _filter_runs(runs: list, status_filter: str = "", branch: str = "", workflow: str = "") -> list: """Filter runs by status, branch, and/or workflow.""" filtered = runs # Status filter if status_filter == "failed": filtered = [r for r in filtered if r.get("conclusion") == "failure"] elif status_filter == "success": filtered = [r for r in filtered if r.get("conclusion") == "success"] elif status_filter == "running": filtered = [r for r in filtered if r.get("status") in ("running", "waiting")] elif status_filter == "completed": filtered = [r for r in filtered if r.get("status") == "completed"] # Branch filter if branch: filtered = [r for r in filtered if branch.lower() in r.get("head_branch", "").lower()] # Workflow filter if workflow: workflow_lower = workflow.lower() filtered = [r for r in filtered if ( workflow_lower in r.get("workflow", "").lower() or workflow_lower in r.get("path", "").lower() or workflow_lower in r.get("name", "").lower() )] return filtered def _print_compact_run(run: dict, show_repo: bool = False) -> None: """Print a single run in compact single-line format.""" run_id = run.get("id", 0) status = run.get("status", "") conclusion = run.get("conclusion", "") # Status symbol if conclusion == "success": sym = f"{Color.GRN}✓{Color.RST}" elif conclusion == "failure": sym = f"{Color.RED}✗{Color.RST}" elif status in ("running", "waiting"): sym = f"{Color.BLU}●{Color.RST}" else: sym = f"{Color.DIM}○{Color.RST}" branch = run.get("head_branch", "")[:20] title = run.get("display_title", "")[:40] workflow = run.get("workflow", "") repo = run.get("_repo", "") duration = run.get("duration", "") or format_duration( run.get("run_started_at", ""), run.get("updated_at", "") ) # Build compact line parts = [sym, f"#{run_id:>5}"] if show_repo and repo: parts.append(f"{Color.BLD}{repo:<20}{Color.RST}") if workflow: parts.append(f"{Color.DIM}{workflow:<15}{Color.RST}") parts.append(f"{Color.MAG}{branch:<20}{Color.RST}") parts.append(title) if duration: parts.append(f"{Color.DIM}({duration}){Color.RST}") print(" ".join(parts)) def _print_repo_stats(runs: list) -> None: """Print statistics for a single repository.""" total = len(runs) if total == 0: return success = len([r for r in runs if r.get("conclusion") == "success"]) failed = len([r for r in runs if r.get("conclusion") == "failure"]) running = len([r for r in runs if r.get("status") in ("running", "waiting")]) completed = total - running if completed > 0: rate = (success / completed) * 100 print(f"{Color.DIM}Stats: {Color.GRN}✓{success}{Color.RST}{Color.DIM} " f"{Color.RED}✗{failed}{Color.RST}{Color.DIM} " f"{Color.BLU}●{running}{Color.RST}{Color.DIM} " f"({rate:.0f}% success){Color.RST}\n") def _print_single_run(run: dict) -> None: """Print a single run with full details.""" run_id = run.get("id", 0) status = status_display(run.get("status", ""), run.get("conclusion", "")) title = run.get("display_title", run.get("head_branch", ""))[:50] branch = run.get("head_branch", "") workflow = run.get("workflow", "") when = format_time(run.get("created_at", "")) duration = run.get("duration", "") or format_duration(run.get("run_started_at", ""), run.get("updated_at", "")) if workflow: print(f" {status} {Color.BLD}{workflow}{Color.RST}") else: print(f" {status} {Color.BLD}#{run_id}{Color.RST}") print(f" {title}") info_parts = [] if branch: info_parts.append(f"{Color.MAG}{branch}{Color.RST}") if when: info_parts.append(when) if duration: info_parts.append(duration) if info_parts: print(f" {Color.DIM}{' · '.join(info_parts)}{Color.RST}") print() def _output_json(runs: list, owner: str, repo: str) -> int: """Output runs as JSON.""" import json as json_mod runs_list: list[dict[str, Any]] = [] output: dict[str, Any] = { "repository": f"{owner}/{repo}", "total": len(runs), "runs": runs_list } for run in runs: runs_list.append({ "id": run.get("id"), "status": run.get("status"), "conclusion": run.get("conclusion"), "title": run.get("display_title"), "branch": run.get("head_branch"), "workflow": run.get("workflow"), "created_at": run.get("created_at"), "duration": run.get("duration"), }) print(json_mod.dumps(output, indent=2)) return 0 def cmd_discover(api: GiteaAPI, args: argparse.Namespace) -> int: """Discover all repositories and their workflow runs.""" print(f"{Color.BLD}Discovering workflows on {api.config.url}...{Color.RST}\n") # Get current user user = api._request("/user") if not user: print(f"{Color.YLW}API token required for discovery.{Color.RST}") print(f"{Color.DIM}Use: gitea-ci config --token YOUR_TOKEN{Color.RST}") print(f"{Color.DIM}Or use gitea-scrape for public repos without auth{Color.RST}") return 1 username = user.get("login", "") print(f"{Color.DIM}User:{Color.RST} {username}\n") # Get all repos (user's own + orgs) repos = api.get_repos(limit=100) # Also get repos from organizations orgs: list[Any] = list(api._request("/user/orgs") or []) for org in orgs: org_repos = api._request(f"/orgs/{org.get('username', '')}/repos?limit=100") if org_repos: repos.extend(org_repos) if not repos: print(f"{Color.DIM}No repositories found{Color.RST}") return 0 # Collect all recent runs all_runs = [] repos_with_actions = 0 for i, r in enumerate(repos): repo_owner = r.get("owner", {}).get("login", "") repo_name = r.get("name", "") full_name = r.get("full_name", f"{repo_owner}/{repo_name}") # Progress indicator for large repos if len(repos) > 10 and (i + 1) % 5 == 0: print(f"{Color.DIM} Scanning {i + 1}/{len(repos)} repositories...{Color.RST}", end="\r", file=sys.stderr) runs = api.get_runs(repo_owner, repo_name, limit=3) if runs: repos_with_actions += 1 for run in runs: run["_repo"] = full_name all_runs.append(run) # Throttle to avoid rate limiting api.throttle() if not all_runs: print(f"{Color.DIM}No workflow runs found across {len(repos)} repositories{Color.RST}") return 0 # Sort by created_at descending all_runs.sort(key=lambda x: x.get("created_at", ""), reverse=True) _print_run_summary(all_runs, repos_with_actions) return 0 def _print_run_summary(all_runs: list, repos_with_actions: int) -> None: """Print summary of workflow runs grouped by status.""" # Calculate statistics total = len(all_runs) success = len([r for r in all_runs if r.get("conclusion") == "success"]) failed = [r for r in all_runs if r.get("conclusion") == "failure"] running = [r for r in all_runs if r.get("status") == "running" or r.get("status") == "waiting"] cancelled = len([r for r in all_runs if r.get("conclusion") == "cancelled"]) # Print statistics header print(f"{Color.BLD}Statistics:{Color.RST}") completed = total - len(running) if completed > 0: success_rate = (success / completed) * 100 print(f" {Color.GRN}✓ {success}{Color.RST} success " f"{Color.RED}✗ {len(failed)}{Color.RST} failed " f"{Color.BLU}● {len(running)}{Color.RST} running " f"{Color.DIM}⊘ {cancelled} cancelled{Color.RST}") print(f" {Color.DIM}Success rate: {success_rate:.0f}% ({success}/{completed} completed){Color.RST}") else: print(f" {Color.BLU}● {len(running)}{Color.RST} running " f"{Color.DIM}No completed runs yet{Color.RST}") print(f" {Color.DIM}Repositories with actions: {repos_with_actions}{Color.RST}") print() # Show running first if running: print(f"{Color.BLU}{Color.BLD}Running/Waiting ({len(running)}):{Color.RST}\n") for run in running[:5]: _print_run_line(run) if len(running) > 5: print(f" {Color.DIM}... and {len(running) - 5} more{Color.RST}\n") # Show recent failures if failed: print(f"{Diff.DEL}{Color.BLD} Recent Failures ({len(failed)}) {Color.RST}\n") for run in failed[:5]: _print_run_line(run) if len(failed) > 5: print(f" {Color.DIM}... and {len(failed) - 5} more{Color.RST}\n") # Show recent activity recent = all_runs[:10] print(f"{Color.BLD}Recent Activity:{Color.RST}\n") for run in recent: _print_run_line(run) def _print_run_line(run: dict) -> None: """Print a single run line with repo info.""" run_id = run.get("id", 0) repo = run.get("_repo", "") status = status_display(run.get("status", ""), run.get("conclusion", "")) title = run.get("display_title", run.get("head_branch", ""))[:40] branch = run.get("head_branch", "") workflow = run.get("workflow", "") # Get time - try different fields when = format_time(run.get("created_at", "")) # Get duration - try pre-computed or calculate duration = run.get("duration", "") if not duration: duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", "")) # Build info line info_parts = [] if branch: info_parts.append(f"{Color.MAG}{branch}{Color.RST}") if when: info_parts.append(when) if duration: info_parts.append(duration) info_str = " · ".join(info_parts) if info_parts else "" # Print with workflow name if available if workflow: print(f" {status} {Color.BLD}{repo}{Color.RST} {Color.DIM}{workflow}{Color.RST}") else: print(f" {status} {Color.BLD}{repo}{Color.RST} #{run_id}") print(f" {title}") if info_str: print(f" {Color.DIM}{info_str}{Color.RST}") print() def cmd_status(api: GiteaAPI, args: argparse.Namespace) -> int: """Show detailed status of a workflow run.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required. Use -o OWNER -r REPO or set defaults{Color.RST}") return 1 # Get run (latest or specific) if args.run_id: run = api.get_run(owner, repo, args.run_id) else: runs = api.get_runs(owner, repo, limit=1) run = runs[0] if runs else None if not run: print(f"{Color.RED}Run not found{Color.RST}") return 1 run_id = run.get("id", 0) # Get jobs jobs = api.get_jobs(owner, repo, run_id) # JSON output if getattr(args, "json", False): output = { "repository": f"{owner}/{repo}", "run": { "id": run_id, "status": run.get("status"), "conclusion": run.get("conclusion"), "title": run.get("display_title"), "branch": run.get("head_branch"), "commit": run.get("head_sha"), "created_at": run.get("created_at"), "started_at": run.get("run_started_at"), "updated_at": run.get("updated_at"), }, "jobs": [ { "id": j.get("id"), "name": j.get("name"), "status": j.get("status"), "conclusion": j.get("conclusion"), "started_at": j.get("started_at"), "completed_at": j.get("completed_at"), } for j in jobs ] if jobs else [] } print(json.dumps(output, indent=2)) return 0 status = status_display(run.get("status", ""), run.get("conclusion", "")) title = run.get("display_title", "") branch = run.get("head_branch", "") commit = run.get("head_sha", "")[:8] when = format_time(run.get("created_at", "")) duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", "")) print(f"{Color.BLD}Run #{run_id}{Color.RST} {status}") print(f" {Color.DIM}Title:{Color.RST} {title}") print(f" {Color.DIM}Branch:{Color.RST} {branch} ({commit})") print(f" {Color.DIM}Started:{Color.RST} {when}") print(f" {Color.DIM}Duration:{Color.RST} {duration}") print() if jobs: print(f"{Color.BLD}Jobs:{Color.RST}\n") pending_jobs = [] for job in jobs: job_id = job.get("id", 0) job_status_raw = job.get("status", "") job_status = status_display(job_status_raw, job.get("conclusion", "")) job_name = job.get("name", "unknown") job_duration = format_duration(job.get("started_at", ""), job.get("completed_at", "")) runs_on = job.get("labels", []) # May be empty, will try workflow print(f" {job_status} {job_name} {Color.DIM}(ID: {job_id}, {job_duration}){Color.RST}") # Collect pending jobs for diagnostics if job_status_raw in ("pending", "waiting", "queued"): pending_jobs.append({ "name": job_name, "id": job_id, "labels": runs_on, "created": job.get("created_at", run.get("created_at", "")), }) # Show pending diagnostics if there are pending jobs if pending_jobs and not getattr(args, "json", False): print(f"\n{Color.BLD}Pending Diagnostics:{Color.RST}\n") # Get available runners runners = api.get_runners("", "") # User-level runners first if not runners: runners = api.get_runners(owner, repo) if not runners: print(f" {Color.RED}✗{Color.RST} No runners available") print(f" {Color.DIM}Register a runner: gitea-ci register-token{Color.RST}") else: online_runners = [r for r in runners if r.get("status") == "online"] busy_runners = [r for r in runners if r.get("busy", False)] runner_labels = set() online_labels = set() for r in runners: labels = r.get("labels", []) runner_labels.update(labels) if r.get("status") == "online": online_labels.update(labels) print(f" Runners: {len(online_runners)} online, {len(busy_runners)} busy, {len(runners)} total") # For each pending job, check why it might be stuck for pj in pending_jobs: created_str = pj.get("created", "") if created_str: try: created_dt = datetime.fromisoformat(created_str.replace("Z", "+00:00")) pending_secs = (datetime.now(created_dt.tzinfo) - created_dt).total_seconds() pending_time = f"{int(pending_secs // 60)}m {int(pending_secs % 60)}s" if pending_secs >= 60 else f"{int(pending_secs)}s" except Exception: pending_time = "?" else: pending_time = "?" job_labels = pj.get("labels", []) if not job_labels: # Try to get from workflow - use default label job_labels = ["linux"] # Common default print(f"\n {Color.YLW}○{Color.RST} {pj['name']} {Color.DIM}(pending {pending_time}){Color.RST}") if job_labels: missing = set(job_labels) - runner_labels offline_labels = (set(job_labels) & runner_labels) - online_labels if missing: print(f" {Color.RED}Missing labels:{Color.RST} {', '.join(missing)}") elif offline_labels: print(f" {Color.YLW}Runners offline for:{Color.RST} {', '.join(offline_labels)}") elif busy_runners and len(busy_runners) == len(online_runners): print(f" {Color.YLW}All runners busy{Color.RST}") else: print(f" {Color.DIM}Waiting for runner...{Color.RST}") return 0 def cmd_logs(api: GiteaAPI, args: argparse.Namespace) -> int: """Show logs for a job.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 job_id = args.job_id # Get run_id from argument or latest if args.run_id: run_id = args.run_id else: runs = api.get_runs(owner, repo, limit=1) if not runs: print(f"{Color.RED}No runs found{Color.RST}") return 1 run_id = runs[0].get("id") # If no job_id, get jobs for the run if not job_id: jobs = api.get_jobs(owner, repo, run_id) if not jobs: print(f"{Color.RED}No jobs found{Color.RST}") return 1 # Prefer failed job for job in jobs: if job.get("conclusion") == "failure": job_id = job.get("id") break if not job_id: job_id = jobs[0].get("id") logs = api.get_job_logs(owner, repo, job_id) if logs: print(logs) else: print(f"{Color.RED}No logs available (may require authentication){Color.RST}") return 1 return 0 def cmd_watch(api: GiteaAPI, args: argparse.Namespace) -> int: """Watch a running workflow.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 run_id = args.run_id interval = args.interval notify = getattr(args, "notify", False) timeout = getattr(args, "timeout", 0) start_time = time.time() timeout_msg = f", timeout: {timeout}s" if timeout else "" print(f"{Color.BLD}Watching {owner}/{repo}...{Color.RST} (Ctrl+C to stop{timeout_msg})\n") try: while True: # Check timeout if timeout and (time.time() - start_time) >= timeout: print(f"\n{Color.YLW}Timeout reached ({timeout}s){Color.RST}") return 2 # Clear screen for refresh if not args.no_clear: print("\033[2J\033[H", end="") # Get run status if run_id: run = api.get_run(owner, repo, run_id) runs = [run] if run else [] else: runs = api.get_runs(owner, repo, limit=5) if not runs: print(f"{Color.DIM}No active runs{Color.RST}") else: for run in runs: rid = run.get("id", 0) status = status_display(run.get("status", ""), run.get("conclusion", "")) title = run.get("display_title", "")[:40] duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", "")) print(f"{status} {Color.BLD}#{rid}{Color.RST} {title} {Color.DIM}({duration}){Color.RST}") # Show jobs for this run jobs = api.get_jobs(owner, repo, rid) for job in jobs: job_status = status_display(job.get("status", ""), job.get("conclusion", "")) job_name = job.get("name", "") print(f" {job_status} {job_name}") print() # Stop watching if run is complete if run.get("status") == "completed": conclusion = run.get("conclusion", "") if conclusion == "success": print(f"{Diff.ADD} ✓ Run completed successfully {Color.RST}") if notify: send_notification( "CI Passed", f"{owner}/{repo} #{rid} succeeded", "normal" ) elif conclusion == "failure": print(f"{Diff.DEL} ✗ Run failed {Color.RST}") if notify: send_notification( "CI Failed", f"{owner}/{repo} #{rid} failed", "critical" ) else: print(f"{Diff.CHG} ⚠ Run {conclusion} {Color.RST}") if notify: send_notification( f"CI {conclusion.title()}", f"{owner}/{repo} #{rid} {conclusion}", "normal" ) if run_id: # Only exit if watching specific run return 0 if conclusion == "success" else 1 time.sleep(interval) except KeyboardInterrupt: print(f"\n{Color.DIM}Stopped watching{Color.RST}") return 0 def _check_token_permissions(api: GiteaAPI) -> int: """Check what permissions the current token has.""" config = api.config if not config.token: print(f"{Color.RED}No token configured{Color.RST}") print(f"{Color.DIM}Set with: gitea-ci config token set {Color.RST}") return 1 print(f"{Color.BLD}Token Permissions{Color.RST}") print("─" * 35) # Basic auth - GET /user user = api._request("/user") if user: username = user.get("login", "unknown") print(f" {Color.GRN}✓{Color.RST} Authenticated as: {username}") else: print(f" {Color.RED}✗{Color.RST} Authentication failed") return 1 # Read repos - GET /user/repos repos = api._request("/user/repos?limit=1") if repos is not None: print(f" {Color.GRN}✓{Color.RST} Read repositories") else: print(f" {Color.RED}✗{Color.RST} Read repositories") # Organization access - GET /user/orgs orgs = api._request("/user/orgs") if orgs is not None: org_count = len(orgs) if isinstance(orgs, list) else 0 print(f" {Color.GRN}✓{Color.RST} Organization access ({org_count} orgs)") else: print(f" {Color.RED}✗{Color.RST} Organization access") # Admin access - GET /admin/runners runners = api._request("/admin/runners") if runners is not None: print(f" {Color.GRN}✓{Color.RST} Admin access") else: print(f" {Color.DIM}○{Color.RST} Admin access (not admin or scope missing)") # User-level runners - GET /user/actions/runners/registration-token # This tests if we can manage user-level runners user_runners = api._request("/user/actions/runners") if user_runners is not None: print(f" {Color.GRN}✓{Color.RST} User-level runners") else: print(f" {Color.DIM}○{Color.RST} User-level runners") # Check if we can trigger workflows (requires repo scope) print() print(f"{Color.DIM}Token ends with: ...{config.token[-4:]}{Color.RST}") print(f"{Color.DIM}Config file: {CONFIG_FILE}{Color.RST}") return 0 def cmd_config(api: GiteaAPI, args: argparse.Namespace) -> int: """Configure gitea-ci.""" config = api.config # Handle token subcommand action = getattr(args, "action", None) if action == "token": token_action = getattr(args, "token_action", None) if token_action == "check": return _check_token_permissions(api) elif token_action == "set": new_token = getattr(args, "token_value", None) if not new_token: print(f"{Color.RED}Token value required{Color.RST}") return 1 config.token = new_token config.save() print(f"{Color.GRN}Token saved to {CONFIG_FILE}{Color.RST}") return 0 else: # Just show token info if config.token: print(f"{Color.BLD}Token:{Color.RST} {'*' * 8}...{config.token[-4:]}") print(f"{Color.DIM}Use 'gitea-ci config token check' to test permissions{Color.RST}") else: print(f"{Color.YLW}No token configured{Color.RST}") print(f"{Color.DIM}Set with: gitea-ci config token set {Color.RST}") return 0 if args.show: print(f"{Color.BLD}Current configuration:{Color.RST}") print(f" URL: {config.url}") print(f" Token: {'***' if config.token else '(not set)'}") print(f" Default owner: {config.default_owner or '(not set)'}") print(f" Default repo: {config.default_repo or '(not set)'}") print(f"\n{Color.DIM}Config file: {CONFIG_FILE}{Color.RST}") return 0 if args.test: print(f"{Color.BLD}Testing API connectivity...{Color.RST}\n") print(f" URL: {config.url}") print(f" Token: {'configured' if config.token else 'not set'}") print() # Test basic connectivity try: req = urllib.request.Request( f"{config.url}/api/v1/version", headers={"Accept": "application/json"} ) with urllib.request.urlopen(req, timeout=10) as resp: data = json.loads(resp.read().decode()) version = data.get("version", "unknown") print(f" {Color.GRN}✓{Color.RST} Server reachable (Gitea {version})") except urllib.error.URLError as e: print(f" {Color.RED}✗{Color.RST} Connection failed: {e.reason}") return 1 # Test authentication if token set if config.token: user = api._request("/user") if user: username = user.get("login", "unknown") print(f" {Color.GRN}✓{Color.RST} Authenticated as: {username}") else: print(f" {Color.RED}✗{Color.RST} Authentication failed (invalid token?)") return 1 else: print(f" {Color.YLW}○{Color.RST} No token configured (read-only mode)") # Test default repo if configured if config.default_owner and config.default_repo: runs = api.get_runs(config.default_owner, config.default_repo, limit=1) if runs is not None: print(f" {Color.GRN}✓{Color.RST} Default repo accessible: {config.default_owner}/{config.default_repo}") else: print(f" {Color.YLW}○{Color.RST} Default repo not accessible (may not have actions)") print(f"\n{Color.GRN}API connectivity OK{Color.RST}") return 0 # Interactive config print(f"{Color.BLD}Gitea CI Configuration{Color.RST}\n") url = input(f"Gitea URL [{config.url}]: ").strip() if url: config.url = url token = input(f"API Token [{'***' if config.token else 'none'}]: ").strip() if token: config.token = token owner = input(f"Default owner [{config.default_owner}]: ").strip() if owner: config.default_owner = owner repo = input(f"Default repo [{config.default_repo}]: ").strip() if repo: config.default_repo = repo config.save() print(f"\n{Color.GRN}Configuration saved to {CONFIG_FILE}{Color.RST}") return 0 def cmd_repo(api: GiteaAPI, args: argparse.Namespace) -> int: """Manage repositories.""" action = getattr(args, "action", "list") if action == "list": # List user's repositories print(f"{Color.BLD}Repositories:{Color.RST}\n") repos = api.get_repos(limit=100) if not repos: print(f"{Color.DIM}No repositories found{Color.RST}") return 0 for r in repos: name = r.get("full_name", "") desc = r.get("description", "")[:40] if r.get("description") else "" private = r.get("private", False) visibility = f"{Color.YLW}private{Color.RST}" if private else f"{Color.GRN}public{Color.RST}" has_actions = r.get("has_actions", False) actions_icon = f" {Color.DIM}[actions]{Color.RST}" if has_actions else "" print(f" {name} [{visibility}]{actions_icon}") if desc: print(f" {Color.DIM}{desc}{Color.RST}") return 0 elif action == "create": name = args.name if not name: print(f"{Color.RED}Repository name required{Color.RST}") return 1 description = getattr(args, "description", "") or "" private = getattr(args, "private", False) # Check if already exists user = api._request("/user") if not user: print(f"{Color.RED}Authentication required{Color.RST}") return 1 username = user.get("login", "") if api.repo_exists(username, name): print(f"{Color.YLW}Repository {username}/{name} already exists{Color.RST}") return 0 print(f"Creating repository {Color.BLD}{name}{Color.RST}...") result = api.create_repo(name, description, private) if result: full_name = result.get("full_name", name) clone_url = result.get("ssh_url", result.get("clone_url", "")) print(f"{Color.GRN}✓{Color.RST} Created: {full_name}") if clone_url: print(f" {Color.DIM}Clone: {clone_url}{Color.RST}") return 0 else: print(f"{Color.RED}✗{Color.RST} Failed to create repository") return 1 elif action == "exists": owner = args.owner or api.config.default_owner repo = args.repo or args.name if not owner or not repo: print(f"{Color.RED}Repository (owner/repo) required{Color.RST}") return 1 if api.repo_exists(owner, repo): print(f"{Color.GRN}✓{Color.RST} {owner}/{repo} exists") return 0 else: print(f"{Color.RED}✗{Color.RST} {owner}/{repo} does not exist") return 1 elif action == "check": owner = args.owner or api.config.default_owner repo = args.repo or args.name or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository (owner/repo) required{Color.RST}") return 1 print(f"{Color.BLD}Repository Check: {owner}/{repo}{Color.RST}\n") # Check if repository exists and has actions enabled repo_info = api._request(f"/repos/{owner}/{repo}") if not repo_info: print(f"{Color.RED}✗{Color.RST} Repository not found or not accessible") return 1 has_actions = repo_info.get("has_actions", False) if has_actions: print(f"{Color.GRN}✓{Color.RST} Actions enabled") else: print(f"{Color.RED}✗{Color.RST} Actions not enabled") print(f" {Color.DIM}Enable in repository Settings → Actions{Color.RST}") return 1 # Get workflows workflows = api.get_workflows(owner, repo) if workflows: print(f"{Color.GRN}✓{Color.RST} Found {len(workflows)} workflow(s)") else: print(f"{Color.YLW}○{Color.RST} No workflows found") print(f" {Color.DIM}Create .gitea/workflows/*.yml or .github/workflows/*.yml{Color.RST}") return 0 # Get available runners (user scope first) runners = api.get_runners("", "") # User-level runners if not runners: # Try repo scope runners = api.get_runners(owner, repo) runner_labels = set() online_labels = set() for r in runners: labels = r.get("labels", []) runner_labels.update(labels) if r.get("status") == "online": online_labels.update(labels) if runners: online_count = sum(1 for r in runners if r.get("status") == "online") print(f"{Color.GRN}✓{Color.RST} Runners available: {online_count} online, {len(runners)} total") else: print(f"{Color.RED}✗{Color.RST} No runners found") print(f" {Color.DIM}Register a runner with: gitea-ci register-token{Color.RST}") return 1 # Analyze workflows for runs-on labels print(f"\n{Color.BLD}Workflow Analysis:{Color.RST}\n") issues = [] for wf in workflows: wf_name = wf.get("name", "unnamed") wf_path = wf.get("path", "") wf_file = wf_path.split("/")[-1] if wf_path else "unknown" # Get workflow contents and parse runs-on content = api.get_workflow_contents(owner, repo, wf_file) if not content: print(f" {Color.DIM}○{Color.RST} {wf_name} {Color.DIM}(cannot read){Color.RST}") continue # Parse runs-on labels from YAML content runs_on_matches = re.findall(r'runs-on:\s*([^\n]+)', content) required_labels = set() for match in runs_on_matches: # Handle array syntax: [label1, label2] or simple string match = match.strip() if match.startswith('['): # Array syntax labels = re.findall(r'[\w-]+', match) required_labels.update(labels) else: # Simple string (may have quotes) label = match.strip('\'"') required_labels.add(label) if not required_labels: print(f" {Color.DIM}○{Color.RST} {wf_name} {Color.DIM}(no runs-on found){Color.RST}") continue # Check if labels match available runners missing = required_labels - runner_labels offline = (required_labels & runner_labels) - online_labels if missing: print(f" {Color.RED}✗{Color.RST} {wf_name}") print(f" {Color.DIM}requires:{Color.RST} {', '.join(required_labels)}") print(f" {Color.RED}missing:{Color.RST} {', '.join(missing)}") issues.append((wf_name, "missing labels", missing)) elif offline: print(f" {Color.YLW}○{Color.RST} {wf_name}") print(f" {Color.DIM}requires:{Color.RST} {', '.join(required_labels)}") print(f" {Color.YLW}offline:{Color.RST} {', '.join(offline)}") issues.append((wf_name, "runners offline", offline)) else: print(f" {Color.GRN}✓{Color.RST} {wf_name} {Color.DIM}({', '.join(required_labels)}){Color.RST}") # Summary if issues: print(f"\n{Color.BLD}Issues Found:{Color.RST}\n") all_missing = set() for wf_name, issue_type, labels in issues: if issue_type == "missing labels": all_missing.update(labels) if all_missing: print(f" Missing runner labels: {', '.join(sorted(all_missing))}") print(f" {Color.DIM}Register runners with these labels or update workflows{Color.RST}") return 1 print(f"\n{Color.GRN}All workflows have matching runners{Color.RST}") return 0 return 0 def cmd_trigger(api: GiteaAPI, args: argparse.Namespace) -> int: """Trigger a workflow manually.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 workflow = args.workflow ref = args.ref or "master" # Parse inputs if provided inputs = {} if hasattr(args, "input") and args.input: for inp in args.input: if "=" in inp: key, value = inp.split("=", 1) inputs[key] = value # If no workflow specified, list available ones if not workflow: workflows = api.get_workflows(owner, repo) if not workflows: print(f"{Color.DIM}No workflows found{Color.RST}") return 1 print(f"{Color.BLD}Available workflows:{Color.RST}\n") for wf in workflows: name = wf.get("name", "") path = wf.get("path", "") state = wf.get("state", "") print(f" {Color.BLD}{path}{Color.RST}") if name: print(f" Name: {name}") if state: print(f" State: {state}") print() print(f"{Color.DIM}Use: gitea-ci trigger -w {Color.RST}") return 0 print(f"{Color.BLD}Triggering workflow...{Color.RST}") print(f" Workflow: {workflow}") print(f" Branch: {ref}") if inputs: print(f" Inputs: {inputs}") if api.trigger_workflow(owner, repo, workflow, ref, inputs if inputs else None): print(f"\n{Color.GRN}✓ Workflow triggered successfully{Color.RST}") print(f"{Color.DIM}Use 'gitea-ci watch {owner}/{repo}' to monitor{Color.RST}") return 0 else: print(f"\n{Color.RED}✗ Failed to trigger workflow{Color.RST}") return 1 def cmd_rerun(api: GiteaAPI, args: argparse.Namespace) -> int: """Re-run a workflow or job.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 # Re-run specific job if args.job_id: print(f"{Color.BLD}Re-running job #{args.job_id}...{Color.RST}") if api.rerun_job(owner, repo, args.job_id): print(f"{Color.GRN}✓ Job re-run started{Color.RST}") return 0 else: print(f"{Color.RED}✗ Failed to re-run job{Color.RST}") return 1 # Get run ID run_id = args.run_id if not run_id: # Get latest run runs = api.get_runs(owner, repo, limit=1) if not runs: print(f"{Color.RED}No runs found{Color.RST}") return 1 run_id = runs[0].get("id") # Re-run failed jobs only or all jobs if args.failed_only: print(f"{Color.BLD}Re-running failed jobs in run #{run_id}...{Color.RST}") if api.rerun_failed_jobs(owner, repo, run_id): print(f"{Color.GRN}✓ Failed jobs re-run started{Color.RST}") return 0 else: print(f"{Color.RED}✗ Failed to re-run jobs{Color.RST}") return 1 else: print(f"{Color.BLD}Re-running all jobs in run #{run_id}...{Color.RST}") if api.rerun_workflow(owner, repo, run_id): print(f"{Color.GRN}✓ Workflow re-run started{Color.RST}") return 0 else: print(f"{Color.RED}✗ Failed to re-run workflow{Color.RST}") return 1 def cmd_cancel(api: GiteaAPI, args: argparse.Namespace) -> int: """Cancel a running workflow.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 run_id = args.run_id if not run_id: # Get latest running run runs = api.get_runs(owner, repo, limit=10) running = [r for r in runs if r.get("status") in ("running", "waiting", "queued")] if not running: print(f"{Color.DIM}No running workflows to cancel{Color.RST}") return 0 run_id = running[0].get("id") print(f"{Color.BLD}Cancelling run #{run_id}...{Color.RST}") if api.cancel_run(owner, repo, run_id): print(f"{Color.GRN}✓ Workflow cancelled{Color.RST}") return 0 else: print(f"{Color.RED}✗ Failed to cancel workflow{Color.RST}") return 1 def cmd_artifacts(api: GiteaAPI, args: argparse.Namespace) -> int: """List or download artifacts.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 # Download specific artifact if args.download: artifact_id = args.download output = args.output or f"artifact-{artifact_id}.zip" print(f"{Color.BLD}Downloading artifact #{artifact_id}...{Color.RST}") if api.download_artifact(owner, repo, artifact_id, output): size = Path(output).stat().st_size print(f"{Color.GRN}✓ Downloaded to {output} ({size:,} bytes){Color.RST}") return 0 return 1 # List artifacts run_id = args.run_id artifacts = api.get_artifacts(owner, repo, run_id) if not artifacts: print(f"{Color.DIM}No artifacts found{Color.RST}") return 0 print(f"{Color.BLD}Artifacts:{Color.RST}\n") for art in artifacts: art_id = art.get("id", 0) name = art.get("name", "unknown") size = art.get("size_in_bytes", 0) expired = art.get("expired", False) created = format_time(art.get("created_at", "")) status_icon = f"{Color.DIM}(expired){Color.RST}" if expired else "" print(f" {Color.BLD}#{art_id}{Color.RST} {name} {status_icon}") print(f" {Color.DIM}Size: {size:,} bytes · Created: {created}{Color.RST}") print() print(f"{Color.DIM}Download: gitea-ci artifacts -d [-O filename.zip]{Color.RST}") return 0 def cmd_pr(api: GiteaAPI, args: argparse.Namespace) -> int: """Show CI status for a pull request.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 pr_number = args.pr_number if not pr_number: print(f"{Color.RED}PR number required (-p/--pr){Color.RST}") return 1 data = api.get_pr_checks(owner, repo, pr_number) if not data: print(f"{Color.RED}PR #{pr_number} not found{Color.RST}") return 1 pr = data["pr"] sha = data["sha"][:8] status = data.get("status", {}) # PR info title = pr.get("title", "") state = pr.get("state", "") mergeable = pr.get("mergeable", False) user = pr.get("user", {}).get("login", "") print(f"{Color.BLD}PR #{pr_number}: {title}{Color.RST}") print(f" {Color.DIM}Author:{Color.RST} {user}") print(f" {Color.DIM}State:{Color.RST} {state}") print(f" {Color.DIM}HEAD:{Color.RST} {sha}") print(f" {Color.DIM}Mergeable:{Color.RST} {'Yes' if mergeable else 'No'}") print() # CI status if status: overall = status.get("state", "pending") statuses = status.get("statuses", []) # Map state to display state_display = { "success": f"{Color.GRN}✓ All checks passed{Color.RST}", "pending": f"{Color.YLW}● Checks in progress{Color.RST}", "failure": f"{Color.RED}✗ Some checks failed{Color.RST}", "error": f"{Color.RED}✗ Check errors{Color.RST}", } print(f"{Color.BLD}CI Status:{Color.RST} {state_display.get(overall, overall)}\n") if statuses: for s in statuses: ctx = s.get("context", "") state = s.get("state", "") desc = s.get("description", "")[:50] icon = status_display(state, state) print(f" {icon} {ctx}") if desc: print(f" {Color.DIM}{desc}{Color.RST}") else: # Try to get workflow runs for this commit runs = api.get_runs(owner, repo, limit=5) commit_runs = [r for r in runs if r.get("head_sha", "").startswith(sha)] if commit_runs: print(f" {Color.DIM}Workflow runs for this commit:{Color.RST}") for run in commit_runs: run_status = status_display(run.get("status", ""), run.get("conclusion", "")) run_title = run.get("display_title", "")[:40] print(f" {run_status} {run_title}") else: print(f"{Color.DIM}No CI status found{Color.RST}") return 0 def cmd_compare(api: GiteaAPI, args: argparse.Namespace) -> int: """Compare two workflow runs.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 run1_id = args.run1 run2_id = args.run2 # If only one run specified, compare with previous if not run2_id: runs = api.get_runs(owner, repo, limit=20) found_idx = None for i, r in enumerate(runs): if r.get("id") == run1_id: found_idx = i break if found_idx is not None and found_idx + 1 < len(runs): run2_id = runs[found_idx + 1].get("id") else: print(f"{Color.RED}Cannot find previous run to compare{Color.RST}") return 1 # Fetch both runs and their jobs run1 = api.get_run(owner, repo, run1_id) run2 = api.get_run(owner, repo, run2_id) if not run1 or not run2: print(f"{Color.RED}Could not fetch one or both runs{Color.RST}") return 1 jobs1 = api.get_jobs(owner, repo, run1_id) jobs2 = api.get_jobs(owner, repo, run2_id) # JSON output if getattr(args, "json", False): job_changes: list[dict[str, Any]] = [] output: dict[str, Any] = { "repository": f"{owner}/{repo}", "run1": {"id": run1_id, "status": run1.get("status"), "conclusion": run1.get("conclusion")}, "run2": {"id": run2_id, "status": run2.get("status"), "conclusion": run2.get("conclusion")}, "job_changes": job_changes, } jobs1_map = {j.get("name"): j for j in jobs1} jobs2_map = {j.get("name"): j for j in jobs2} all_job_names = set(jobs1_map.keys()) | set(jobs2_map.keys()) for name in sorted(all_job_names): j1 = jobs1_map.get(name, {}) j2 = jobs2_map.get(name, {}) job_changes.append({ "name": name, "run1_conclusion": j1.get("conclusion"), "run2_conclusion": j2.get("conclusion"), "changed": j1.get("conclusion") != j2.get("conclusion"), }) print(json.dumps(output, indent=2)) return 0 # Pretty output print(f"{Color.BLD}Comparing runs:{Color.RST}") print(f" Run #{run1_id}: {status_display(run1.get('status', ''), run1.get('conclusion', ''))} " f"{run1.get('display_title', '')[:40]}") print(f" Run #{run2_id}: {status_display(run2.get('status', ''), run2.get('conclusion', ''))} " f"{run2.get('display_title', '')[:40]}") print() # Compare jobs jobs1_map = {j.get("name"): j for j in jobs1} jobs2_map = {j.get("name"): j for j in jobs2} all_job_names_sorted = sorted(set(jobs1_map.keys()) | set(jobs2_map.keys())) changes = [] unchanged = [] for name in all_job_names_sorted: j1 = jobs1_map.get(name, {}) j2 = jobs2_map.get(name, {}) c1 = j1.get("conclusion", "missing") c2 = j2.get("conclusion", "missing") if c1 != c2: changes.append((name, c1, c2)) else: unchanged.append((name, c1)) if changes: print(f"{Color.BLD}Changed jobs:{Color.RST}\n") for name, c1, c2 in changes: s1 = status_display("completed", c1) s2 = status_display("completed", c2) print(f" {name}") print(f" {s1} #{run1_id} → {s2} #{run2_id}") print() if unchanged and not changes: print(f"{Color.DIM}All {len(unchanged)} jobs have same status{Color.RST}") elif unchanged: print(f"{Color.DIM}Unchanged: {len(unchanged)} jobs{Color.RST}") return 0 def cmd_infra(api: GiteaAPI, args: argparse.Namespace) -> int: """Show infrastructure status: Gitea version, runners, workflows.""" owner = getattr(args, "owner", None) or api.config.default_owner repo = getattr(args, "repo", None) or api.config.default_repo print(f"{Color.BLD}Infrastructure Status{Color.RST}\n") print(f"{Color.DIM}Instance:{Color.RST} {api.config.url}") # Gitea version version = api.get_version() if version: print(f"{Color.DIM}Version:{Color.RST} Gitea {version.get('version', 'unknown')}") print() # Try to get runners at different levels runners_found = False # Repository runners if owner and repo: print(f"{Color.BLD}Repository: {owner}/{repo}{Color.RST}") runners = api.get_runners(owner, repo) if runners: runners_found = True print(f"\n{Color.BLD}Repository Runners:{Color.RST}") for r in runners: status = "online" if r.get("status") == "online" else "offline" color = Color.GRN if status == "online" else Color.RED name = r.get("name", "unnamed") labels = ", ".join(r.get("labels", [])[:3]) or "no labels" print(f" {color}●{Color.RST} {name} ({labels})") # Workflows workflows = api.get_workflows(owner, repo) if workflows: print(f"\n{Color.BLD}Workflows:{Color.RST}") for wf in workflows: name = wf.get("name", "") path = wf.get("path", "").split("/")[-1] state = wf.get("state", "") if state == "active": print(f" {Color.GRN}✓{Color.RST} {path} - {name}") else: print(f" {Color.DIM}○{Color.RST} {path} - {name} ({state})") # Recent runs summary runs = api.get_runs(owner, repo, limit=10) if runs: success = sum(1 for r in runs if r.get("conclusion") == "success") failed = sum(1 for r in runs if r.get("conclusion") == "failure") running = sum(1 for r in runs if r.get("status") in ("running", "waiting", "queued")) print(f"\n{Color.BLD}Recent Runs (last 10):{Color.RST}") print(f" {Color.GRN}✓{Color.RST} {success} success {Color.RED}✗{Color.RST} {failed} failed {Color.BLU}●{Color.RST} {running} active") # Organization runners (if owner specified) if owner and not repo: print(f"{Color.BLD}Organization: {owner}{Color.RST}") runners = api.get_runners(owner) if runners: runners_found = True print(f"\n{Color.BLD}Organization Runners:{Color.RST}") for r in runners: status = "online" if r.get("status") == "online" else "offline" color = Color.GRN if status == "online" else Color.RED name = r.get("name", "unnamed") print(f" {color}●{Color.RST} {name}") # Global runners (admin) if not owner: runners = api.get_runners() if runners: runners_found = True print(f"{Color.BLD}Global Runners:{Color.RST}") for r in runners: status = r.get("status", "unknown") color = Color.GRN if status == "online" else Color.RED name = r.get("name", "unnamed") labels = ", ".join(r.get("labels", [])[:3]) or "no labels" busy = r.get("busy", False) busy_str = f" {Color.YLW}[busy]{Color.RST}" if busy else "" print(f" {color}●{Color.RST} {name} ({labels}){busy_str}") elif not runners_found: print(f"{Color.DIM}No runners found (may require admin access){Color.RST}") # Pending jobs if getattr(args, "jobs", False): jobs = api.get_runner_jobs(owner or "", repo or "") if jobs: print(f"\n{Color.BLD}Pending/Running Jobs:{Color.RST}") for job in jobs[:10]: status = job.get("status", "") name = job.get("name", "") repo_name = job.get("repository", {}).get("full_name", "") print(f" {Color.BLU}●{Color.RST} {name} ({repo_name})") return 0 def cmd_runners(api: GiteaAPI, args: argparse.Namespace) -> int: """Manage runners - list, delete, info, jobs.""" owner = getattr(args, "owner", None) or api.config.default_owner repo = getattr(args, "repo", None) or api.config.default_repo action = getattr(args, "action", "list") runner_id = getattr(args, "runner_id", None) # Determine scope string if owner and repo: scope = f"repository {owner}/{repo}" elif owner: scope = f"organization {owner}" else: scope = "user-level" # Handle different actions if action == "delete": if not runner_id: print(f"{Color.RED}Runner ID required: gitea-ci runners delete {Color.RST}") return 1 # Confirm unless --force if not getattr(args, "force", False): confirm = input(f"Delete runner #{runner_id}? [y/N]: ").strip().lower() if confirm != "y": print(f"{Color.DIM}Cancelled{Color.RST}") return 0 print(f"{Color.BLD}Deleting runner #{runner_id}...{Color.RST}") if api.delete_runner(runner_id, owner or "", repo or ""): print(f"{Color.GRN}✓ Runner deleted{Color.RST}") return 0 else: print(f"{Color.RED}✗ Failed to delete runner{Color.RST}") print(f"{Color.DIM}(API may not be available in this Gitea version){Color.RST}") return 1 elif action == "info": if not runner_id: print(f"{Color.RED}Runner ID required: gitea-ci runners info {Color.RST}") return 1 runner = api.get_runner(runner_id, owner or "", repo or "") if not runner: print(f"{Color.RED}Runner #{runner_id} not found{Color.RST}") return 1 if getattr(args, "json", False): print(json.dumps(runner, indent=2)) return 0 print(f"{Color.BLD}Runner #{runner_id}{Color.RST}") print("─" * 35) print(f" {Color.BLD}Name:{Color.RST} {runner.get('name', 'unnamed')}") status = runner.get("status", "unknown") status_color = Color.GRN if status == "online" else Color.RED print(f" {Color.BLD}Status:{Color.RST} {status_color}{status}{Color.RST}") print(f" {Color.BLD}Busy:{Color.RST} {'Yes' if runner.get('busy') else 'No'}") labels = runner.get("labels", []) if labels: print(f" {Color.BLD}Labels:{Color.RST} {', '.join(labels)}") if runner.get("version"): print(f" {Color.BLD}Version:{Color.RST} {runner.get('version')}") return 0 elif action == "jobs": jobs = api.get_runner_jobs(owner or "", repo or "") if getattr(args, "json", False): print(json.dumps({"scope": scope, "jobs": jobs}, indent=2)) return 0 print(f"{Color.BLD}Runner Jobs ({scope}):{Color.RST}\n") if not jobs: print(f"{Color.DIM}No running/pending jobs{Color.RST}") return 0 for job in jobs: job_id = job.get("id", 0) name = job.get("name", "unnamed") status = job.get("status", "unknown") runs_on = job.get("runs_on", []) color, symbol = STATUS_STYLE.get(status, (Color.DIM, "?")) labels_str = f" [{', '.join(runs_on)}]" if runs_on else "" print(f" {color}{symbol}{Color.RST} #{job_id} {name}{labels_str}") return 0 else: # list (default) if owner and repo: runners = api.get_runners(owner, repo) elif owner: runners = api.get_runners(owner) else: runners = api.get_runners() if getattr(args, "json", False): print(json.dumps({"scope": scope, "runners": runners}, indent=2)) return 0 print(f"{Color.BLD}Runners ({scope}):{Color.RST}\n") if not runners: print(f"{Color.DIM}No runners found{Color.RST}") print(f"{Color.DIM}(May require authentication or admin access){Color.RST}") return 0 online = 0 offline = 0 for r in runners: rid = r.get("id", 0) name = r.get("name", "unnamed") status = r.get("status", "unknown") busy = r.get("busy", False) labels = r.get("labels", []) if status == "online": online += 1 color = Color.GRN symbol = "●" else: offline += 1 color = Color.RED symbol = "○" busy_str = f" {Color.YLW}[busy]{Color.RST}" if busy else "" labels_str = f" ({', '.join(labels[:4])})" if labels else "" print(f" {color}{symbol}{Color.RST} #{rid} {name}{labels_str}{busy_str}") print(f"\n{Color.DIM}Total: {online} online, {offline} offline{Color.RST}") return 0 def cmd_register_token(api: GiteaAPI, args: argparse.Namespace) -> int: """Get runner registration token.""" # If --user flag is set, force user-level token if getattr(args, "user", False): owner = "" repo = "" else: owner = getattr(args, "owner", None) or api.config.default_owner repo = getattr(args, "repo", None) or api.config.default_repo # Determine scope if owner and repo: scope = f"repository {owner}/{repo}" elif owner: scope = f"organization {owner}" else: scope = "user" print(f"{Color.BLD}Getting registration token ({scope})...{Color.RST}") token = api.get_registration_token(owner or "", repo or "") if token: print(f"\n{Color.GRN}Registration Token:{Color.RST}\n") print(f" {token}") print(f"\n{Color.DIM}Usage:{Color.RST}") print(f" act_runner register --instance {api.config.url} --token {token}") print() return 0 else: print(f"{Color.RED}Failed to get registration token{Color.RST}") print(f"{Color.DIM}Make sure your token has the required scopes{Color.RST}") return 1 def cmd_delete(api: GiteaAPI, args: argparse.Namespace) -> int: """Delete a workflow run.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 run_id = args.run_id if not run_id: print(f"{Color.RED}Run ID required (-R/--run-id){Color.RST}") return 1 # Confirm unless --force if not getattr(args, "force", False): confirm = input(f"Delete run #{run_id}? [y/N]: ").strip().lower() if confirm != "y": print(f"{Color.DIM}Cancelled{Color.RST}") return 0 print(f"{Color.BLD}Deleting run #{run_id}...{Color.RST}") if api.delete_run(owner, repo, run_id): print(f"{Color.GRN}✓ Run deleted{Color.RST}") return 0 else: print(f"{Color.RED}✗ Failed to delete run{Color.RST}") return 1 def cmd_workflows(api: GiteaAPI, args: argparse.Namespace) -> int: """List workflows in a repository with details.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.RED}Repository required{Color.RST}") return 1 workflows = api.get_workflows(owner, repo) if getattr(args, "json", False): print(json.dumps({"repository": f"{owner}/{repo}", "workflows": workflows}, indent=2)) return 0 print(f"{Color.BLD}Workflows for {owner}/{repo}:{Color.RST}\n") if not workflows: print(f"{Color.DIM}No workflows found{Color.RST}") return 0 for wf in workflows: name = wf.get("name", "unnamed") path = wf.get("path", "") state = wf.get("state", "unknown") filename = path.split("/")[-1] if path else "" if state == "active": color = Color.GRN symbol = "✓" elif state == "disabled": color = Color.YLW symbol = "○" else: color = Color.DIM symbol = "?" print(f" {color}{symbol}{Color.RST} {filename}") print(f" {Color.DIM}Name:{Color.RST} {name}") print(f" {Color.DIM}Path:{Color.RST} {path}") # Get workflow triggers if verbose if getattr(args, "verbose", False): content = api.get_workflow_contents(owner, repo, filename) if content: # Parse YAML for triggers triggers = [] in_on = False for line in content.split("\n"): stripped = line.strip() if stripped.startswith("on:"): in_on = True # Single-line on: push if ":" in stripped[3:]: triggers.append(stripped[3:].strip()) elif in_on: if stripped and not stripped.startswith("#"): if line[0] not in (" ", "\t"): break # End of on: block if ":" in stripped or stripped.endswith(":"): triggers.append(stripped.rstrip(":")) if triggers: print(f" {Color.DIM}Triggers:{Color.RST} {', '.join(triggers[:5])}") print() return 0 def cmd_validate(api: GiteaAPI, args: argparse.Namespace) -> int: """Validate local workflow files.""" path_arg = getattr(args, "path", None) check_runners = getattr(args, "check_runners", False) strict = getattr(args, "strict", False) # Find workflow files if path_arg: path = Path(path_arg) if path.is_file(): workflows = [path] elif path.is_dir(): workflows = list(path.glob("*.yml")) + list(path.glob("*.yaml")) else: print(f"{Color.RED}Path not found: {path_arg}{Color.RST}") return 1 else: # Look for workflow directories for workflow_dir in [".gitea/workflows", ".github/workflows"]: p = Path(workflow_dir) if p.exists(): workflows = list(p.glob("*.yml")) + list(p.glob("*.yaml")) break else: print(f"{Color.RED}No workflow directory found{Color.RST}") print(f"{Color.DIM}Expected: .gitea/workflows/ or .github/workflows/{Color.RST}") return 1 if not workflows: print(f"{Color.YLW}No workflow files found{Color.RST}") return 0 # Get available runner labels if checking runners available_labels: set = set() if check_runners: owner = getattr(args, "owner", None) or api.config.default_owner repo = getattr(args, "repo", None) or api.config.default_repo runners = api.get_runners(owner or "", repo or "") for r in runners: for label in r.get("labels", []): available_labels.add(label) total_errors = 0 total_warnings = 0 for wf_path in sorted(workflows): content = wf_path.read_text() errors, warnings = _validate_workflow(content, wf_path.name, available_labels) if errors or warnings: print(f"{Color.BLD}{wf_path}{Color.RST}") for level, line, msg in errors: print(f" {Color.RED}error:{Color.RST} {msg}" + (f" (line {line})" if line else "")) total_errors += 1 for level, line, msg in warnings: print(f" {Color.YLW}warning:{Color.RST} {msg}" + (f" (line {line})" if line else "")) total_warnings += 1 print() else: print(f"{Color.GRN}✓{Color.RST} {wf_path}") # Summary print() if total_errors > 0: print(f"{Color.RED}{total_errors} error(s){Color.RST}, {total_warnings} warning(s)") return 1 elif total_warnings > 0 and strict: print(f"{Color.YLW}{total_warnings} warning(s) (strict mode){Color.RST}") return 1 elif total_warnings > 0: print(f"{Color.GRN}OK{Color.RST} with {total_warnings} warning(s)") return 0 else: print(f"{Color.GRN}All {len(workflows)} workflow(s) valid{Color.RST}") return 0 def _validate_workflow(content: str, filename: str, available_labels: set) -> tuple: """Validate workflow content without external YAML parser. Returns (errors, warnings) where each is a list of (level, line, message). """ errors: list[tuple] = [] warnings: list[tuple] = [] lines = content.split("\n") # Check for required top-level keys has_name = False has_on = False has_jobs = False for i, line in enumerate(lines, 1): stripped = line.strip() if stripped.startswith("#"): continue if re.match(r"^name\s*:", line): has_name = True if re.match(r"^on\s*:", line): has_on = True if re.match(r"^jobs\s*:", line): has_jobs = True if not has_name: warnings.append(("warning", 0, "Missing 'name:' field (recommended)")) if not has_on: errors.append(("error", 0, "Missing 'on:' trigger definition")) if not has_jobs: errors.append(("error", 0, "Missing 'jobs:' section")) # Check runs-on labels runs_on_matches = re.finditer(r"runs-on\s*:\s*(.+)", content) for match in runs_on_matches: label = match.group(1).strip().strip("'\"") # Find line number pos = match.start() line_num = content[:pos].count("\n") + 1 # Check if label is available (if we have runner info) if available_labels and label not in available_labels: warnings.append(("warning", line_num, f"runs-on label '{label}' not found in available runners")) # Check for common issues for i, line in enumerate(lines, 1): stripped = line.strip() # Check for tabs (YAML doesn't like tabs) if "\t" in line and not stripped.startswith("#"): errors.append(("error", i, "Tab character found (use spaces for indentation)")) # Check for trailing colons without values (common mistake) if re.match(r"^\s+-\s*$", line): errors.append(("error", i, "Empty list item")) # Check for uses without version uses_match = re.match(r".*uses\s*:\s*([^@\s]+)$", stripped) if uses_match and not stripped.startswith("#"): action = uses_match.group(1) if "/" in action and "@" not in action: warnings.append(("warning", i, f"Action '{action}' should specify version (e.g., @v1)")) return errors, warnings def cmd_stats(api: GiteaAPI, args: argparse.Namespace) -> int: """Show comprehensive workflow statistics with visual graphs.""" owner = args.owner or api.config.default_owner repo = args.repo or api.config.default_repo if not owner or not repo: print(f"{Color.BLD}Error:{Color.RST} Repository required") return 1 # Fetch more runs for better statistics limit = getattr(args, "limit", 50) or 50 runs = api.get_runs(owner, repo, limit=limit) if not runs: print(f"{Color.DIM}No workflow runs found{Color.RST}") return 0 # JSON output if getattr(args, "json", False): return _output_stats_json(runs, owner, repo) print(f"{Color.BLD}Workflow Statistics: {owner}/{repo}{Color.RST}") print("═" * 60) # Calculate statistics total = len(runs) success = [r for r in runs if r.get("conclusion") == "success"] failed = [r for r in runs if r.get("conclusion") == "failure"] cancelled = [r for r in runs if r.get("conclusion") == "cancelled"] running = [r for r in runs if r.get("status") in ("running", "waiting")] completed = total - len(running) # Summary section print(f"{Color.BLD}Summary{Color.RST} {Color.DIM}(last {total} runs){Color.RST}") print("─" * 40) success_rate = (len(success) / completed * 100) if completed > 0 else 0 print(f" {'Total:':<12} {total}") print(f" {'Completed:':<12} {completed}") print(f" {'✓ Success:':<12} {len(success)}") print(f" {'✗ Failed:':<12} {len(failed)}") print(f" {'○ Cancelled:':<12} {len(cancelled)}") print(f" {'● Running:':<12} {len(running)}") print() # Success rate bar print(f"{Color.BLD}Success Rate{Color.RST}") print("─" * 40) _print_rate_bar("Overall", success_rate, 30) print() # Per-workflow breakdown workflows: dict[str, dict] = {} for run in runs: wf = run.get("workflow") or run.get("name") or "unknown" if wf not in workflows: workflows[wf] = {"total": 0, "success": 0, "failed": 0, "durations": []} workflows[wf]["total"] += 1 if run.get("conclusion") == "success": workflows[wf]["success"] += 1 elif run.get("conclusion") == "failure": workflows[wf]["failed"] += 1 # Track duration duration = _parse_duration_seconds(run) if duration > 0: workflows[wf]["durations"].append(duration) if len(workflows) > 1: print(f"{Color.BLD}Per-Workflow Breakdown{Color.RST}") print("─" * 40) for wf_name, stats in sorted(workflows.items()): wf_total = stats["total"] wf_success = stats["success"] wf_rate = (wf_success / wf_total * 100) if wf_total > 0 else 0 # Truncate workflow name display_name = wf_name[:20] if len(wf_name) > 20 else wf_name _print_rate_bar(display_name, wf_rate, 20, show_count=f"{wf_success}/{wf_total}") print() # Duration statistics all_durations = [] for run in runs: duration = _parse_duration_seconds(run) if duration > 0: all_durations.append(duration) if all_durations: print(f"{Color.BLD}Duration Statistics{Color.RST}") print("─" * 40) avg_duration = sum(all_durations) / len(all_durations) min_duration = min(all_durations) max_duration = max(all_durations) print(f" {'Average:':<12} {_format_seconds(int(avg_duration))}") print(f" {'Fastest:':<12} {_format_seconds(min_duration)}") print(f" {'Slowest:':<12} {_format_seconds(max_duration)}") print() # Recent trend (last 10 runs) recent = runs[:min(10, len(runs))] if recent: print(f"{Color.BLD}Recent Trend{Color.RST} {Color.DIM}(last {len(recent)} runs){Color.RST}") print("─" * 40) trend_line = " " for run in reversed(recent): # Oldest to newest conclusion = run.get("conclusion", "") if conclusion == "success": trend_line += "█" # Full block for success elif conclusion == "failure": trend_line += "▁" # Low block for failure (bar chart effect) elif run.get("status") in ("running", "waiting"): trend_line += "▒" # Medium shade for running else: trend_line += "░" # Light shade for other print(trend_line) print(f" {Color.DIM}█=pass ▁=fail ▒=running ░=other{Color.RST}") print(f" {Color.DIM}← older newer →{Color.RST}") print() # Job-level statistics (if detailed flag) if getattr(args, "detailed", False): _print_job_stats(api, runs[:20], owner, repo) return 0 def _print_rate_bar(label: str, rate: float, width: int = 20, show_count: str = "") -> None: """Print a labeled progress bar for rates. No colors per STYLING.md.""" filled = int(rate / 100 * width) empty = width - filled # Use block characters only - no colors bar = f"{'█' * filled}{Color.DIM}{'░' * empty}{Color.RST}" count_str = f" {Color.DIM}{show_count}{Color.RST}" if show_count else "" print(f" {label:<20} {bar} {rate:5.1f}%{count_str}") def _parse_duration_seconds(run: dict) -> int: """Parse run duration to seconds.""" # Try direct duration field duration_str = run.get("duration", "") if duration_str: # Parse formats like "1m30s", "45s", "2m" total = 0 import re as re_mod minutes = re_mod.search(r"(\d+)m", duration_str) seconds = re_mod.search(r"(\d+)s", duration_str) if minutes: total += int(minutes.group(1)) * 60 if seconds: total += int(seconds.group(1)) return total # Calculate from timestamps started = run.get("run_started_at", "") ended = run.get("updated_at", "") if started and ended and run.get("status") == "completed": try: from datetime import datetime start_dt = datetime.fromisoformat(started.replace("Z", "+00:00")) end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00")) return int((end_dt - start_dt).total_seconds()) except (ValueError, TypeError): pass return 0 def _format_seconds(seconds: int) -> str: """Format seconds to human-readable duration.""" if seconds < 60: return f"{seconds}s" minutes = seconds // 60 secs = seconds % 60 if minutes < 60: return f"{minutes}m{secs}s" if secs else f"{minutes}m" hours = minutes // 60 mins = minutes % 60 return f"{hours}h{mins}m" def _print_job_stats(api: GiteaAPI, runs: list, owner: str, repo: str) -> None: """Print job-level statistics. No colors per STYLING.md.""" print(f"{Color.BLD}Job Statistics{Color.RST} {Color.DIM}(from {len(runs)} recent runs){Color.RST}") print("─" * 40) job_stats: dict[str, dict] = {} for run in runs: if run.get("status") != "completed": continue run_id = run.get("id") jobs = api.get_jobs(owner, repo, run_id) for job in jobs: name = job.get("name", "unknown") if name not in job_stats: job_stats[name] = {"success": 0, "failed": 0, "durations": []} if job.get("conclusion") == "success": job_stats[name]["success"] += 1 elif job.get("conclusion") == "failure": job_stats[name]["failed"] += 1 # Parse job duration started = job.get("started_at", "") ended = job.get("completed_at", "") if started and ended: try: from datetime import datetime start_dt = datetime.fromisoformat(started.replace("Z", "+00:00")) end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00")) duration = int((end_dt - start_dt).total_seconds()) # Sanity check: duration should be reasonable (< 24 hours) if 0 < duration < 86400: job_stats[name]["durations"].append(duration) except (ValueError, TypeError): pass if not job_stats: print(f" {Color.DIM}No job data available{Color.RST}") return # Print job table - use symbols, no colors print(f" {'Job':<30} {'✓ Pass':<8} {'✗ Fail':<8} {'Avg':<8}") print(f" {'-'*30} {'-'*8} {'-'*8} {'-'*8}") for job_name, stats in sorted(job_stats.items()): total = stats["success"] + stats["failed"] if total == 0: continue name_display = job_name[:30] if len(job_name) > 30 else job_name avg_dur = "" if stats["durations"]: avg = sum(stats["durations"]) / len(stats["durations"]) avg_dur = _format_seconds(int(avg)) # No colors - use plain text fail_str = str(stats["failed"]) if stats["failed"] > 0 else f"{Color.DIM}0{Color.RST}" print(f" {name_display:<30} {stats['success']:<8} {fail_str:<8} {avg_dur:<8}") print() def _output_stats_json(runs: list, owner: str, repo: str) -> int: """Output statistics as JSON.""" total = len(runs) success = len([r for r in runs if r.get("conclusion") == "success"]) failed = len([r for r in runs if r.get("conclusion") == "failure"]) cancelled = len([r for r in runs if r.get("conclusion") == "cancelled"]) running = len([r for r in runs if r.get("status") in ("running", "waiting")]) completed = total - running # Per-workflow stats workflows: dict[str, dict] = {} for run in runs: wf = run.get("workflow") or "unknown" if wf not in workflows: workflows[wf] = {"total": 0, "success": 0, "failed": 0} workflows[wf]["total"] += 1 if run.get("conclusion") == "success": workflows[wf]["success"] += 1 elif run.get("conclusion") == "failure": workflows[wf]["failed"] += 1 output = { "repository": f"{owner}/{repo}", "total_runs": total, "completed": completed, "success": success, "failed": failed, "cancelled": cancelled, "running": running, "success_rate": round(success / completed * 100, 1) if completed > 0 else 0, "workflows": workflows, } print(json.dumps(output, indent=2)) return 0 def send_notification(title: str, message: str, urgency: str = "normal") -> None: """Send desktop notification via notify-send.""" if shutil.which("notify-send"): try: subprocess.run( ["notify-send", f"--urgency={urgency}", title, message], capture_output=True, timeout=5 ) except (subprocess.SubprocessError, OSError): pass def main() -> int: """Main entry point.""" # Quick check for direct run URL or instance URL as first arg if len(sys.argv) > 1 and not sys.argv[1].startswith("-"): arg = sys.argv[1] # Check if it looks like a URL or hostname (not a command) commands = ("list", "ls", "l", "status", "s", "logs", "log", "watch", "w", "config", "trigger", "t", "rerun", "cancel", "artifacts", "art", "pr") if "." in arg and arg not in commands: # Check for direct run URL: git.mymx.me/user/repo/actions/runs/123 run_match = re.search(r"(?:https?://)?([^/]+)/([^/]+)/([^/]+)/actions/runs/(\d+)", arg) if run_match: host, owner, repo, run_id = run_match.groups() url = f"https://{host}" os.environ["GITEA_URL"] = url # Rewrite args to: status -o owner -r repo -R run_id sys.argv = [sys.argv[0], "status", "-o", owner, "-r", repo, "-R", run_id] else: # Check for repo URL: git.mymx.me/user/repo/actions repo_match = re.search(r"(?:https?://)?([^/]+)/([^/]+)/([^/]+)(?:/actions)?$", arg) if repo_match: host, owner, repo = repo_match.groups() url = f"https://{host}" os.environ["GITEA_URL"] = url sys.argv = [sys.argv[0], "list", f"{owner}/{repo}"] else: # Just an instance URL url = arg if arg.startswith("http") else f"https://{arg}" os.environ["GITEA_URL"] = url sys.argv.pop(1) # Remove URL from args parser = argparse.ArgumentParser( description="Monitor and manage Gitea Actions workflows", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Commands: list List recent workflow runs (default: discover all) stats Show workflow statistics with visual graphs status Show detailed run status with jobs logs Show job logs watch Watch running workflows in real-time trigger Trigger a workflow manually (workflow_dispatch) rerun Re-run a workflow or failed jobs cancel Cancel a running workflow delete Delete a workflow run artifacts List or download build artifacts pr Show CI status for a pull request compare Compare two workflow runs (diff jobs) runners List runners and their status register-token Get runner registration token workflows List workflows in a repository infra Show infrastructure status config Configure token and defaults Environment: GITEA_URL Gitea instance URL GITEA_TOKEN API token for authentication (required for write operations) Examples: gitea-ci git.mymx.me Discover all workflows on instance gitea-ci git.mymx.me/user/repo List runs for specific repo gitea-ci list user/repo --failed Show only failed runs gitea-ci list user/repo --json Output as JSON gitea-ci stats user/repo Show statistics with graphs gitea-ci stats user/repo --detailed Include per-job breakdown gitea-ci status user/repo Show latest run status gitea-ci status user/repo 123 Show specific run details gitea-ci status user/repo 123 --json Output run details as JSON gitea-ci watch user/repo Watch latest run gitea-ci watch user/repo 123 -t 300 Watch specific run with timeout gitea-ci logs user/repo 123 -j 456 Show job logs gitea-ci trigger user/repo -w lint.yml Trigger workflow manually gitea-ci rerun user/repo --failed Re-run only failed jobs gitea-ci cancel user/repo Cancel latest running workflow gitea-ci artifacts user/repo -d 42 Download artifact #42 gitea-ci pr user/repo -p 123 Show CI status for PR #123 gitea-ci compare user/repo 175 174 Compare two runs gitea-ci delete user/repo -R 123 Delete workflow run #123 gitea-ci runners List runners (user-level) gitea-ci runners -o org List organization runners gitea-ci runners info 5 Show runner #5 details gitea-ci runners delete 5 -f Delete runner #5 (no confirmation) gitea-ci runners jobs Show running/pending jobs gitea-ci register-token Get user-level registration token gitea-ci register-token -o org -r repo Get repo-level registration token gitea-ci workflows user/repo -v List workflows with triggers gitea-ci config --test Test API connectivity gitea-ci config token Show token info gitea-ci config token check Check token permissions gitea-ci config token set Set API token gitea-ci validate Validate local workflow files gitea-ci validate --check-runners Cross-reference runs-on labels gitea-ci validate --strict Treat warnings as errors """, ) parser.add_argument("-u", "--url", help="Gitea URL") subparsers = parser.add_subparsers(dest="command", metavar="command") # Common args for repo-based commands def add_repo_args(p): p.add_argument("-o", "--owner", help="Repository owner") p.add_argument("-r", "--repo", help="Repository name") p.add_argument("repo_path", nargs="?", help="Repository as owner/repo") # list list_p = subparsers.add_parser("list", aliases=["ls", "l"], help="List workflow runs") add_repo_args(list_p) list_p.add_argument("-n", "--limit", type=int, default=10, help="Number of runs") list_p.add_argument("--failed", dest="status_filter", action="store_const", const="failed", help="Show only failed runs") list_p.add_argument("--success", dest="status_filter", action="store_const", const="success", help="Show only successful runs") list_p.add_argument("--running", dest="status_filter", action="store_const", const="running", help="Show only running/waiting") list_p.add_argument("-b", "--branch", help="Filter by branch name") list_p.add_argument("-w", "--workflow", help="Filter by workflow file") list_p.add_argument("--json", action="store_true", help="Output as JSON") list_p.add_argument("--compact", action="store_true", help="Compact single-line output") # stats stats_p = subparsers.add_parser("stats", help="Show workflow statistics with graphs") add_repo_args(stats_p) stats_p.add_argument("-n", "--limit", type=int, default=50, help="Number of runs to analyze") stats_p.add_argument("--detailed", action="store_true", help="Include per-job statistics") stats_p.add_argument("--json", action="store_true", help="Output as JSON") # status (supports repo_path and/or run_id) status_p = subparsers.add_parser("status", aliases=["s"], help="Show run status") status_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]") status_p.add_argument("-o", "--owner", help="Repository owner") status_p.add_argument("-r", "--repo", help="Repository name") status_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID") status_p.add_argument("--json", action="store_true", help="Output as JSON") # logs (supports repo_path and/or run_id) logs_p = subparsers.add_parser("logs", aliases=["log"], help="Show job logs") logs_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]") logs_p.add_argument("-o", "--owner", help="Repository owner") logs_p.add_argument("-r", "--repo", help="Repository name") logs_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID") logs_p.add_argument("-j", "--job", dest="job_id", type=int, help="Job ID") # watch (supports repo_path and/or run_id) watch_p = subparsers.add_parser("watch", aliases=["w"], help="Watch runs") watch_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]") watch_p.add_argument("-o", "--owner", help="Repository owner") watch_p.add_argument("-r", "--repo", help="Repository name") watch_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID to watch") watch_p.add_argument("-i", "--interval", type=int, default=5, help="Refresh interval (seconds)") watch_p.add_argument("-t", "--timeout", type=int, default=0, help="Auto-exit after N seconds (0=disabled)") watch_p.add_argument("--no-clear", action="store_true", help="Don't clear screen") watch_p.add_argument("--notify", action="store_true", help="Desktop notification on completion") # config config_p = subparsers.add_parser("config", help="Configure gitea-ci") config_p.add_argument("action", nargs="?", choices=["token"], help="Subcommand") config_p.add_argument("token_action", nargs="?", choices=["check", "set"], help="Token action") config_p.add_argument("token_value", nargs="?", help="Token value (for 'set')") config_p.add_argument("--show", action="store_true", help="Show current config") config_p.add_argument("--test", action="store_true", help="Test API connectivity") # repo repo_p = subparsers.add_parser("repo", help="Manage repositories") repo_p.add_argument("action", nargs="?", choices=["list", "create", "exists", "check"], default="list", help="Action: list, create, exists, check") repo_p.add_argument("name", nargs="?", help="Repository name (for create/exists)") repo_p.add_argument("-d", "--description", help="Repository description (for create)") repo_p.add_argument("-p", "--private", action="store_true", help="Make repository private") add_repo_args(repo_p) # trigger trigger_p = subparsers.add_parser("trigger", aliases=["t"], help="Trigger workflow") add_repo_args(trigger_p) trigger_p.add_argument("-w", "--workflow", help="Workflow file (e.g., lint.yml)") trigger_p.add_argument("-b", "--ref", help="Branch/tag/SHA to run on (default: master)") trigger_p.add_argument("-i", "--input", action="append", help="Input as key=value (repeatable)") # rerun rerun_p = subparsers.add_parser("rerun", help="Re-run workflow/jobs") add_repo_args(rerun_p) rerun_p.add_argument("-R", "--run-id", type=int, help="Run ID to re-run") rerun_p.add_argument("-j", "--job-id", type=int, help="Specific job ID to re-run") rerun_p.add_argument("-f", "--failed-only", action="store_true", help="Re-run only failed jobs") # cancel cancel_p = subparsers.add_parser("cancel", help="Cancel running workflow") add_repo_args(cancel_p) cancel_p.add_argument("-R", "--run-id", type=int, help="Run ID to cancel") # delete delete_p = subparsers.add_parser("delete", aliases=["rm"], help="Delete a workflow run") add_repo_args(delete_p) delete_p.add_argument("-R", "--run-id", type=int, required=True, help="Run ID to delete") delete_p.add_argument("-f", "--force", action="store_true", help="Skip confirmation") # artifacts artifacts_p = subparsers.add_parser("artifacts", aliases=["art"], help="List/download artifacts") add_repo_args(artifacts_p) artifacts_p.add_argument("-R", "--run-id", type=int, help="Filter by run ID") artifacts_p.add_argument("-d", "--download", type=int, metavar="ID", help="Download artifact by ID") artifacts_p.add_argument("-O", "--output", help="Output filename for download") # pr pr_p = subparsers.add_parser("pr", help="Show PR CI status") add_repo_args(pr_p) pr_p.add_argument("-p", "--pr-number", type=int, help="Pull request number") # compare compare_p = subparsers.add_parser("compare", aliases=["diff"], help="Compare two runs") add_repo_args(compare_p) compare_p.add_argument("run1", type=int, help="First run ID") compare_p.add_argument("run2", type=int, nargs="?", help="Second run ID (default: previous run)") compare_p.add_argument("--json", action="store_true", help="Output as JSON") # infra infra_p = subparsers.add_parser("infra", help="Show infrastructure status") add_repo_args(infra_p) infra_p.add_argument("--jobs", action="store_true", help="Show pending/running jobs") # runners runners_p = subparsers.add_parser("runners", help="Manage runners") runners_p.add_argument("action", nargs="?", default="list", choices=["list", "delete", "info", "jobs"], help="Action: list, delete, info, jobs") runners_p.add_argument("runner_id", nargs="?", type=int, help="Runner ID (for delete/info)") add_repo_args(runners_p) runners_p.add_argument("--force", "-f", action="store_true", help="Skip confirmation for delete") runners_p.add_argument("--json", action="store_true", help="Output as JSON") # register-token regtoken_p = subparsers.add_parser("register-token", aliases=["regtoken"], help="Get runner registration token") add_repo_args(regtoken_p) regtoken_p.add_argument("--user", action="store_true", help="Get user-level token (ignores repo defaults)") # workflows workflows_p = subparsers.add_parser("workflows", aliases=["wf"], help="List workflows") add_repo_args(workflows_p) workflows_p.add_argument("-v", "--verbose", action="store_true", help="Show workflow triggers and details") # validate validate_p = subparsers.add_parser("validate", aliases=["lint"], help="Validate local workflow files") validate_p.add_argument("path", nargs="?", help="Path to workflow dir or file") add_repo_args(validate_p) validate_p.add_argument("--check-runners", action="store_true", help="Cross-reference runs-on with available runners") validate_p.add_argument("--strict", action="store_true", help="Treat warnings as errors") args = parser.parse_args() # Load config config = Config.load() if args.url: config.url = args.url api = GiteaAPI(config) # Set defaults for optional args if not hasattr(args, "owner"): args.owner = None if not hasattr(args, "repo"): args.repo = None if not hasattr(args, "limit"): args.limit = 10 # Parse repo_path (owner/repo format) if hasattr(args, "repo_path") and args.repo_path and "/" in args.repo_path: parts = args.repo_path.split("/", 1) if not args.owner: args.owner = parts[0] if not args.repo: args.repo = parts[1] # Parse positional args for status/logs/watch: [owner/repo] [run_id] if hasattr(args, "positional") and args.positional: for pos in args.positional: if "/" in pos: # Looks like owner/repo parts = pos.split("/", 1) if not args.owner: args.owner = parts[0] if not args.repo: args.repo = parts[1] elif pos.isdigit(): # Looks like run_id if not args.run_id: args.run_id = int(pos) # Dispatch commands if args.command in (None, "list", "ls", "l"): return cmd_list(api, args) elif args.command == "stats": return cmd_stats(api, args) elif args.command in ("status", "s"): return cmd_status(api, args) elif args.command in ("logs", "log"): return cmd_logs(api, args) elif args.command in ("watch", "w"): return cmd_watch(api, args) elif args.command == "config": return cmd_config(api, args) elif args.command == "repo": return cmd_repo(api, args) elif args.command in ("trigger", "t"): return cmd_trigger(api, args) elif args.command == "rerun": return cmd_rerun(api, args) elif args.command == "cancel": return cmd_cancel(api, args) elif args.command in ("delete", "rm"): return cmd_delete(api, args) elif args.command in ("artifacts", "art"): return cmd_artifacts(api, args) elif args.command == "pr": return cmd_pr(api, args) elif args.command in ("compare", "diff"): return cmd_compare(api, args) elif args.command == "infra": return cmd_infra(api, args) elif args.command == "runners": return cmd_runners(api, args) elif args.command in ("register-token", "regtoken"): return cmd_register_token(api, args) elif args.command in ("workflows", "wf"): return cmd_workflows(api, args) elif args.command in ("validate", "lint"): return cmd_validate(api, args) return 0 if __name__ == "__main__": sys.exit(main())