refactor: restructure into src/gitea_ci package

Split monolithic gitea-ci.py (3068 lines) into modular package:
- src/gitea_ci/config.py: configuration, constants
- src/gitea_ci/api.py: GiteaAPI class
- src/gitea_ci/formatters.py: output formatting
- src/gitea_ci/utils.py: utilities, validation
- src/gitea_ci/commands/: command implementations
- src/gitea_ci/cli.py: argument parser, dispatch

gitea-ci.py now thin wrapper (16 lines)
This commit is contained in:
Username
2026-01-18 18:59:22 +01:00
parent c8b85e8977
commit 8d05e90160
17 changed files with 3345 additions and 3062 deletions

View File

@@ -28,10 +28,15 @@ jobs:
run: pip install -q ruff
- name: Syntax check
run: python -m py_compile gitea-ci.py style.py
run: |
python -m py_compile gitea-ci.py style.py
python -m py_compile src/gitea_ci/*.py src/gitea_ci/commands/*.py
- name: Lint
run: ruff check gitea-ci.py style.py
run: ruff check gitea-ci.py style.py src/gitea_ci/
- name: Test --help
run: python gitea-ci.py --help
- name: Test import
run: python -c "from src.gitea_ci import Config, GiteaAPI, __version__; print(f'v{__version__}')"

View File

@@ -151,3 +151,25 @@ NO_COLOR Disable colors when set
"default_repo": "myproject"
}
```
## Package Structure
```
gitea-ci.py Wrapper script (entry point)
style.py Styling utilities
src/gitea_ci/
├── cli.py Argument parser, dispatch
├── config.py Configuration
├── api.py Gitea API client
├── formatters.py Output formatting
├── utils.py Utilities
└── commands/ Command implementations
```
## Validation
```sh
python3 -m py_compile src/gitea_ci/*.py # Syntax check
./gitea-ci.py --help # Test CLI
python3 -m gitea_ci --help # Module mode (PYTHONPATH=src)
```

View File

@@ -20,12 +20,16 @@ Command-line tool for monitoring and managing Gitea Actions workflows.
## Installation
```sh
# Clone or copy gitea-ci.py and style.py to your PATH
cp gitea-ci.py style.py ~/.local/bin/
chmod +x ~/.local/bin/gitea-ci.py
# Clone the repository
git clone https://github.com/user/gitea-ci.git
cd gitea-ci
# Or symlink
# Option 1: Symlink the wrapper script
ln -s "$(pwd)/gitea-ci.py" ~/.local/bin/gitea-ci
# Option 2: Copy files (wrapper + src/ + style.py)
cp -r gitea-ci.py style.py src/ ~/.local/bin/gitea-ci/
ln -s ~/.local/bin/gitea-ci/gitea-ci.py ~/.local/bin/gitea-ci
```
### Dependencies
@@ -33,6 +37,35 @@ ln -s "$(pwd)/gitea-ci.py" ~/.local/bin/gitea-ci
- Python 3.10+
- No external packages required (uses stdlib only)
### Package Structure
```
gitea-ci/
├── gitea-ci.py Thin wrapper (entry point)
├── style.py Shared styling utilities
└── src/gitea_ci/
├── __init__.py Package metadata, public API
├── __main__.py python -m gitea_ci support
├── cli.py Argument parser, command dispatch
├── config.py Configuration, constants
├── api.py Gitea API client
├── formatters.py Output formatting
├── utils.py Utilities, validation
└── commands/
├── runs.py list, status, logs, watch, delete
├── workflows.py trigger, rerun, cancel, validate
├── artifacts.py artifacts
├── runners.py runners, register-token
└── inspect.py stats, pr, compare, infra, config, repo
```
### Running
```sh
./gitea-ci.py list # Direct execution
python3 -m gitea_ci list # Module execution (with PYTHONPATH=src)
```
## Configuration
### Environment Variables
@@ -380,6 +413,45 @@ gitea-ci list -n 50 --json | jq -r '
gitea-ci validate --strict && git commit -m "feat: add feature"
```
## Development
### Module Overview
| Module | Lines | Purpose |
|--------|-------|---------|
| `config.py` | 67 | Config dataclass, constants |
| `api.py` | 446 | GiteaAPI class, HTTP handling |
| `formatters.py` | 332 | Output formatting, status display |
| `utils.py` | 143 | Filtering, validation, notifications |
| `cli.py` | 349 | Argument parser, dispatch |
| `commands/runs.py` | 461 | Run listing, status, logs, watch |
| `commands/workflows.py` | 296 | Trigger, rerun, cancel, validate |
| `commands/artifacts.py` | 62 | Artifact operations |
| `commands/runners.py` | 188 | Runner management |
| `commands/inspect.py` | 842 | Stats, PR, compare, infra, config, repo |
### Import Structure
```python
from gitea_ci import Config, GiteaAPI, __version__
from gitea_ci.api import GiteaAPI
from gitea_ci.commands import cmd_list, cmd_status
```
### Validation
```sh
# Syntax check all modules
python3 -m py_compile src/gitea_ci/*.py src/gitea_ci/commands/*.py
# Test import
python3 -c "from src.gitea_ci import Config, GiteaAPI; print('OK')"
# Test CLI
./gitea-ci.py --help
./gitea-ci.py config --show
```
## License
MIT

File diff suppressed because it is too large Load Diff

8
src/gitea_ci/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
"""Gitea CI - Monitor and manage Gitea Actions workflows."""
__version__ = "1.0.0"
from .config import Config
from .api import GiteaAPI
__all__ = ["Config", "GiteaAPI", "__version__"]

7
src/gitea_ci/__main__.py Normal file
View File

@@ -0,0 +1,7 @@
"""Entry point for python -m gitea_ci."""
import sys
from .cli import main
if __name__ == "__main__":
sys.exit(main())

446
src/gitea_ci/api.py Normal file
View File

@@ -0,0 +1,446 @@
"""Gitea API client with rate limiting support."""
import json
import sys
import time
import urllib.request
import urllib.error
from typing import Any
from .config import Config
# Import style module - handle both package and standalone use
try:
from style import Color
except ImportError:
import os
import sys as _sys
_sys.path.insert(0, str(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
from style import Color
class GiteaAPI:
"""Gitea API client with rate limiting support."""
def __init__(self, config: Config):
self.config = config
self.base_url = config.url.rstrip("/") + "/api/v1"
self._retry_delay = 1 # Initial retry delay in seconds
self._max_retry_delay = 60 # Max delay
self._rate_limited = False
def _request(self, endpoint: str, method: str = "GET", retries: int = 3) -> dict | None:
"""Make API request with rate limit handling."""
url = f"{self.base_url}{endpoint}"
headers = {"Accept": "application/json"}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
req = urllib.request.Request(url, headers=headers, method=method)
for attempt in range(retries):
try:
with urllib.request.urlopen(req, timeout=30) as resp:
# Reset retry delay on success
self._retry_delay = 1
self._rate_limited = False
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 401:
print(f"{Color.RED}Authentication required. Set GITEA_TOKEN or run: gitea-ci config{Color.RST}")
return None
elif e.code == 404:
return None
elif e.code == 429 or e.code == 403:
# Rate limited - exponential backoff
self._rate_limited = True
retry_after = e.headers.get("Retry-After")
if retry_after and retry_after.isdigit():
delay = int(retry_after)
else:
delay = self._retry_delay
if attempt < retries - 1:
print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}", file=sys.stderr)
time.sleep(delay)
# Exponential backoff with max
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
else:
print(f"{Color.RED}Rate limited. Try again later.{Color.RST}")
return None
elif e.code >= 500:
# Server error - retry with backoff
if attempt < retries - 1:
delay = self._retry_delay
print(f"{Color.YLW}Server error {e.code}. Retrying in {delay}s...{Color.RST}", file=sys.stderr)
time.sleep(delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
else:
print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}")
return None
else:
print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}")
return None
except urllib.error.URLError as e:
if attempt < retries - 1:
delay = self._retry_delay
print(f"{Color.YLW}Connection error. Retrying in {delay}s...{Color.RST}", file=sys.stderr)
time.sleep(delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
else:
print(f"{Color.RED}Connection error: {e.reason}{Color.RST}")
return None
return None
def _request_text(self, endpoint: str, retries: int = 3) -> str | None:
"""Make API request returning text with rate limit handling."""
url = f"{self.base_url}{endpoint}"
headers = {}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
req = urllib.request.Request(url, headers=headers)
for attempt in range(retries):
try:
with urllib.request.urlopen(req, timeout=30) as resp:
self._retry_delay = 1
return resp.read().decode()
except urllib.error.HTTPError as e:
if e.code == 429 or e.code == 403:
retry_after = e.headers.get("Retry-After")
delay = int(retry_after) if retry_after and retry_after.isdigit() else self._retry_delay
if attempt < retries - 1:
print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}", file=sys.stderr)
time.sleep(delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
return None
except urllib.error.URLError:
if attempt < retries - 1:
time.sleep(self._retry_delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
return None
return None
def _post(self, endpoint: str, data: dict, retries: int = 3) -> dict | None:
"""Make POST request with JSON body."""
url = f"{self.base_url}{endpoint}"
headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
else:
print(f"{Color.RED}Authentication required for this operation{Color.RST}")
return None
body = json.dumps(data).encode("utf-8")
req = urllib.request.Request(url, data=body, headers=headers, method="POST")
for attempt in range(retries):
try:
with urllib.request.urlopen(req, timeout=30) as resp:
self._retry_delay = 1
# Handle 204 No Content
if resp.status == 204:
return {}
content = resp.read().decode()
return json.loads(content) if content else {}
except urllib.error.HTTPError as e:
if e.code == 401:
print(f"{Color.RED}Authentication failed. Check your token.{Color.RST}")
return None
elif e.code == 403:
print(f"{Color.RED}Permission denied. Token may lack required scopes.{Color.RST}")
return None
elif e.code == 404:
print(f"{Color.RED}Not found: {endpoint}{Color.RST}")
return None
elif e.code == 422:
print(f"{Color.RED}Invalid request (check workflow name/inputs){Color.RST}")
return None
elif e.code == 429:
if attempt < retries - 1:
delay = self._retry_delay
print(f"{Color.YLW}Rate limited. Waiting {delay}s...{Color.RST}")
time.sleep(delay)
self._retry_delay = min(self._retry_delay * 2, self._max_retry_delay)
continue
return None
else:
print(f"{Color.RED}API error: {e.code} {e.reason}{Color.RST}")
return None
except urllib.error.URLError as e:
if attempt < retries - 1:
time.sleep(self._retry_delay)
continue
print(f"{Color.RED}Connection error: {e.reason}{Color.RST}")
return None
return None
def _delete(self, endpoint: str) -> bool:
"""Make DELETE request."""
url = f"{self.base_url}{endpoint}"
headers = {"Accept": "application/json"}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
else:
print(f"{Color.RED}Authentication required for this operation{Color.RST}")
return False
req = urllib.request.Request(url, headers=headers, method="DELETE")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 204)
except urllib.error.HTTPError as e:
if e.code == 404:
print(f"{Color.RED}Not found: {endpoint}{Color.RST}")
elif e.code == 403:
print(f"{Color.RED}Permission denied{Color.RST}")
else:
print(f"{Color.RED}Delete failed: {e.code}{Color.RST}")
return False
except urllib.error.URLError as e:
print(f"{Color.RED}Connection error: {e.reason}{Color.RST}")
return False
def throttle(self) -> None:
"""Add small delay between requests to avoid rate limiting."""
if self._rate_limited:
time.sleep(0.5)
else:
time.sleep(0.1) # Small delay to be nice to the server
# Repository methods
def get_repos(self, limit: int = 20) -> list[Any]:
"""Get repositories for current user."""
data = self._request(f"/user/repos?limit={limit}")
return list(data) if data else []
def create_repo(self, name: str, description: str = "", private: bool = False,
auto_init: bool = True) -> dict | None:
"""Create a new repository."""
data = {
"name": name,
"description": description,
"private": private,
"auto_init": auto_init,
}
return self._post("/user/repos", data)
def get_repo(self, owner: str, repo: str) -> dict | None:
"""Get repository details."""
return self._request(f"/repos/{owner}/{repo}")
def repo_exists(self, owner: str, repo: str) -> bool:
"""Check if a repository exists."""
return self.get_repo(owner, repo) is not None
# Workflow run methods
def get_runs(self, owner: str, repo: str, limit: int = 10) -> list:
"""Get workflow runs for a repository."""
data = self._request(f"/repos/{owner}/{repo}/actions/runs?limit={limit}")
if data and "workflow_runs" in data:
return data["workflow_runs"]
return data if isinstance(data, list) else []
def get_run(self, owner: str, repo: str, run_id: int) -> dict | None:
"""Get specific workflow run."""
return self._request(f"/repos/{owner}/{repo}/actions/runs/{run_id}")
def get_jobs(self, owner: str, repo: str, run_id: int) -> list:
"""Get jobs for a workflow run."""
data = self._request(f"/repos/{owner}/{repo}/actions/runs/{run_id}/jobs")
if data and "jobs" in data:
return data["jobs"]
return data if isinstance(data, list) else []
def get_job_logs(self, owner: str, repo: str, job_id: int) -> str | None:
"""Get logs for a job."""
return self._request_text(f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs")
def trigger_workflow(self, owner: str, repo: str, workflow: str, ref: str = "master",
inputs: dict | None = None) -> bool:
"""Trigger a workflow_dispatch event."""
endpoint = f"/repos/{owner}/{repo}/actions/workflows/{workflow}/dispatches"
data: dict[str, Any] = {"ref": ref}
if inputs:
data["inputs"] = inputs
return self._post(endpoint, data) is not None
def rerun_job(self, owner: str, repo: str, job_id: int) -> bool:
"""Re-run a specific job."""
return self._post(f"/repos/{owner}/{repo}/actions/jobs/{job_id}/rerun", {}) is not None
def rerun_workflow(self, owner: str, repo: str, run_id: int) -> bool:
"""Re-run all jobs in a workflow run."""
return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/rerun", {}) is not None
def rerun_failed_jobs(self, owner: str, repo: str, run_id: int) -> bool:
"""Re-run only failed jobs in a workflow run."""
return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/rerun-failed-jobs", {}) is not None
def cancel_run(self, owner: str, repo: str, run_id: int) -> bool:
"""Cancel a workflow run."""
return self._post(f"/repos/{owner}/{repo}/actions/runs/{run_id}/cancel", {}) is not None
def delete_run(self, owner: str, repo: str, run_id: int) -> bool:
"""Delete a workflow run (for queued runs that can't be cancelled)."""
endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}"
return self._delete(endpoint)
# Artifact methods
def get_artifacts(self, owner: str, repo: str, run_id: int | None = None) -> list:
"""Get artifacts for a repository or specific run."""
if run_id:
endpoint = f"/repos/{owner}/{repo}/actions/runs/{run_id}/artifacts"
else:
endpoint = f"/repos/{owner}/{repo}/actions/artifacts"
data = self._request(endpoint)
if data and "artifacts" in data:
return data["artifacts"]
return data if isinstance(data, list) else []
def download_artifact(self, owner: str, repo: str, artifact_id: int, output_path: str) -> bool:
"""Download an artifact to a file."""
endpoint = f"/repos/{owner}/{repo}/actions/artifacts/{artifact_id}/zip"
url = f"{self.base_url}{endpoint}"
headers = {}
if self.config.token:
headers["Authorization"] = f"token {self.config.token}"
req = urllib.request.Request(url, headers=headers)
try:
with urllib.request.urlopen(req, timeout=120) as resp:
with open(output_path, "wb") as f:
f.write(resp.read())
return True
except (urllib.error.URLError, IOError) as e:
print(f"{Color.RED}Download failed: {e}{Color.RST}")
return False
# Workflow methods
def get_workflows(self, owner: str, repo: str) -> list:
"""Get list of workflows in a repository."""
data = self._request(f"/repos/{owner}/{repo}/actions/workflows")
if data and "workflows" in data:
return data["workflows"]
return data if isinstance(data, list) else []
def get_workflow_contents(self, owner: str, repo: str, workflow: str) -> str | None:
"""Get workflow file contents."""
import base64
# Try .gitea/workflows first, then .github/workflows
for prefix in [".gitea/workflows", ".github/workflows"]:
path = f"{prefix}/{workflow}"
data = self._request(f"/repos/{owner}/{repo}/contents/{path}")
if data and "content" in data:
try:
return base64.b64decode(data["content"]).decode("utf-8")
except Exception:
pass
return None
# PR methods
def get_pr_checks(self, owner: str, repo: str, pr_number: int) -> dict | None:
"""Get CI check status for a pull request."""
# Get PR details first to find head SHA
pr = self._request(f"/repos/{owner}/{repo}/pulls/{pr_number}")
if not pr:
return None
head_sha = pr.get("head", {}).get("sha", "")
if not head_sha:
return None
# Get commit status
status = self._request(f"/repos/{owner}/{repo}/commits/{head_sha}/status")
return {
"pr": pr,
"sha": head_sha,
"status": status,
}
# Runner methods
def get_registration_token(self, owner: str = "", repo: str = "") -> str | None:
"""Get runner registration token for repo, org, or user level."""
if owner and repo:
endpoint = f"/repos/{owner}/{repo}/actions/runners/registration-token"
elif owner:
endpoint = f"/orgs/{owner}/actions/runners/registration-token"
else:
endpoint = "/user/actions/runners/registration-token"
result = self._post(endpoint, {})
if result and "token" in result:
return result["token"]
return None
def get_runners(self, owner: str = "", repo: str = "") -> list:
"""Get runners for repo, org, or global (admin)."""
if owner and repo:
data = self._request(f"/repos/{owner}/{repo}/actions/runners")
elif owner:
data = self._request(f"/orgs/{owner}/actions/runners")
else:
data = self._request("/admin/runners")
if data and "runners" in data:
return data["runners"]
return data if isinstance(data, list) else []
def get_runner_jobs(self, owner: str = "", repo: str = "") -> list:
"""Get running/pending jobs for runners."""
if owner and repo:
endpoint = f"/repos/{owner}/{repo}/actions/runners/jobs"
elif owner:
endpoint = f"/orgs/{owner}/actions/runners/jobs"
else:
endpoint = "/admin/runners/jobs"
data = self._request(endpoint)
if data and "jobs" in data:
return data["jobs"]
return data if isinstance(data, list) else []
def get_runner(self, runner_id: int, owner: str = "", repo: str = "") -> dict | None:
"""Get a specific runner by ID."""
if owner and repo:
endpoint = f"/repos/{owner}/{repo}/actions/runners/{runner_id}"
elif owner:
endpoint = f"/orgs/{owner}/actions/runners/{runner_id}"
else:
endpoint = f"/admin/runners/{runner_id}"
return self._request(endpoint)
def delete_runner(self, runner_id: int, owner: str = "", repo: str = "") -> bool:
"""Delete a runner by ID."""
if owner and repo:
endpoint = f"/repos/{owner}/{repo}/actions/runners/{runner_id}"
elif owner:
endpoint = f"/orgs/{owner}/actions/runners/{runner_id}"
else:
endpoint = f"/admin/runners/{runner_id}"
return self._delete(endpoint)
# Version/Info methods
def get_version(self) -> dict | None:
"""Get Gitea server version info."""
return self._request("/version")

349
src/gitea_ci/cli.py Normal file
View File

@@ -0,0 +1,349 @@
"""Command-line interface for gitea-ci."""
import argparse
import os
import re
import sys
from .config import Config
from .api import GiteaAPI
from .commands import (
cmd_list, cmd_status, cmd_logs, cmd_watch, cmd_delete, cmd_discover,
cmd_trigger, cmd_rerun, cmd_cancel, cmd_validate, cmd_workflows,
cmd_artifacts,
cmd_runners, cmd_register_token,
cmd_stats, cmd_pr, cmd_compare, cmd_infra, cmd_config, cmd_repo,
)
def main() -> int:
"""Main entry point."""
# Quick check for direct run URL or instance URL as first arg
if len(sys.argv) > 1 and not sys.argv[1].startswith("-"):
arg = sys.argv[1]
# Check if it looks like a URL or hostname (not a command)
commands = ("list", "ls", "l", "status", "s", "logs", "log", "watch", "w", "config",
"trigger", "t", "rerun", "cancel", "artifacts", "art", "pr")
if "." in arg and arg not in commands:
# Check for direct run URL: git.mymx.me/user/repo/actions/runs/123
run_match = re.search(r"(?:https?://)?([^/]+)/([^/]+)/([^/]+)/actions/runs/(\d+)", arg)
if run_match:
host, owner, repo, run_id = run_match.groups()
url = f"https://{host}"
os.environ["GITEA_URL"] = url
# Rewrite args to: status -o owner -r repo -R run_id
sys.argv = [sys.argv[0], "status", "-o", owner, "-r", repo, "-R", run_id]
else:
# Check for repo URL: git.mymx.me/user/repo/actions
repo_match = re.search(r"(?:https?://)?([^/]+)/([^/]+)/([^/]+)(?:/actions)?$", arg)
if repo_match:
host, owner, repo = repo_match.groups()
url = f"https://{host}"
os.environ["GITEA_URL"] = url
sys.argv = [sys.argv[0], "list", f"{owner}/{repo}"]
else:
# Just an instance URL
url = arg if arg.startswith("http") else f"https://{arg}"
os.environ["GITEA_URL"] = url
sys.argv.pop(1) # Remove URL from args
parser = argparse.ArgumentParser(
description="Monitor and manage Gitea Actions workflows",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Commands:
list List recent workflow runs (default: discover all)
stats Show workflow statistics with visual graphs
status Show detailed run status with jobs
logs Show job logs
watch Watch running workflows in real-time
trigger Trigger a workflow manually (workflow_dispatch)
rerun Re-run a workflow or failed jobs
cancel Cancel a running workflow
delete Delete a workflow run
artifacts List or download build artifacts
pr Show CI status for a pull request
compare Compare two workflow runs (diff jobs)
runners List runners and their status
register-token Get runner registration token
workflows List workflows in a repository
infra Show infrastructure status
config Configure token and defaults
Environment:
GITEA_URL Gitea instance URL
GITEA_TOKEN API token for authentication (required for write operations)
Examples:
gitea-ci git.mymx.me Discover all workflows on instance
gitea-ci git.mymx.me/user/repo List runs for specific repo
gitea-ci list user/repo --failed Show only failed runs
gitea-ci list user/repo --json Output as JSON
gitea-ci stats user/repo Show statistics with graphs
gitea-ci stats user/repo --detailed Include per-job breakdown
gitea-ci status user/repo Show latest run status
gitea-ci status user/repo 123 Show specific run details
gitea-ci status user/repo 123 --json Output run details as JSON
gitea-ci watch user/repo Watch latest run
gitea-ci watch user/repo 123 -t 300 Watch specific run with timeout
gitea-ci logs user/repo 123 -j 456 Show job logs
gitea-ci trigger user/repo -w lint.yml Trigger workflow manually
gitea-ci rerun user/repo --failed Re-run only failed jobs
gitea-ci cancel user/repo Cancel latest running workflow
gitea-ci artifacts user/repo -d 42 Download artifact #42
gitea-ci pr user/repo -p 123 Show CI status for PR #123
gitea-ci compare user/repo 175 174 Compare two runs
gitea-ci delete user/repo -R 123 Delete workflow run #123
gitea-ci runners List runners (user-level)
gitea-ci runners -o org List organization runners
gitea-ci runners info 5 Show runner #5 details
gitea-ci runners delete 5 -f Delete runner #5 (no confirmation)
gitea-ci runners jobs Show running/pending jobs
gitea-ci register-token Get user-level registration token
gitea-ci register-token -o org -r repo Get repo-level registration token
gitea-ci workflows user/repo -v List workflows with triggers
gitea-ci config --test Test API connectivity
gitea-ci config token Show token info
gitea-ci config token check Check token permissions
gitea-ci config token set <token> Set API token
gitea-ci validate Validate local workflow files
gitea-ci validate --check-runners Cross-reference runs-on labels
gitea-ci validate --strict Treat warnings as errors
""",
)
parser.add_argument("-u", "--url", help="Gitea URL")
subparsers = parser.add_subparsers(dest="command", metavar="command")
# Common args for repo-based commands
def add_repo_args(p):
p.add_argument("-o", "--owner", help="Repository owner")
p.add_argument("-r", "--repo", help="Repository name")
p.add_argument("repo_path", nargs="?", help="Repository as owner/repo")
# list
list_p = subparsers.add_parser("list", aliases=["ls", "l"], help="List workflow runs")
add_repo_args(list_p)
list_p.add_argument("-n", "--limit", type=int, default=10, help="Number of runs")
list_p.add_argument("--failed", dest="status_filter", action="store_const", const="failed", help="Show only failed runs")
list_p.add_argument("--success", dest="status_filter", action="store_const", const="success", help="Show only successful runs")
list_p.add_argument("--running", dest="status_filter", action="store_const", const="running", help="Show only running/waiting")
list_p.add_argument("-b", "--branch", help="Filter by branch name")
list_p.add_argument("-w", "--workflow", help="Filter by workflow file")
list_p.add_argument("--json", action="store_true", help="Output as JSON")
list_p.add_argument("--compact", action="store_true", help="Compact single-line output")
# stats
stats_p = subparsers.add_parser("stats", help="Show workflow statistics with graphs")
add_repo_args(stats_p)
stats_p.add_argument("-n", "--limit", type=int, default=50, help="Number of runs to analyze")
stats_p.add_argument("--detailed", action="store_true", help="Include per-job statistics")
stats_p.add_argument("--json", action="store_true", help="Output as JSON")
# status (supports repo_path and/or run_id)
status_p = subparsers.add_parser("status", aliases=["s"], help="Show run status")
status_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]")
status_p.add_argument("-o", "--owner", help="Repository owner")
status_p.add_argument("-r", "--repo", help="Repository name")
status_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID")
status_p.add_argument("--json", action="store_true", help="Output as JSON")
# logs (supports repo_path and/or run_id)
logs_p = subparsers.add_parser("logs", aliases=["log"], help="Show job logs")
logs_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]")
logs_p.add_argument("-o", "--owner", help="Repository owner")
logs_p.add_argument("-r", "--repo", help="Repository name")
logs_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID")
logs_p.add_argument("-j", "--job", dest="job_id", type=int, help="Job ID")
# watch (supports repo_path and/or run_id)
watch_p = subparsers.add_parser("watch", aliases=["w"], help="Watch runs")
watch_p.add_argument("positional", nargs="*", help="[owner/repo] [run_id]")
watch_p.add_argument("-o", "--owner", help="Repository owner")
watch_p.add_argument("-r", "--repo", help="Repository name")
watch_p.add_argument("-R", "--run", dest="run_id", type=int, help="Run ID to watch")
watch_p.add_argument("-i", "--interval", type=int, default=5, help="Refresh interval (seconds)")
watch_p.add_argument("-t", "--timeout", type=int, default=0, help="Auto-exit after N seconds (0=disabled)")
watch_p.add_argument("--no-clear", action="store_true", help="Don't clear screen")
watch_p.add_argument("--notify", action="store_true", help="Desktop notification on completion")
# config
config_p = subparsers.add_parser("config", help="Configure gitea-ci")
config_p.add_argument("action", nargs="?", choices=["token"], help="Subcommand")
config_p.add_argument("token_action", nargs="?", choices=["check", "set"], help="Token action")
config_p.add_argument("token_value", nargs="?", help="Token value (for 'set')")
config_p.add_argument("--show", action="store_true", help="Show current config")
config_p.add_argument("--test", action="store_true", help="Test API connectivity")
# repo
repo_p = subparsers.add_parser("repo", help="Manage repositories")
repo_p.add_argument("action", nargs="?", choices=["list", "create", "exists", "check"], default="list",
help="Action: list, create, exists, check")
repo_p.add_argument("name", nargs="?", help="Repository name (for create/exists)")
repo_p.add_argument("-d", "--description", help="Repository description (for create)")
repo_p.add_argument("-p", "--private", action="store_true", help="Make repository private")
add_repo_args(repo_p)
# trigger
trigger_p = subparsers.add_parser("trigger", aliases=["t"], help="Trigger workflow")
add_repo_args(trigger_p)
trigger_p.add_argument("-w", "--workflow", help="Workflow file (e.g., lint.yml)")
trigger_p.add_argument("-b", "--ref", help="Branch/tag/SHA to run on (default: master)")
trigger_p.add_argument("-i", "--input", action="append", help="Input as key=value (repeatable)")
# rerun
rerun_p = subparsers.add_parser("rerun", help="Re-run workflow/jobs")
add_repo_args(rerun_p)
rerun_p.add_argument("-R", "--run-id", type=int, help="Run ID to re-run")
rerun_p.add_argument("-j", "--job-id", type=int, help="Specific job ID to re-run")
rerun_p.add_argument("-f", "--failed-only", action="store_true", help="Re-run only failed jobs")
# cancel
cancel_p = subparsers.add_parser("cancel", help="Cancel running workflow")
add_repo_args(cancel_p)
cancel_p.add_argument("-R", "--run-id", type=int, help="Run ID to cancel")
# delete
delete_p = subparsers.add_parser("delete", aliases=["rm"], help="Delete a workflow run")
add_repo_args(delete_p)
delete_p.add_argument("-R", "--run-id", type=int, required=True, help="Run ID to delete")
delete_p.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
# artifacts
artifacts_p = subparsers.add_parser("artifacts", aliases=["art"], help="List/download artifacts")
add_repo_args(artifacts_p)
artifacts_p.add_argument("-R", "--run-id", type=int, help="Filter by run ID")
artifacts_p.add_argument("-d", "--download", type=int, metavar="ID", help="Download artifact by ID")
artifacts_p.add_argument("-O", "--output", help="Output filename for download")
# pr
pr_p = subparsers.add_parser("pr", help="Show PR CI status")
add_repo_args(pr_p)
pr_p.add_argument("-p", "--pr-number", type=int, help="Pull request number")
# compare
compare_p = subparsers.add_parser("compare", aliases=["diff"], help="Compare two runs")
add_repo_args(compare_p)
compare_p.add_argument("run1", type=int, help="First run ID")
compare_p.add_argument("run2", type=int, nargs="?", help="Second run ID (default: previous run)")
compare_p.add_argument("--json", action="store_true", help="Output as JSON")
# infra
infra_p = subparsers.add_parser("infra", help="Show infrastructure status")
add_repo_args(infra_p)
infra_p.add_argument("--jobs", action="store_true", help="Show pending/running jobs")
# runners
runners_p = subparsers.add_parser("runners", help="Manage runners")
runners_p.add_argument("action", nargs="?", default="list",
choices=["list", "delete", "info", "jobs"],
help="Action: list, delete, info, jobs")
runners_p.add_argument("runner_id", nargs="?", type=int, help="Runner ID (for delete/info)")
add_repo_args(runners_p)
runners_p.add_argument("--force", "-f", action="store_true", help="Skip confirmation for delete")
runners_p.add_argument("--json", action="store_true", help="Output as JSON")
# register-token
regtoken_p = subparsers.add_parser("register-token", aliases=["regtoken"], help="Get runner registration token")
add_repo_args(regtoken_p)
regtoken_p.add_argument("--user", action="store_true", help="Get user-level token (ignores repo defaults)")
# workflows
workflows_p = subparsers.add_parser("workflows", aliases=["wf"], help="List workflows")
add_repo_args(workflows_p)
workflows_p.add_argument("-v", "--verbose", action="store_true", help="Show workflow triggers and details")
workflows_p.add_argument("--json", action="store_true", help="Output as JSON")
# validate
validate_p = subparsers.add_parser("validate", aliases=["lint"], help="Validate local workflow files")
validate_p.add_argument("path", nargs="?", help="Path to workflow dir or file")
add_repo_args(validate_p)
validate_p.add_argument("--check-runners", action="store_true", help="Cross-reference runs-on with available runners")
validate_p.add_argument("--strict", action="store_true", help="Treat warnings as errors")
args = parser.parse_args()
# Load config
config = Config.load()
if args.url:
config.url = args.url
api = GiteaAPI(config)
# Set defaults for optional args
if not hasattr(args, "owner"):
args.owner = None
if not hasattr(args, "repo"):
args.repo = None
if not hasattr(args, "limit"):
args.limit = 10
# Parse repo_path (owner/repo format)
if hasattr(args, "repo_path") and args.repo_path and "/" in args.repo_path:
parts = args.repo_path.split("/", 1)
if not args.owner:
args.owner = parts[0]
if not args.repo:
args.repo = parts[1]
# Parse positional args for status/logs/watch: [owner/repo] [run_id]
if hasattr(args, "positional") and args.positional:
for pos in args.positional:
if "/" in pos:
# Looks like owner/repo
parts = pos.split("/", 1)
if not args.owner:
args.owner = parts[0]
if not args.repo:
args.repo = parts[1]
elif pos.isdigit():
# Looks like run_id
if not args.run_id:
args.run_id = int(pos)
# Dispatch commands
if args.command in (None, "list", "ls", "l"):
return cmd_list(api, args)
elif args.command == "stats":
return cmd_stats(api, args)
elif args.command in ("status", "s"):
return cmd_status(api, args)
elif args.command in ("logs", "log"):
return cmd_logs(api, args)
elif args.command in ("watch", "w"):
return cmd_watch(api, args)
elif args.command == "config":
return cmd_config(api, args)
elif args.command == "repo":
return cmd_repo(api, args)
elif args.command in ("trigger", "t"):
return cmd_trigger(api, args)
elif args.command == "rerun":
return cmd_rerun(api, args)
elif args.command == "cancel":
return cmd_cancel(api, args)
elif args.command in ("delete", "rm"):
return cmd_delete(api, args)
elif args.command in ("artifacts", "art"):
return cmd_artifacts(api, args)
elif args.command == "pr":
return cmd_pr(api, args)
elif args.command in ("compare", "diff"):
return cmd_compare(api, args)
elif args.command == "infra":
return cmd_infra(api, args)
elif args.command == "runners":
return cmd_runners(api, args)
elif args.command in ("register-token", "regtoken"):
return cmd_register_token(api, args)
elif args.command in ("workflows", "wf"):
return cmd_workflows(api, args)
elif args.command in ("validate", "lint"):
return cmd_validate(api, args)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,35 @@
"""Command modules for gitea-ci."""
from .runs import cmd_list, cmd_status, cmd_logs, cmd_watch, cmd_delete, cmd_discover
from .workflows import cmd_trigger, cmd_rerun, cmd_cancel, cmd_validate, cmd_workflows
from .artifacts import cmd_artifacts
from .runners import cmd_runners, cmd_register_token
from .inspect import cmd_stats, cmd_pr, cmd_compare, cmd_infra, cmd_config, cmd_repo
__all__ = [
# runs
"cmd_list",
"cmd_status",
"cmd_logs",
"cmd_watch",
"cmd_delete",
"cmd_discover",
# workflows
"cmd_trigger",
"cmd_rerun",
"cmd_cancel",
"cmd_validate",
"cmd_workflows",
# artifacts
"cmd_artifacts",
# runners
"cmd_runners",
"cmd_register_token",
# inspect
"cmd_stats",
"cmd_pr",
"cmd_compare",
"cmd_infra",
"cmd_config",
"cmd_repo",
]

View File

@@ -0,0 +1,62 @@
"""Artifact-related commands."""
import argparse
import sys
from pathlib import Path
from ..api import GiteaAPI
from ..formatters import format_time
# Import style module
try:
from style import Color
except ImportError:
import os
sys.path.insert(0, str(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))))
from style import Color
def cmd_artifacts(api: GiteaAPI, args: argparse.Namespace) -> int:
"""List or download artifacts."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
# Download specific artifact
if args.download:
artifact_id = args.download
output = args.output or f"artifact-{artifact_id}.zip"
print(f"{Color.BLD}Downloading artifact #{artifact_id}...{Color.RST}")
if api.download_artifact(owner, repo, artifact_id, output):
size = Path(output).stat().st_size
print(f"{Color.GRN}✓ Downloaded to {output} ({size:,} bytes){Color.RST}")
return 0
return 1
# List artifacts
run_id = args.run_id
artifacts = api.get_artifacts(owner, repo, run_id)
if not artifacts:
print(f"{Color.DIM}No artifacts found{Color.RST}")
return 0
print(f"{Color.BLD}Artifacts:{Color.RST}\n")
for art in artifacts:
art_id = art.get("id", 0)
name = art.get("name", "unknown")
size = art.get("size_in_bytes", 0)
expired = art.get("expired", False)
created = format_time(art.get("created_at", ""))
status_icon = f"{Color.DIM}(expired){Color.RST}" if expired else ""
print(f" {Color.BLD}#{art_id}{Color.RST} {name} {status_icon}")
print(f" {Color.DIM}Size: {size:,} bytes · Created: {created}{Color.RST}")
print()
print(f"{Color.DIM}Download: gitea-ci artifacts -d <ID> [-O filename.zip]{Color.RST}")
return 0

View File

@@ -0,0 +1,842 @@
"""Inspection and configuration commands: stats, pr, compare, infra, config, repo."""
import argparse
import json
import re
import sys
import urllib.request
from datetime import datetime
from typing import Any
from ..api import GiteaAPI
from ..config import CONFIG_FILE
from ..formatters import (
format_time, format_duration, format_seconds, status_display,
print_rate_bar, output_stats_json,
)
from ..utils import parse_duration_seconds
# Import style module
try:
from style import Color
except ImportError:
import os
sys.path.insert(0, str(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))))
from style import Color
def cmd_stats(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show comprehensive workflow statistics with visual graphs."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.BLD}Error:{Color.RST} Repository required")
return 1
# Fetch more runs for better statistics
limit = getattr(args, "limit", 50) or 50
runs = api.get_runs(owner, repo, limit=limit)
if not runs:
print(f"{Color.DIM}No workflow runs found{Color.RST}")
return 0
# JSON output
if getattr(args, "json", False):
return output_stats_json(runs, owner, repo)
print(f"{Color.BLD}Workflow Statistics: {owner}/{repo}{Color.RST}")
print("" * 60)
# Calculate statistics
total = len(runs)
success = [r for r in runs if r.get("conclusion") == "success"]
failed = [r for r in runs if r.get("conclusion") == "failure"]
cancelled = [r for r in runs if r.get("conclusion") == "cancelled"]
running = [r for r in runs if r.get("status") in ("running", "waiting")]
completed = total - len(running)
# Summary section
print(f"{Color.BLD}Summary{Color.RST} {Color.DIM}(last {total} runs){Color.RST}")
print("" * 40)
success_rate = (len(success) / completed * 100) if completed > 0 else 0
print(f" {'Total:':<12} {total}")
print(f" {'Completed:':<12} {completed}")
print(f" {'✓ Success:':<12} {len(success)}")
print(f" {'✗ Failed:':<12} {len(failed)}")
print(f" {'○ Cancelled:':<12} {len(cancelled)}")
print(f" {'● Running:':<12} {len(running)}")
print()
# Success rate bar
print(f"{Color.BLD}Success Rate{Color.RST}")
print("" * 40)
print_rate_bar("Overall", success_rate, 30)
print()
# Per-workflow breakdown
workflows: dict[str, dict] = {}
for run in runs:
wf = run.get("workflow") or run.get("name") or "unknown"
if wf not in workflows:
workflows[wf] = {"total": 0, "success": 0, "failed": 0, "durations": []}
workflows[wf]["total"] += 1
if run.get("conclusion") == "success":
workflows[wf]["success"] += 1
elif run.get("conclusion") == "failure":
workflows[wf]["failed"] += 1
# Track duration
duration = parse_duration_seconds(run)
if duration > 0:
workflows[wf]["durations"].append(duration)
if len(workflows) > 1:
print(f"{Color.BLD}Per-Workflow Breakdown{Color.RST}")
print("" * 40)
for wf_name, stats in sorted(workflows.items()):
wf_total = stats["total"]
wf_success = stats["success"]
wf_rate = (wf_success / wf_total * 100) if wf_total > 0 else 0
# Truncate workflow name
display_name = wf_name[:20] if len(wf_name) > 20 else wf_name
print_rate_bar(display_name, wf_rate, 20, show_count=f"{wf_success}/{wf_total}")
print()
# Duration statistics
all_durations = []
for run in runs:
duration = parse_duration_seconds(run)
if duration > 0:
all_durations.append(duration)
if all_durations:
print(f"{Color.BLD}Duration Statistics{Color.RST}")
print("" * 40)
avg_duration = sum(all_durations) / len(all_durations)
min_duration = min(all_durations)
max_duration = max(all_durations)
print(f" {'Average:':<12} {format_seconds(int(avg_duration))}")
print(f" {'Fastest:':<12} {format_seconds(min_duration)}")
print(f" {'Slowest:':<12} {format_seconds(max_duration)}")
print()
# Recent trend (last 10 runs)
recent = runs[:min(10, len(runs))]
if recent:
print(f"{Color.BLD}Recent Trend{Color.RST} {Color.DIM}(last {len(recent)} runs){Color.RST}")
print("" * 40)
trend_line = " "
for run in reversed(recent): # Oldest to newest
conclusion = run.get("conclusion", "")
if conclusion == "success":
trend_line += "" # Full block for success
elif conclusion == "failure":
trend_line += "" # Low block for failure (bar chart effect)
elif run.get("status") in ("running", "waiting"):
trend_line += "" # Medium shade for running
else:
trend_line += "" # Light shade for other
print(trend_line)
print(f" {Color.DIM}█=pass ▁=fail ▒=running ░=other{Color.RST}")
print(f" {Color.DIM}← older newer →{Color.RST}")
print()
# Job-level statistics (if detailed flag)
if getattr(args, "detailed", False):
_print_job_stats(api, runs[:20], owner, repo)
return 0
def _print_job_stats(api: GiteaAPI, runs: list, owner: str, repo: str) -> None:
"""Print job-level statistics."""
print(f"{Color.BLD}Job Statistics{Color.RST} {Color.DIM}(from {len(runs)} recent runs){Color.RST}")
print("" * 40)
job_stats: dict[str, dict] = {}
for run in runs:
if run.get("status") != "completed":
continue
run_id = run.get("id")
jobs = api.get_jobs(owner, repo, run_id)
for job in jobs:
name = job.get("name", "unknown")
if name not in job_stats:
job_stats[name] = {"success": 0, "failed": 0, "durations": []}
if job.get("conclusion") == "success":
job_stats[name]["success"] += 1
elif job.get("conclusion") == "failure":
job_stats[name]["failed"] += 1
# Parse job duration
started = job.get("started_at", "")
ended = job.get("completed_at", "")
if started and ended:
try:
start_dt = datetime.fromisoformat(started.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00"))
duration = int((end_dt - start_dt).total_seconds())
# Sanity check: duration should be reasonable (< 24 hours)
if 0 < duration < 86400:
job_stats[name]["durations"].append(duration)
except (ValueError, TypeError):
pass
if not job_stats:
print(f" {Color.DIM}No job data available{Color.RST}")
return
# Print job table
print(f" {'Job':<30} {'✓ Pass':<8} {'✗ Fail':<8} {'Avg':<8}")
print(f" {'-'*30} {'-'*8} {'-'*8} {'-'*8}")
for job_name, stats in sorted(job_stats.items()):
total = stats["success"] + stats["failed"]
if total == 0:
continue
name_display = job_name[:30] if len(job_name) > 30 else job_name
avg_dur = ""
if stats["durations"]:
avg = sum(stats["durations"]) / len(stats["durations"])
avg_dur = format_seconds(int(avg))
fail_str = str(stats["failed"]) if stats["failed"] > 0 else f"{Color.DIM}0{Color.RST}"
print(f" {name_display:<30} {stats['success']:<8} {fail_str:<8} {avg_dur:<8}")
print()
def cmd_pr(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show CI status for a pull request."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
pr_number = args.pr_number
if not pr_number:
print(f"{Color.RED}PR number required (-p/--pr){Color.RST}")
return 1
data = api.get_pr_checks(owner, repo, pr_number)
if not data:
print(f"{Color.RED}PR #{pr_number} not found{Color.RST}")
return 1
pr = data["pr"]
sha = data["sha"][:8]
status = data.get("status", {})
# PR info
title = pr.get("title", "")
state = pr.get("state", "")
mergeable = pr.get("mergeable", False)
user = pr.get("user", {}).get("login", "")
print(f"{Color.BLD}PR #{pr_number}: {title}{Color.RST}")
print(f" {Color.DIM}Author:{Color.RST} {user}")
print(f" {Color.DIM}State:{Color.RST} {state}")
print(f" {Color.DIM}HEAD:{Color.RST} {sha}")
print(f" {Color.DIM}Mergeable:{Color.RST} {'Yes' if mergeable else 'No'}")
print()
# CI status
if status:
overall = status.get("state", "pending")
statuses = status.get("statuses", [])
# Map state to display
state_display = {
"success": f"{Color.GRN}✓ All checks passed{Color.RST}",
"pending": f"{Color.YLW}● Checks in progress{Color.RST}",
"failure": f"{Color.RED}✗ Some checks failed{Color.RST}",
"error": f"{Color.RED}✗ Check errors{Color.RST}",
}
print(f"{Color.BLD}CI Status:{Color.RST} {state_display.get(overall, overall)}\n")
if statuses:
for s in statuses:
ctx = s.get("context", "")
state = s.get("state", "")
desc = s.get("description", "")[:50]
icon = status_display(state, state)
print(f" {icon} {ctx}")
if desc:
print(f" {Color.DIM}{desc}{Color.RST}")
else:
# Try to get workflow runs for this commit
runs = api.get_runs(owner, repo, limit=5)
commit_runs = [r for r in runs if r.get("head_sha", "").startswith(sha)]
if commit_runs:
print(f" {Color.DIM}Workflow runs for this commit:{Color.RST}")
for run in commit_runs:
run_status = status_display(run.get("status", ""), run.get("conclusion", ""))
run_title = run.get("display_title", "")[:40]
print(f" {run_status} {run_title}")
else:
print(f"{Color.DIM}No CI status found{Color.RST}")
return 0
def cmd_compare(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Compare two workflow runs."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
run1_id = args.run1
run2_id = args.run2
# If only one run specified, compare with previous
if not run2_id:
runs = api.get_runs(owner, repo, limit=20)
found_idx = None
for i, r in enumerate(runs):
if r.get("id") == run1_id:
found_idx = i
break
if found_idx is not None and found_idx + 1 < len(runs):
run2_id = runs[found_idx + 1].get("id")
else:
print(f"{Color.RED}Cannot find previous run to compare{Color.RST}")
return 1
# Fetch both runs and their jobs
run1 = api.get_run(owner, repo, run1_id)
run2 = api.get_run(owner, repo, run2_id)
if not run1 or not run2:
print(f"{Color.RED}Could not fetch one or both runs{Color.RST}")
return 1
jobs1 = api.get_jobs(owner, repo, run1_id)
jobs2 = api.get_jobs(owner, repo, run2_id)
# JSON output
if getattr(args, "json", False):
job_changes: list[dict[str, Any]] = []
output: dict[str, Any] = {
"repository": f"{owner}/{repo}",
"run1": {"id": run1_id, "status": run1.get("status"), "conclusion": run1.get("conclusion")},
"run2": {"id": run2_id, "status": run2.get("status"), "conclusion": run2.get("conclusion")},
"job_changes": job_changes,
}
jobs1_map = {j.get("name"): j for j in jobs1}
jobs2_map = {j.get("name"): j for j in jobs2}
all_job_names = set(jobs1_map.keys()) | set(jobs2_map.keys())
for name in sorted(all_job_names):
j1 = jobs1_map.get(name, {})
j2 = jobs2_map.get(name, {})
job_changes.append({
"name": name,
"run1_conclusion": j1.get("conclusion"),
"run2_conclusion": j2.get("conclusion"),
"changed": j1.get("conclusion") != j2.get("conclusion"),
})
print(json.dumps(output, indent=2))
return 0
# Pretty output
print(f"{Color.BLD}Comparing runs:{Color.RST}")
print(f" Run #{run1_id}: {status_display(run1.get('status', ''), run1.get('conclusion', ''))} "
f"{run1.get('display_title', '')[:40]}")
print(f" Run #{run2_id}: {status_display(run2.get('status', ''), run2.get('conclusion', ''))} "
f"{run2.get('display_title', '')[:40]}")
print()
# Compare jobs
jobs1_map = {j.get("name"): j for j in jobs1}
jobs2_map = {j.get("name"): j for j in jobs2}
all_job_names_sorted = sorted(set(jobs1_map.keys()) | set(jobs2_map.keys()))
changes = []
unchanged = []
for name in all_job_names_sorted:
j1 = jobs1_map.get(name, {})
j2 = jobs2_map.get(name, {})
c1 = j1.get("conclusion", "missing")
c2 = j2.get("conclusion", "missing")
if c1 != c2:
changes.append((name, c1, c2))
else:
unchanged.append((name, c1))
if changes:
print(f"{Color.BLD}Changed jobs:{Color.RST}\n")
for name, c1, c2 in changes:
s1 = status_display("completed", c1)
s2 = status_display("completed", c2)
print(f" {name}")
print(f" {s1} #{run1_id}{s2} #{run2_id}")
print()
if unchanged and not changes:
print(f"{Color.DIM}All {len(unchanged)} jobs have same status{Color.RST}")
elif unchanged:
print(f"{Color.DIM}Unchanged: {len(unchanged)} jobs{Color.RST}")
return 0
def cmd_infra(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show infrastructure status: Gitea version, runners, workflows."""
owner = getattr(args, "owner", None) or api.config.default_owner
repo = getattr(args, "repo", None) or api.config.default_repo
print(f"{Color.BLD}Infrastructure Status{Color.RST}\n")
print(f"{Color.DIM}Instance:{Color.RST} {api.config.url}")
# Gitea version
version = api.get_version()
if version:
print(f"{Color.DIM}Version:{Color.RST} Gitea {version.get('version', 'unknown')}")
print()
# Try to get runners at different levels
runners_found = False
# Repository runners
if owner and repo:
print(f"{Color.BLD}Repository: {owner}/{repo}{Color.RST}")
runners = api.get_runners(owner, repo)
if runners:
runners_found = True
print(f"\n{Color.BLD}Repository Runners:{Color.RST}")
for r in runners:
status = "online" if r.get("status") == "online" else "offline"
color = Color.GRN if status == "online" else Color.RED
name = r.get("name", "unnamed")
labels = ", ".join(r.get("labels", [])[:3]) or "no labels"
print(f" {color}{Color.RST} {name} ({labels})")
# Workflows
workflows = api.get_workflows(owner, repo)
if workflows:
print(f"\n{Color.BLD}Workflows:{Color.RST}")
for wf in workflows:
name = wf.get("name", "")
path = wf.get("path", "").split("/")[-1]
state = wf.get("state", "")
if state == "active":
print(f" {Color.GRN}{Color.RST} {path} - {name}")
else:
print(f" {Color.DIM}{Color.RST} {path} - {name} ({state})")
# Recent runs summary
runs = api.get_runs(owner, repo, limit=10)
if runs:
success = sum(1 for r in runs if r.get("conclusion") == "success")
failed = sum(1 for r in runs if r.get("conclusion") == "failure")
running = sum(1 for r in runs if r.get("status") in ("running", "waiting", "queued"))
print(f"\n{Color.BLD}Recent Runs (last 10):{Color.RST}")
print(f" {Color.GRN}{Color.RST} {success} success {Color.RED}{Color.RST} {failed} failed {Color.BLU}{Color.RST} {running} active")
# Organization runners (if owner specified)
if owner and not repo:
print(f"{Color.BLD}Organization: {owner}{Color.RST}")
runners = api.get_runners(owner)
if runners:
runners_found = True
print(f"\n{Color.BLD}Organization Runners:{Color.RST}")
for r in runners:
status = "online" if r.get("status") == "online" else "offline"
color = Color.GRN if status == "online" else Color.RED
name = r.get("name", "unnamed")
print(f" {color}{Color.RST} {name}")
# Global runners (admin)
if not owner:
runners = api.get_runners()
if runners:
runners_found = True
print(f"{Color.BLD}Global Runners:{Color.RST}")
for r in runners:
status = r.get("status", "unknown")
color = Color.GRN if status == "online" else Color.RED
name = r.get("name", "unnamed")
labels = ", ".join(r.get("labels", [])[:3]) or "no labels"
busy = r.get("busy", False)
busy_str = f" {Color.YLW}[busy]{Color.RST}" if busy else ""
print(f" {color}{Color.RST} {name} ({labels}){busy_str}")
elif not runners_found:
print(f"{Color.DIM}No runners found (may require admin access){Color.RST}")
# Pending jobs
if getattr(args, "jobs", False):
jobs = api.get_runner_jobs(owner or "", repo or "")
if jobs:
print(f"\n{Color.BLD}Pending/Running Jobs:{Color.RST}")
for job in jobs[:10]:
status = job.get("status", "")
name = job.get("name", "")
repo_name = job.get("repository", {}).get("full_name", "")
print(f" {Color.BLU}{Color.RST} {name} ({repo_name})")
return 0
def _check_token_permissions(api: GiteaAPI) -> int:
"""Check what permissions the current token has."""
config = api.config
if not config.token:
print(f"{Color.RED}No token configured{Color.RST}")
print(f"{Color.DIM}Set with: gitea-ci config token set <token>{Color.RST}")
return 1
print(f"{Color.BLD}Token Permissions{Color.RST}")
print("" * 35)
# Basic auth - GET /user
user = api._request("/user")
if user:
username = user.get("login", "unknown")
print(f" {Color.GRN}{Color.RST} Authenticated as: {username}")
else:
print(f" {Color.RED}{Color.RST} Authentication failed")
return 1
# Read repos - GET /user/repos
repos = api._request("/user/repos?limit=1")
if repos is not None:
print(f" {Color.GRN}{Color.RST} Read repositories")
else:
print(f" {Color.RED}{Color.RST} Read repositories")
# Organization access - GET /user/orgs
orgs = api._request("/user/orgs")
if orgs is not None:
org_count = len(orgs) if isinstance(orgs, list) else 0
print(f" {Color.GRN}{Color.RST} Organization access ({org_count} orgs)")
else:
print(f" {Color.RED}{Color.RST} Organization access")
# Admin access - GET /admin/runners
runners = api._request("/admin/runners")
if runners is not None:
print(f" {Color.GRN}{Color.RST} Admin access")
else:
print(f" {Color.DIM}{Color.RST} Admin access (not admin or scope missing)")
# User-level runners
user_runners = api._request("/user/actions/runners")
if user_runners is not None:
print(f" {Color.GRN}{Color.RST} User-level runners")
else:
print(f" {Color.DIM}{Color.RST} User-level runners")
print()
print(f"{Color.DIM}Token ends with: ...{config.token[-4:]}{Color.RST}")
print(f"{Color.DIM}Config file: {CONFIG_FILE}{Color.RST}")
return 0
def cmd_config(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Configure gitea-ci."""
config = api.config
# Handle token subcommand
action = getattr(args, "action", None)
if action == "token":
token_action = getattr(args, "token_action", None)
if token_action == "check":
return _check_token_permissions(api)
elif token_action == "set":
new_token = getattr(args, "token_value", None)
if not new_token:
print(f"{Color.RED}Token value required{Color.RST}")
return 1
config.token = new_token
config.save()
print(f"{Color.GRN}Token saved to {CONFIG_FILE}{Color.RST}")
return 0
else:
# Just show token info
if config.token:
print(f"{Color.BLD}Token:{Color.RST} {'*' * 8}...{config.token[-4:]}")
print(f"{Color.DIM}Use 'gitea-ci config token check' to test permissions{Color.RST}")
else:
print(f"{Color.YLW}No token configured{Color.RST}")
print(f"{Color.DIM}Set with: gitea-ci config token set <token>{Color.RST}")
return 0
if args.show:
print(f"{Color.BLD}Current configuration:{Color.RST}")
print(f" URL: {config.url}")
print(f" Token: {'***' if config.token else '(not set)'}")
print(f" Default owner: {config.default_owner or '(not set)'}")
print(f" Default repo: {config.default_repo or '(not set)'}")
print(f"\n{Color.DIM}Config file: {CONFIG_FILE}{Color.RST}")
return 0
if args.test:
print(f"{Color.BLD}Testing API connectivity...{Color.RST}\n")
print(f" URL: {config.url}")
print(f" Token: {'configured' if config.token else 'not set'}")
print()
# Test basic connectivity
try:
req = urllib.request.Request(
f"{config.url}/api/v1/version",
headers={"Accept": "application/json"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read().decode())
version = data.get("version", "unknown")
print(f" {Color.GRN}{Color.RST} Server reachable (Gitea {version})")
except urllib.error.URLError as e:
print(f" {Color.RED}{Color.RST} Connection failed: {e.reason}")
return 1
# Test authentication if token set
if config.token:
user = api._request("/user")
if user:
username = user.get("login", "unknown")
print(f" {Color.GRN}{Color.RST} Authenticated as: {username}")
else:
print(f" {Color.RED}{Color.RST} Authentication failed (invalid token?)")
return 1
else:
print(f" {Color.YLW}{Color.RST} No token configured (read-only mode)")
# Test default repo if configured
if config.default_owner and config.default_repo:
runs = api.get_runs(config.default_owner, config.default_repo, limit=1)
if runs is not None:
print(f" {Color.GRN}{Color.RST} Default repo accessible: {config.default_owner}/{config.default_repo}")
else:
print(f" {Color.YLW}{Color.RST} Default repo not accessible (may not have actions)")
print(f"\n{Color.GRN}API connectivity OK{Color.RST}")
return 0
# Interactive config
print(f"{Color.BLD}Gitea CI Configuration{Color.RST}\n")
url = input(f"Gitea URL [{config.url}]: ").strip()
if url:
config.url = url
token = input(f"API Token [{'***' if config.token else 'none'}]: ").strip()
if token:
config.token = token
owner = input(f"Default owner [{config.default_owner}]: ").strip()
if owner:
config.default_owner = owner
repo = input(f"Default repo [{config.default_repo}]: ").strip()
if repo:
config.default_repo = repo
config.save()
print(f"\n{Color.GRN}Configuration saved to {CONFIG_FILE}{Color.RST}")
return 0
def cmd_repo(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Manage repositories."""
action = getattr(args, "action", "list")
if action == "list":
# List user's repositories
print(f"{Color.BLD}Repositories:{Color.RST}\n")
repos = api.get_repos(limit=100)
if not repos:
print(f"{Color.DIM}No repositories found{Color.RST}")
return 0
for r in repos:
name = r.get("full_name", "")
desc = r.get("description", "")[:40] if r.get("description") else ""
private = r.get("private", False)
visibility = f"{Color.YLW}private{Color.RST}" if private else f"{Color.GRN}public{Color.RST}"
has_actions = r.get("has_actions", False)
actions_icon = f" {Color.DIM}[actions]{Color.RST}" if has_actions else ""
print(f" {name} [{visibility}]{actions_icon}")
if desc:
print(f" {Color.DIM}{desc}{Color.RST}")
return 0
elif action == "create":
name = args.name
if not name:
print(f"{Color.RED}Repository name required{Color.RST}")
return 1
description = getattr(args, "description", "") or ""
private = getattr(args, "private", False)
# Check if already exists
user = api._request("/user")
if not user:
print(f"{Color.RED}Authentication required{Color.RST}")
return 1
username = user.get("login", "")
if api.repo_exists(username, name):
print(f"{Color.YLW}Repository {username}/{name} already exists{Color.RST}")
return 0
print(f"Creating repository {Color.BLD}{name}{Color.RST}...")
result = api.create_repo(name, description, private)
if result:
full_name = result.get("full_name", name)
clone_url = result.get("ssh_url", result.get("clone_url", ""))
print(f"{Color.GRN}{Color.RST} Created: {full_name}")
if clone_url:
print(f" {Color.DIM}Clone: {clone_url}{Color.RST}")
return 0
else:
print(f"{Color.RED}{Color.RST} Failed to create repository")
return 1
elif action == "exists":
owner = args.owner or api.config.default_owner
repo = args.repo or args.name
if not owner or not repo:
print(f"{Color.RED}Repository (owner/repo) required{Color.RST}")
return 1
if api.repo_exists(owner, repo):
print(f"{Color.GRN}{Color.RST} {owner}/{repo} exists")
return 0
else:
print(f"{Color.RED}{Color.RST} {owner}/{repo} does not exist")
return 1
elif action == "check":
owner = args.owner or api.config.default_owner
repo = args.repo or args.name or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository (owner/repo) required{Color.RST}")
return 1
print(f"{Color.BLD}Repository Check: {owner}/{repo}{Color.RST}\n")
# Check if repository exists and has actions enabled
repo_info = api._request(f"/repos/{owner}/{repo}")
if not repo_info:
print(f"{Color.RED}{Color.RST} Repository not found or not accessible")
return 1
has_actions = repo_info.get("has_actions", False)
if has_actions:
print(f"{Color.GRN}{Color.RST} Actions enabled")
else:
print(f"{Color.RED}{Color.RST} Actions not enabled")
print(f" {Color.DIM}Enable in repository Settings → Actions{Color.RST}")
return 1
# Get workflows
workflows = api.get_workflows(owner, repo)
if workflows:
print(f"{Color.GRN}{Color.RST} Found {len(workflows)} workflow(s)")
else:
print(f"{Color.YLW}{Color.RST} No workflows found")
print(f" {Color.DIM}Create .gitea/workflows/*.yml or .github/workflows/*.yml{Color.RST}")
return 0
# Get available runners (user scope first)
runners = api.get_runners("", "") # User-level runners
if not runners:
# Try repo scope
runners = api.get_runners(owner, repo)
runner_labels = set()
online_labels = set()
for r in runners:
labels = r.get("labels", [])
runner_labels.update(labels)
if r.get("status") == "online":
online_labels.update(labels)
if runners:
online_count = sum(1 for r in runners if r.get("status") == "online")
print(f"{Color.GRN}{Color.RST} Runners available: {online_count} online, {len(runners)} total")
else:
print(f"{Color.RED}{Color.RST} No runners found")
print(f" {Color.DIM}Register a runner with: gitea-ci register-token{Color.RST}")
return 1
# Analyze workflows for runs-on labels
print(f"\n{Color.BLD}Workflow Analysis:{Color.RST}\n")
issues = []
for wf in workflows:
wf_name = wf.get("name", "unnamed")
wf_path = wf.get("path", "")
wf_file = wf_path.split("/")[-1] if wf_path else "unknown"
# Get workflow contents and parse runs-on
content = api.get_workflow_contents(owner, repo, wf_file)
if not content:
print(f" {Color.DIM}{Color.RST} {wf_name} {Color.DIM}(cannot read){Color.RST}")
continue
# Parse runs-on labels from YAML content
runs_on_matches = re.findall(r'runs-on:\s*([^\n]+)', content)
required_labels = set()
for match in runs_on_matches:
# Handle array syntax: [label1, label2] or simple string
match = match.strip()
if match.startswith('['):
# Array syntax
labels = re.findall(r'[\w-]+', match)
required_labels.update(labels)
else:
# Simple string (may have quotes)
label = match.strip('\'"')
required_labels.add(label)
if not required_labels:
print(f" {Color.DIM}{Color.RST} {wf_name} {Color.DIM}(no runs-on found){Color.RST}")
continue
# Check if labels match available runners
missing = required_labels - runner_labels
offline = (required_labels & runner_labels) - online_labels
if missing:
print(f" {Color.RED}{Color.RST} {wf_name}")
print(f" {Color.DIM}requires:{Color.RST} {', '.join(required_labels)}")
print(f" {Color.RED}missing:{Color.RST} {', '.join(missing)}")
issues.append((wf_name, "missing labels", missing))
elif offline:
print(f" {Color.YLW}{Color.RST} {wf_name}")
print(f" {Color.DIM}requires:{Color.RST} {', '.join(required_labels)}")
print(f" {Color.YLW}offline:{Color.RST} {', '.join(offline)}")
issues.append((wf_name, "runners offline", offline))
else:
print(f" {Color.GRN}{Color.RST} {wf_name} {Color.DIM}({', '.join(required_labels)}){Color.RST}")
# Summary
if issues:
print(f"\n{Color.BLD}Issues Found:{Color.RST}\n")
all_missing = set()
for wf_name, issue_type, labels in issues:
if issue_type == "missing labels":
all_missing.update(labels)
if all_missing:
print(f" Missing runner labels: {', '.join(sorted(all_missing))}")
print(f" {Color.DIM}Register runners with these labels or update workflows{Color.RST}")
return 1
print(f"\n{Color.GRN}All workflows have matching runners{Color.RST}")
return 0
return 0

View File

@@ -0,0 +1,188 @@
"""Runner-related commands."""
import argparse
import json
import sys
from ..api import GiteaAPI
from ..formatters import STATUS_STYLE
# Import style module
try:
from style import Color
except ImportError:
import os
sys.path.insert(0, str(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))))
from style import Color
def cmd_runners(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Manage runners - list, delete, info, jobs."""
owner = getattr(args, "owner", None) or api.config.default_owner
repo = getattr(args, "repo", None) or api.config.default_repo
action = getattr(args, "action", "list")
runner_id = getattr(args, "runner_id", None)
# Determine scope string
if owner and repo:
scope = f"repository {owner}/{repo}"
elif owner:
scope = f"organization {owner}"
else:
scope = "user-level"
# Handle different actions
if action == "delete":
if not runner_id:
print(f"{Color.RED}Runner ID required: gitea-ci runners delete <id>{Color.RST}")
return 1
# Confirm unless --force
if not getattr(args, "force", False):
confirm = input(f"Delete runner #{runner_id}? [y/N]: ").strip().lower()
if confirm != "y":
print(f"{Color.DIM}Cancelled{Color.RST}")
return 0
print(f"{Color.BLD}Deleting runner #{runner_id}...{Color.RST}")
if api.delete_runner(runner_id, owner or "", repo or ""):
print(f"{Color.GRN}✓ Runner deleted{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to delete runner{Color.RST}")
print(f"{Color.DIM}(API may not be available in this Gitea version){Color.RST}")
return 1
elif action == "info":
if not runner_id:
print(f"{Color.RED}Runner ID required: gitea-ci runners info <id>{Color.RST}")
return 1
runner = api.get_runner(runner_id, owner or "", repo or "")
if not runner:
print(f"{Color.RED}Runner #{runner_id} not found{Color.RST}")
return 1
if getattr(args, "json", False):
print(json.dumps(runner, indent=2))
return 0
print(f"{Color.BLD}Runner #{runner_id}{Color.RST}")
print("" * 35)
print(f" {Color.BLD}Name:{Color.RST} {runner.get('name', 'unnamed')}")
status = runner.get("status", "unknown")
status_color = Color.GRN if status == "online" else Color.RED
print(f" {Color.BLD}Status:{Color.RST} {status_color}{status}{Color.RST}")
print(f" {Color.BLD}Busy:{Color.RST} {'Yes' if runner.get('busy') else 'No'}")
labels = runner.get("labels", [])
if labels:
print(f" {Color.BLD}Labels:{Color.RST} {', '.join(labels)}")
if runner.get("version"):
print(f" {Color.BLD}Version:{Color.RST} {runner.get('version')}")
return 0
elif action == "jobs":
jobs = api.get_runner_jobs(owner or "", repo or "")
if getattr(args, "json", False):
print(json.dumps({"scope": scope, "jobs": jobs}, indent=2))
return 0
print(f"{Color.BLD}Runner Jobs ({scope}):{Color.RST}\n")
if not jobs:
print(f"{Color.DIM}No running/pending jobs{Color.RST}")
return 0
for job in jobs:
job_id = job.get("id", 0)
name = job.get("name", "unnamed")
status = job.get("status", "unknown")
runs_on = job.get("runs_on", [])
color, symbol = STATUS_STYLE.get(status, (Color.DIM, "?"))
labels_str = f" [{', '.join(runs_on)}]" if runs_on else ""
print(f" {color}{symbol}{Color.RST} #{job_id} {name}{labels_str}")
return 0
else: # list (default)
if owner and repo:
runners = api.get_runners(owner, repo)
elif owner:
runners = api.get_runners(owner)
else:
runners = api.get_runners()
if getattr(args, "json", False):
print(json.dumps({"scope": scope, "runners": runners}, indent=2))
return 0
print(f"{Color.BLD}Runners ({scope}):{Color.RST}\n")
if not runners:
print(f"{Color.DIM}No runners found{Color.RST}")
print(f"{Color.DIM}(May require authentication or admin access){Color.RST}")
return 0
online = 0
offline = 0
for r in runners:
rid = r.get("id", 0)
name = r.get("name", "unnamed")
status = r.get("status", "unknown")
busy = r.get("busy", False)
labels = r.get("labels", [])
if status == "online":
online += 1
color = Color.GRN
symbol = ""
else:
offline += 1
color = Color.RED
symbol = ""
busy_str = f" {Color.YLW}[busy]{Color.RST}" if busy else ""
labels_str = f" ({', '.join(labels[:4])})" if labels else ""
print(f" {color}{symbol}{Color.RST} #{rid} {name}{labels_str}{busy_str}")
print(f"\n{Color.DIM}Total: {online} online, {offline} offline{Color.RST}")
return 0
def cmd_register_token(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Get runner registration token."""
# If --user flag is set, force user-level token
if getattr(args, "user", False):
owner = ""
repo = ""
else:
owner = getattr(args, "owner", None) or api.config.default_owner
repo = getattr(args, "repo", None) or api.config.default_repo
# Determine scope
if owner and repo:
scope = f"repository {owner}/{repo}"
elif owner:
scope = f"organization {owner}"
else:
scope = "user"
print(f"{Color.BLD}Getting registration token ({scope})...{Color.RST}")
token = api.get_registration_token(owner or "", repo or "")
if token:
print(f"\n{Color.GRN}Registration Token:{Color.RST}\n")
print(f" {token}")
print(f"\n{Color.DIM}Usage:{Color.RST}")
print(f" act_runner register --instance {api.config.url} --token {token}")
print()
return 0
else:
print(f"{Color.RED}Failed to get registration token{Color.RST}")
print(f"{Color.DIM}Make sure your token has the required scopes{Color.RST}")
return 1

View File

@@ -0,0 +1,461 @@
"""Run-related commands: list, status, logs, watch, delete."""
import argparse
import json
import sys
import time
from datetime import datetime
from typing import Any
from ..api import GiteaAPI
from ..formatters import (
format_time, format_duration, status_display,
print_compact_run, print_repo_stats, print_single_run,
print_run_summary, output_json,
)
from ..utils import filter_runs, send_notification
# Import style module
try:
from style import Color, Diff
except ImportError:
import os
sys.path.insert(0, str(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))))
from style import Color, Diff
def cmd_list(api: GiteaAPI, args: argparse.Namespace) -> int:
"""List recent workflow runs."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
# Full discovery mode - list all repos with recent runs
return cmd_discover(api, args)
# Get runs via API
runs = api.get_runs(owner, repo, limit=args.limit)
if not runs:
if not getattr(args, "compact", False):
print(f"{Color.DIM}No workflow runs found{Color.RST}")
return 0
# Apply filters
status_filter = getattr(args, "status_filter", "") or ""
branch_filter = getattr(args, "branch", "") or ""
workflow_filter = getattr(args, "workflow", "") or ""
if status_filter or branch_filter or workflow_filter:
runs = filter_runs(runs, status_filter, branch_filter, workflow_filter)
if not runs:
if not getattr(args, "compact", False):
filters = [f for f in [status_filter, branch_filter, workflow_filter] if f]
print(f"{Color.DIM}No runs matching filters: {', '.join(filters)}{Color.RST}")
return 0
# JSON output
if hasattr(args, "json") and args.json:
return output_json(runs, owner, repo)
# Compact output for scripting
if getattr(args, "compact", False):
for run in runs[:args.limit]:
print_compact_run(run)
return 0
print(f"{Color.BLD}Workflow runs for {owner}/{repo}:{Color.RST}\n")
# Statistics
print_repo_stats(runs)
for run in runs[:args.limit]:
print_single_run(run)
return 0
def cmd_discover(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Discover all repositories and their workflow runs."""
print(f"{Color.BLD}Discovering workflows on {api.config.url}...{Color.RST}\n")
# Get current user
user = api._request("/user")
if not user:
print(f"{Color.YLW}API token required for discovery.{Color.RST}")
print(f"{Color.DIM}Use: gitea-ci config --token YOUR_TOKEN{Color.RST}")
print(f"{Color.DIM}Or use gitea-scrape for public repos without auth{Color.RST}")
return 1
username = user.get("login", "")
print(f"{Color.DIM}User:{Color.RST} {username}\n")
# Get all repos (user's own + orgs)
repos = api.get_repos(limit=100)
# Also get repos from organizations
orgs: list[Any] = list(api._request("/user/orgs") or [])
for org in orgs:
org_repos = api._request(f"/orgs/{org.get('username', '')}/repos?limit=100")
if org_repos:
repos.extend(org_repos)
if not repos:
print(f"{Color.DIM}No repositories found{Color.RST}")
return 0
# Collect all recent runs
all_runs = []
repos_with_actions = 0
for i, r in enumerate(repos):
repo_owner = r.get("owner", {}).get("login", "")
repo_name = r.get("name", "")
full_name = r.get("full_name", f"{repo_owner}/{repo_name}")
# Progress indicator for large repos
if len(repos) > 10 and (i + 1) % 5 == 0:
print(f"{Color.DIM} Scanning {i + 1}/{len(repos)} repositories...{Color.RST}", end="\r", file=sys.stderr)
runs = api.get_runs(repo_owner, repo_name, limit=3)
if runs:
repos_with_actions += 1
for run in runs:
run["_repo"] = full_name
all_runs.append(run)
# Throttle to avoid rate limiting
api.throttle()
if not all_runs:
print(f"{Color.DIM}No workflow runs found across {len(repos)} repositories{Color.RST}")
return 0
# Sort by created_at descending
all_runs.sort(key=lambda x: x.get("created_at", ""), reverse=True)
print_run_summary(all_runs, repos_with_actions)
return 0
def cmd_status(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show detailed status of a workflow run."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required. Use -o OWNER -r REPO or set defaults{Color.RST}")
return 1
# Get run (latest or specific)
if args.run_id:
run = api.get_run(owner, repo, args.run_id)
else:
runs = api.get_runs(owner, repo, limit=1)
run = runs[0] if runs else None
if not run:
print(f"{Color.RED}Run not found{Color.RST}")
return 1
run_id = run.get("id", 0)
# Get jobs
jobs = api.get_jobs(owner, repo, run_id)
# JSON output
if getattr(args, "json", False):
output = {
"repository": f"{owner}/{repo}",
"run": {
"id": run_id,
"status": run.get("status"),
"conclusion": run.get("conclusion"),
"title": run.get("display_title"),
"branch": run.get("head_branch"),
"commit": run.get("head_sha"),
"created_at": run.get("created_at"),
"started_at": run.get("run_started_at"),
"updated_at": run.get("updated_at"),
},
"jobs": [
{
"id": j.get("id"),
"name": j.get("name"),
"status": j.get("status"),
"conclusion": j.get("conclusion"),
"started_at": j.get("started_at"),
"completed_at": j.get("completed_at"),
}
for j in jobs
] if jobs else []
}
print(json.dumps(output, indent=2))
return 0
status = status_display(run.get("status", ""), run.get("conclusion", ""))
title = run.get("display_title", "")
branch = run.get("head_branch", "")
commit = run.get("head_sha", "")[:8]
when = format_time(run.get("created_at", ""))
duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", ""))
print(f"{Color.BLD}Run #{run_id}{Color.RST} {status}")
print(f" {Color.DIM}Title:{Color.RST} {title}")
print(f" {Color.DIM}Branch:{Color.RST} {branch} ({commit})")
print(f" {Color.DIM}Started:{Color.RST} {when}")
print(f" {Color.DIM}Duration:{Color.RST} {duration}")
print()
if jobs:
print(f"{Color.BLD}Jobs:{Color.RST}\n")
pending_jobs = []
for job in jobs:
job_id = job.get("id", 0)
job_status_raw = job.get("status", "")
job_status = status_display(job_status_raw, job.get("conclusion", ""))
job_name = job.get("name", "unknown")
job_duration = format_duration(job.get("started_at", ""), job.get("completed_at", ""))
runs_on = job.get("labels", [])
print(f" {job_status} {job_name} {Color.DIM}(ID: {job_id}, {job_duration}){Color.RST}")
# Collect pending jobs for diagnostics
if job_status_raw in ("pending", "waiting", "queued"):
pending_jobs.append({
"name": job_name,
"id": job_id,
"labels": runs_on,
"created": job.get("created_at", run.get("created_at", "")),
})
# Show pending diagnostics if there are pending jobs
if pending_jobs and not getattr(args, "json", False):
print(f"\n{Color.BLD}Pending Diagnostics:{Color.RST}\n")
# Get available runners
runners = api.get_runners("", "") # User-level runners first
if not runners:
runners = api.get_runners(owner, repo)
if not runners:
print(f" {Color.RED}{Color.RST} No runners available")
print(f" {Color.DIM}Register a runner: gitea-ci register-token{Color.RST}")
else:
online_runners = [r for r in runners if r.get("status") == "online"]
busy_runners = [r for r in runners if r.get("busy", False)]
runner_labels = set()
online_labels = set()
for r in runners:
labels = r.get("labels", [])
runner_labels.update(labels)
if r.get("status") == "online":
online_labels.update(labels)
print(f" Runners: {len(online_runners)} online, {len(busy_runners)} busy, {len(runners)} total")
# For each pending job, check why it might be stuck
for pj in pending_jobs:
created_str = pj.get("created", "")
if created_str:
try:
created_dt = datetime.fromisoformat(created_str.replace("Z", "+00:00"))
pending_secs = (datetime.now(created_dt.tzinfo) - created_dt).total_seconds()
pending_time = f"{int(pending_secs // 60)}m {int(pending_secs % 60)}s" if pending_secs >= 60 else f"{int(pending_secs)}s"
except Exception:
pending_time = "?"
else:
pending_time = "?"
job_labels = pj.get("labels", [])
if not job_labels:
job_labels = ["linux"] # Common default
print(f"\n {Color.YLW}{Color.RST} {pj['name']} {Color.DIM}(pending {pending_time}){Color.RST}")
if job_labels:
missing = set(job_labels) - runner_labels
offline_labels = (set(job_labels) & runner_labels) - online_labels
if missing:
print(f" {Color.RED}Missing labels:{Color.RST} {', '.join(missing)}")
elif offline_labels:
print(f" {Color.YLW}Runners offline for:{Color.RST} {', '.join(offline_labels)}")
elif busy_runners and len(busy_runners) == len(online_runners):
print(f" {Color.YLW}All runners busy{Color.RST}")
else:
print(f" {Color.DIM}Waiting for runner...{Color.RST}")
return 0
def cmd_logs(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Show logs for a job."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
job_id = args.job_id
# Get run_id from argument or latest
if args.run_id:
run_id = args.run_id
else:
runs = api.get_runs(owner, repo, limit=1)
if not runs:
print(f"{Color.RED}No runs found{Color.RST}")
return 1
run_id = runs[0].get("id")
# If no job_id, get jobs for the run
if not job_id:
jobs = api.get_jobs(owner, repo, run_id)
if not jobs:
print(f"{Color.RED}No jobs found{Color.RST}")
return 1
# Prefer failed job
for job in jobs:
if job.get("conclusion") == "failure":
job_id = job.get("id")
break
if not job_id:
job_id = jobs[0].get("id")
logs = api.get_job_logs(owner, repo, job_id)
if logs:
print(logs)
else:
print(f"{Color.RED}No logs available (may require authentication){Color.RST}")
return 1
return 0
def cmd_watch(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Watch a running workflow."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
run_id = args.run_id
interval = args.interval
notify = getattr(args, "notify", False)
timeout = getattr(args, "timeout", 0)
start_time = time.time()
timeout_msg = f", timeout: {timeout}s" if timeout else ""
print(f"{Color.BLD}Watching {owner}/{repo}...{Color.RST} (Ctrl+C to stop{timeout_msg})\n")
try:
while True:
# Check timeout
if timeout and (time.time() - start_time) >= timeout:
print(f"\n{Color.YLW}Timeout reached ({timeout}s){Color.RST}")
return 2
# Clear screen for refresh
if not args.no_clear:
print("\033[2J\033[H", end="")
# Get run status
if run_id:
run = api.get_run(owner, repo, run_id)
runs = [run] if run else []
else:
runs = api.get_runs(owner, repo, limit=5)
if not runs:
print(f"{Color.DIM}No active runs{Color.RST}")
else:
for run in runs:
rid = run.get("id", 0)
status = status_display(run.get("status", ""), run.get("conclusion", ""))
title = run.get("display_title", "")[:40]
duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", ""))
print(f"{status} {Color.BLD}#{rid}{Color.RST} {title} {Color.DIM}({duration}){Color.RST}")
# Show jobs for this run
jobs = api.get_jobs(owner, repo, rid)
for job in jobs:
job_status = status_display(job.get("status", ""), job.get("conclusion", ""))
job_name = job.get("name", "")
print(f" {job_status} {job_name}")
print()
# Stop watching if run is complete
if run.get("status") == "completed":
conclusion = run.get("conclusion", "")
if conclusion == "success":
print(f"{Diff.ADD} ✓ Run completed successfully {Color.RST}")
if notify:
send_notification(
"CI Passed",
f"{owner}/{repo} #{rid} succeeded",
"normal"
)
elif conclusion == "failure":
print(f"{Diff.DEL} ✗ Run failed {Color.RST}")
if notify:
send_notification(
"CI Failed",
f"{owner}/{repo} #{rid} failed",
"critical"
)
else:
print(f"{Diff.CHG} ⚠ Run {conclusion} {Color.RST}")
if notify:
send_notification(
f"CI {conclusion.title()}",
f"{owner}/{repo} #{rid} {conclusion}",
"normal"
)
if run_id: # Only exit if watching specific run
return 0 if conclusion == "success" else 1
time.sleep(interval)
except KeyboardInterrupt:
print(f"\n{Color.DIM}Stopped watching{Color.RST}")
return 0
def cmd_delete(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Delete a workflow run."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
run_id = args.run_id
if not run_id:
print(f"{Color.RED}Run ID required (-R/--run-id){Color.RST}")
return 1
# Confirm unless --force
if not getattr(args, "force", False):
confirm = input(f"Delete run #{run_id}? [y/N]: ").strip().lower()
if confirm != "y":
print(f"{Color.DIM}Cancelled{Color.RST}")
return 0
print(f"{Color.BLD}Deleting run #{run_id}...{Color.RST}")
if api.delete_run(owner, repo, run_id):
print(f"{Color.GRN}✓ Run deleted{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to delete run{Color.RST}")
return 1

View File

@@ -0,0 +1,296 @@
"""Workflow-related commands: trigger, rerun, cancel, validate, workflows."""
import argparse
import json
import re
import sys
from pathlib import Path
from ..api import GiteaAPI
from ..utils import validate_workflow
# Import style module
try:
from style import Color
except ImportError:
import os
sys.path.insert(0, str(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))))
from style import Color
def cmd_trigger(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Trigger a workflow manually."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
workflow = args.workflow
ref = args.ref or "master"
# Parse inputs if provided
inputs = {}
if hasattr(args, "input") and args.input:
for inp in args.input:
if "=" in inp:
key, value = inp.split("=", 1)
inputs[key] = value
# If no workflow specified, list available ones
if not workflow:
workflows = api.get_workflows(owner, repo)
if not workflows:
print(f"{Color.DIM}No workflows found{Color.RST}")
return 1
print(f"{Color.BLD}Available workflows:{Color.RST}\n")
for wf in workflows:
name = wf.get("name", "")
path = wf.get("path", "")
state = wf.get("state", "")
print(f" {Color.BLD}{path}{Color.RST}")
if name:
print(f" Name: {name}")
if state:
print(f" State: {state}")
print()
print(f"{Color.DIM}Use: gitea-ci trigger -w <workflow.yml>{Color.RST}")
return 0
print(f"{Color.BLD}Triggering workflow...{Color.RST}")
print(f" Workflow: {workflow}")
print(f" Branch: {ref}")
if inputs:
print(f" Inputs: {inputs}")
if api.trigger_workflow(owner, repo, workflow, ref, inputs if inputs else None):
print(f"\n{Color.GRN}✓ Workflow triggered successfully{Color.RST}")
print(f"{Color.DIM}Use 'gitea-ci watch {owner}/{repo}' to monitor{Color.RST}")
return 0
else:
print(f"\n{Color.RED}✗ Failed to trigger workflow{Color.RST}")
return 1
def cmd_rerun(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Re-run a workflow or job."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
# Re-run specific job
if args.job_id:
print(f"{Color.BLD}Re-running job #{args.job_id}...{Color.RST}")
if api.rerun_job(owner, repo, args.job_id):
print(f"{Color.GRN}✓ Job re-run started{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to re-run job{Color.RST}")
return 1
# Get run ID
run_id = args.run_id
if not run_id:
# Get latest run
runs = api.get_runs(owner, repo, limit=1)
if not runs:
print(f"{Color.RED}No runs found{Color.RST}")
return 1
run_id = runs[0].get("id")
# Re-run failed jobs only or all jobs
if args.failed_only:
print(f"{Color.BLD}Re-running failed jobs in run #{run_id}...{Color.RST}")
if api.rerun_failed_jobs(owner, repo, run_id):
print(f"{Color.GRN}✓ Failed jobs re-run started{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to re-run jobs{Color.RST}")
return 1
else:
print(f"{Color.BLD}Re-running all jobs in run #{run_id}...{Color.RST}")
if api.rerun_workflow(owner, repo, run_id):
print(f"{Color.GRN}✓ Workflow re-run started{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to re-run workflow{Color.RST}")
return 1
def cmd_cancel(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Cancel a running workflow."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
run_id = args.run_id
if not run_id:
# Get latest running run
runs = api.get_runs(owner, repo, limit=10)
running = [r for r in runs if r.get("status") in ("running", "waiting", "queued")]
if not running:
print(f"{Color.DIM}No running workflows to cancel{Color.RST}")
return 0
run_id = running[0].get("id")
print(f"{Color.BLD}Cancelling run #{run_id}...{Color.RST}")
if api.cancel_run(owner, repo, run_id):
print(f"{Color.GRN}✓ Workflow cancelled{Color.RST}")
return 0
else:
print(f"{Color.RED}✗ Failed to cancel workflow{Color.RST}")
return 1
def cmd_workflows(api: GiteaAPI, args: argparse.Namespace) -> int:
"""List workflows in a repository with details."""
owner = args.owner or api.config.default_owner
repo = args.repo or api.config.default_repo
if not owner or not repo:
print(f"{Color.RED}Repository required{Color.RST}")
return 1
workflows = api.get_workflows(owner, repo)
if getattr(args, "json", False):
print(json.dumps({"repository": f"{owner}/{repo}", "workflows": workflows}, indent=2))
return 0
print(f"{Color.BLD}Workflows for {owner}/{repo}:{Color.RST}\n")
if not workflows:
print(f"{Color.DIM}No workflows found{Color.RST}")
return 0
for wf in workflows:
name = wf.get("name", "unnamed")
path = wf.get("path", "")
state = wf.get("state", "unknown")
filename = path.split("/")[-1] if path else ""
if state == "active":
color = Color.GRN
symbol = ""
elif state == "disabled":
color = Color.YLW
symbol = ""
else:
color = Color.DIM
symbol = "?"
print(f" {color}{symbol}{Color.RST} {filename}")
print(f" {Color.DIM}Name:{Color.RST} {name}")
print(f" {Color.DIM}Path:{Color.RST} {path}")
# Get workflow triggers if verbose
if getattr(args, "verbose", False):
content = api.get_workflow_contents(owner, repo, filename)
if content:
# Parse YAML for triggers
triggers = []
in_on = False
for line in content.split("\n"):
stripped = line.strip()
if stripped.startswith("on:"):
in_on = True
# Single-line on: push
if ":" in stripped[3:]:
triggers.append(stripped[3:].strip())
elif in_on:
if stripped and not stripped.startswith("#"):
if line[0] not in (" ", "\t"):
break # End of on: block
if ":" in stripped or stripped.endswith(":"):
triggers.append(stripped.rstrip(":"))
if triggers:
print(f" {Color.DIM}Triggers:{Color.RST} {', '.join(triggers[:5])}")
print()
return 0
def cmd_validate(api: GiteaAPI, args: argparse.Namespace) -> int:
"""Validate local workflow files."""
path_arg = getattr(args, "path", None)
check_runners = getattr(args, "check_runners", False)
strict = getattr(args, "strict", False)
# Find workflow files
if path_arg:
path = Path(path_arg)
if path.is_file():
workflows = [path]
elif path.is_dir():
workflows = list(path.glob("*.yml")) + list(path.glob("*.yaml"))
else:
print(f"{Color.RED}Path not found: {path_arg}{Color.RST}")
return 1
else:
# Look for workflow directories
for workflow_dir in [".gitea/workflows", ".github/workflows"]:
p = Path(workflow_dir)
if p.exists():
workflows = list(p.glob("*.yml")) + list(p.glob("*.yaml"))
break
else:
print(f"{Color.RED}No workflow directory found{Color.RST}")
print(f"{Color.DIM}Expected: .gitea/workflows/ or .github/workflows/{Color.RST}")
return 1
if not workflows:
print(f"{Color.YLW}No workflow files found{Color.RST}")
return 0
# Get available runner labels if checking runners
available_labels: set = set()
if check_runners:
owner = getattr(args, "owner", None) or api.config.default_owner
repo = getattr(args, "repo", None) or api.config.default_repo
runners = api.get_runners(owner or "", repo or "")
for r in runners:
for label in r.get("labels", []):
available_labels.add(label)
total_errors = 0
total_warnings = 0
for wf_path in sorted(workflows):
content = wf_path.read_text()
errors, warnings = validate_workflow(content, wf_path.name, available_labels)
if errors or warnings:
print(f"{Color.BLD}{wf_path}{Color.RST}")
for level, line, msg in errors:
print(f" {Color.RED}error:{Color.RST} {msg}" + (f" (line {line})" if line else ""))
total_errors += 1
for level, line, msg in warnings:
print(f" {Color.YLW}warning:{Color.RST} {msg}" + (f" (line {line})" if line else ""))
total_warnings += 1
print()
else:
print(f"{Color.GRN}{Color.RST} {wf_path}")
# Summary
print()
if total_errors > 0:
print(f"{Color.RED}{total_errors} error(s){Color.RST}, {total_warnings} warning(s)")
return 1
elif total_warnings > 0 and strict:
print(f"{Color.YLW}{total_warnings} warning(s) (strict mode){Color.RST}")
return 1
elif total_warnings > 0:
print(f"{Color.GRN}OK{Color.RST} with {total_warnings} warning(s)")
return 0
else:
print(f"{Color.GRN}All {len(workflows)} workflow(s) valid{Color.RST}")
return 0

67
src/gitea_ci/config.py Normal file
View File

@@ -0,0 +1,67 @@
"""Configuration management for gitea-ci."""
import json
import os
from dataclasses import dataclass
from pathlib import Path
# Configuration paths
CONFIG_DIR = Path(os.environ.get("XDG_CONFIG_HOME", Path.home() / ".config")) / "gitea-ci"
CONFIG_FILE = CONFIG_DIR / "config.json"
DEFAULT_INSTANCE = os.environ.get("GITEA_URL", "https://git.mymx.me")
# Status colors and symbols mapping
STATUS_STYLE = {
"success": ("GRN", "OK"),
"failure": ("RED", "FAIL"),
"cancelled": ("YLW", "CANCEL"),
"running": ("BLU", "ACTIVE"),
"waiting": ("DIM", "INACTIVE"),
"queued": ("DIM", "QUEUED"),
"pending": ("DIM", "INACTIVE"),
"skipped": ("DIM", "CANCEL"),
}
@dataclass
class Config:
"""Gitea CI configuration."""
url: str = DEFAULT_INSTANCE
token: str = ""
default_owner: str = ""
default_repo: str = ""
@classmethod
def load(cls) -> "Config":
"""Load config from file or environment."""
config = cls()
# Environment overrides
config.url = os.environ.get("GITEA_URL", config.url)
config.token = os.environ.get("GITEA_TOKEN", "")
# Load from file
if CONFIG_FILE.exists():
try:
data = json.loads(CONFIG_FILE.read_text())
config.url = data.get("url", config.url)
config.token = data.get("token", config.token)
config.default_owner = data.get("default_owner", "")
config.default_repo = data.get("default_repo", "")
except (json.JSONDecodeError, IOError):
pass
return config
def save(self) -> None:
"""Save config to file."""
CONFIG_DIR.mkdir(parents=True, exist_ok=True)
data = {
"url": self.url,
"token": self.token,
"default_owner": self.default_owner,
"default_repo": self.default_repo,
}
CONFIG_FILE.write_text(json.dumps(data, indent=2) + "\n")
CONFIG_FILE.chmod(0o600)

332
src/gitea_ci/formatters.py Normal file
View File

@@ -0,0 +1,332 @@
"""Output formatting utilities for gitea-ci."""
import json
from datetime import datetime
from typing import Any
# Import style module
try:
from style import Color, Diff
except ImportError:
import os
import sys
sys.path.insert(0, str(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
from style import Color, Diff
# Status colors and symbols mapping
STATUS_STYLE = {
"success": (Color.GRN, ""),
"failure": (Color.RED, ""),
"cancelled": (Color.YLW, ""),
"running": (Color.BLU, ""),
"waiting": (Color.DIM, ""),
"queued": (Color.DIM, ""),
"pending": (Color.DIM, ""),
"skipped": (Color.DIM, ""),
}
def format_time(timestamp: str) -> str:
"""Format ISO timestamp to relative time."""
if not timestamp:
return ""
try:
dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
now = datetime.now(dt.tzinfo)
delta = now - dt
if delta.days > 0:
return f"{delta.days}d ago"
elif delta.seconds >= 3600:
return f"{delta.seconds // 3600}h ago"
elif delta.seconds >= 60:
return f"{delta.seconds // 60}m ago"
else:
return "just now"
except (ValueError, TypeError):
return timestamp[:16] if len(timestamp) > 16 else timestamp
def format_duration(start: str, end: str) -> str:
"""Format duration between two timestamps."""
if not start:
return ""
try:
start_dt = datetime.fromisoformat(start.replace("Z", "+00:00"))
if end:
end_dt = datetime.fromisoformat(end.replace("Z", "+00:00"))
else:
end_dt = datetime.now(start_dt.tzinfo)
delta = end_dt - start_dt
total_secs = int(delta.total_seconds())
if total_secs >= 3600:
return f"{total_secs // 3600}h{(total_secs % 3600) // 60:02d}m"
elif total_secs >= 60:
return f"{total_secs // 60}m{total_secs % 60:02d}s"
else:
return f"{total_secs}s"
except (ValueError, TypeError):
return ""
def format_seconds(seconds: int) -> str:
"""Format seconds to human-readable duration."""
if seconds < 60:
return f"{seconds}s"
minutes = seconds // 60
secs = seconds % 60
if minutes < 60:
return f"{minutes}m{secs}s" if secs else f"{minutes}m"
hours = minutes // 60
mins = minutes % 60
return f"{hours}h{mins}m"
def status_display(status: str, conclusion: str = "") -> str:
"""Get colored status display."""
key = conclusion if conclusion else status
color, symbol = STATUS_STYLE.get(key.lower(), (Color.DIM, "?"))
return f"{color}{symbol}{Color.RST}"
def print_compact_run(run: dict, show_repo: bool = False) -> None:
"""Print a single run in compact single-line format."""
run_id = run.get("id", 0)
status = run.get("status", "")
conclusion = run.get("conclusion", "")
# Status symbol
if conclusion == "success":
sym = f"{Color.GRN}{Color.RST}"
elif conclusion == "failure":
sym = f"{Color.RED}{Color.RST}"
elif status in ("running", "waiting"):
sym = f"{Color.BLU}{Color.RST}"
else:
sym = f"{Color.DIM}{Color.RST}"
branch = run.get("head_branch", "")[:20]
title = run.get("display_title", "")[:40]
workflow = run.get("workflow", "")
repo = run.get("_repo", "")
duration = run.get("duration", "") or format_duration(
run.get("run_started_at", ""), run.get("updated_at", "")
)
# Build compact line
parts = [sym, f"#{run_id:>5}"]
if show_repo and repo:
parts.append(f"{Color.BLD}{repo:<20}{Color.RST}")
if workflow:
parts.append(f"{Color.DIM}{workflow:<15}{Color.RST}")
parts.append(f"{Color.MAG}{branch:<20}{Color.RST}")
parts.append(title)
if duration:
parts.append(f"{Color.DIM}({duration}){Color.RST}")
print(" ".join(parts))
def print_repo_stats(runs: list) -> None:
"""Print statistics for a single repository."""
total = len(runs)
if total == 0:
return
success = len([r for r in runs if r.get("conclusion") == "success"])
failed = len([r for r in runs if r.get("conclusion") == "failure"])
running = len([r for r in runs if r.get("status") in ("running", "waiting")])
completed = total - running
if completed > 0:
rate = (success / completed) * 100
print(f"{Color.DIM}Stats: {Color.GRN}{success}{Color.RST}{Color.DIM} "
f"{Color.RED}{failed}{Color.RST}{Color.DIM} "
f"{Color.BLU}{running}{Color.RST}{Color.DIM} "
f"({rate:.0f}% success){Color.RST}\n")
def print_single_run(run: dict) -> None:
"""Print a single run with full details."""
run_id = run.get("id", 0)
status = status_display(run.get("status", ""), run.get("conclusion", ""))
title = run.get("display_title", run.get("head_branch", ""))[:50]
branch = run.get("head_branch", "")
workflow = run.get("workflow", "")
when = format_time(run.get("created_at", ""))
duration = run.get("duration", "") or format_duration(run.get("run_started_at", ""), run.get("updated_at", ""))
if workflow:
print(f" {status} {Color.BLD}{workflow}{Color.RST}")
else:
print(f" {status} {Color.BLD}#{run_id}{Color.RST}")
print(f" {title}")
info_parts = []
if branch:
info_parts.append(f"{Color.MAG}{branch}{Color.RST}")
if when:
info_parts.append(when)
if duration:
info_parts.append(duration)
if info_parts:
print(f" {Color.DIM}{' · '.join(info_parts)}{Color.RST}")
print()
def print_run_line(run: dict) -> None:
"""Print a single run line with repo info."""
run_id = run.get("id", 0)
repo = run.get("_repo", "")
status = status_display(run.get("status", ""), run.get("conclusion", ""))
title = run.get("display_title", run.get("head_branch", ""))[:40]
branch = run.get("head_branch", "")
workflow = run.get("workflow", "")
# Get time - try different fields
when = format_time(run.get("created_at", ""))
# Get duration - try pre-computed or calculate
duration = run.get("duration", "")
if not duration:
duration = format_duration(run.get("run_started_at", ""), run.get("updated_at", ""))
# Build info line
info_parts = []
if branch:
info_parts.append(f"{Color.MAG}{branch}{Color.RST}")
if when:
info_parts.append(when)
if duration:
info_parts.append(duration)
info_str = " · ".join(info_parts) if info_parts else ""
# Print with workflow name if available
if workflow:
print(f" {status} {Color.BLD}{repo}{Color.RST} {Color.DIM}{workflow}{Color.RST}")
else:
print(f" {status} {Color.BLD}{repo}{Color.RST} #{run_id}")
print(f" {title}")
if info_str:
print(f" {Color.DIM}{info_str}{Color.RST}")
print()
def print_run_summary(all_runs: list, repos_with_actions: int) -> None:
"""Print summary of workflow runs grouped by status."""
# Calculate statistics
total = len(all_runs)
success = len([r for r in all_runs if r.get("conclusion") == "success"])
failed = [r for r in all_runs if r.get("conclusion") == "failure"]
running = [r for r in all_runs if r.get("status") == "running" or r.get("status") == "waiting"]
cancelled = len([r for r in all_runs if r.get("conclusion") == "cancelled"])
# Print statistics header
print(f"{Color.BLD}Statistics:{Color.RST}")
completed = total - len(running)
if completed > 0:
success_rate = (success / completed) * 100
print(f" {Color.GRN}{success}{Color.RST} success "
f"{Color.RED}{len(failed)}{Color.RST} failed "
f"{Color.BLU}{len(running)}{Color.RST} running "
f"{Color.DIM}{cancelled} cancelled{Color.RST}")
print(f" {Color.DIM}Success rate: {success_rate:.0f}% ({success}/{completed} completed){Color.RST}")
else:
print(f" {Color.BLU}{len(running)}{Color.RST} running "
f"{Color.DIM}No completed runs yet{Color.RST}")
print(f" {Color.DIM}Repositories with actions: {repos_with_actions}{Color.RST}")
print()
# Show running first
if running:
print(f"{Color.BLU}{Color.BLD}Running/Waiting ({len(running)}):{Color.RST}\n")
for run in running[:5]:
print_run_line(run)
if len(running) > 5:
print(f" {Color.DIM}... and {len(running) - 5} more{Color.RST}\n")
# Show recent failures
if failed:
print(f"{Diff.DEL}{Color.BLD} Recent Failures ({len(failed)}) {Color.RST}\n")
for run in failed[:5]:
print_run_line(run)
if len(failed) > 5:
print(f" {Color.DIM}... and {len(failed) - 5} more{Color.RST}\n")
# Show recent activity
recent = all_runs[:10]
print(f"{Color.BLD}Recent Activity:{Color.RST}\n")
for run in recent:
print_run_line(run)
def print_rate_bar(label: str, rate: float, width: int = 20, show_count: str = "") -> None:
"""Print a labeled progress bar for rates."""
filled = int(rate / 100 * width)
empty = width - filled
# Use block characters only - no colors
bar = f"{'' * filled}{Color.DIM}{'' * empty}{Color.RST}"
count_str = f" {Color.DIM}{show_count}{Color.RST}" if show_count else ""
print(f" {label:<20} {bar} {rate:5.1f}%{count_str}")
def output_json(runs: list, owner: str, repo: str) -> int:
"""Output runs as JSON."""
runs_list: list[dict[str, Any]] = []
output: dict[str, Any] = {
"repository": f"{owner}/{repo}",
"total": len(runs),
"runs": runs_list
}
for run in runs:
runs_list.append({
"id": run.get("id"),
"status": run.get("status"),
"conclusion": run.get("conclusion"),
"title": run.get("display_title"),
"branch": run.get("head_branch"),
"workflow": run.get("workflow"),
"created_at": run.get("created_at"),
"duration": run.get("duration"),
})
print(json.dumps(output, indent=2))
return 0
def output_stats_json(runs: list, owner: str, repo: str) -> int:
"""Output statistics as JSON."""
total = len(runs)
success = len([r for r in runs if r.get("conclusion") == "success"])
failed = len([r for r in runs if r.get("conclusion") == "failure"])
cancelled = len([r for r in runs if r.get("conclusion") == "cancelled"])
running = len([r for r in runs if r.get("status") in ("running", "waiting")])
completed = total - running
# Per-workflow stats
workflows: dict[str, dict] = {}
for run in runs:
wf = run.get("workflow") or "unknown"
if wf not in workflows:
workflows[wf] = {"total": 0, "success": 0, "failed": 0}
workflows[wf]["total"] += 1
if run.get("conclusion") == "success":
workflows[wf]["success"] += 1
elif run.get("conclusion") == "failure":
workflows[wf]["failed"] += 1
output = {
"repository": f"{owner}/{repo}",
"total_runs": total,
"completed": completed,
"success": success,
"failed": failed,
"cancelled": cancelled,
"running": running,
"success_rate": round(success / completed * 100, 1) if completed > 0 else 0,
"workflows": workflows,
}
print(json.dumps(output, indent=2))
return 0

143
src/gitea_ci/utils.py Normal file
View File

@@ -0,0 +1,143 @@
"""Utility functions for gitea-ci."""
import re
import shutil
import subprocess
from datetime import datetime
def filter_runs(runs: list, status_filter: str = "", branch: str = "", workflow: str = "") -> list:
"""Filter runs by status, branch, and/or workflow."""
filtered = runs
# Status filter
if status_filter == "failed":
filtered = [r for r in filtered if r.get("conclusion") == "failure"]
elif status_filter == "success":
filtered = [r for r in filtered if r.get("conclusion") == "success"]
elif status_filter == "running":
filtered = [r for r in filtered if r.get("status") in ("running", "waiting")]
elif status_filter == "completed":
filtered = [r for r in filtered if r.get("status") == "completed"]
# Branch filter
if branch:
filtered = [r for r in filtered if branch.lower() in r.get("head_branch", "").lower()]
# Workflow filter
if workflow:
workflow_lower = workflow.lower()
filtered = [r for r in filtered if (
workflow_lower in r.get("workflow", "").lower() or
workflow_lower in r.get("path", "").lower() or
workflow_lower in r.get("name", "").lower()
)]
return filtered
def parse_duration_seconds(run: dict) -> int:
"""Parse run duration to seconds."""
# Try direct duration field
duration_str = run.get("duration", "")
if duration_str:
# Parse formats like "1m30s", "45s", "2m"
total = 0
minutes = re.search(r"(\d+)m", duration_str)
seconds = re.search(r"(\d+)s", duration_str)
if minutes:
total += int(minutes.group(1)) * 60
if seconds:
total += int(seconds.group(1))
return total
# Calculate from timestamps
started = run.get("run_started_at", "")
ended = run.get("updated_at", "")
if started and ended and run.get("status") == "completed":
try:
start_dt = datetime.fromisoformat(started.replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(ended.replace("Z", "+00:00"))
return int((end_dt - start_dt).total_seconds())
except (ValueError, TypeError):
pass
return 0
def validate_workflow(content: str, filename: str, available_labels: set) -> tuple:
"""Validate workflow content without external YAML parser.
Returns (errors, warnings) where each is a list of (level, line, message).
"""
errors: list[tuple] = []
warnings: list[tuple] = []
lines = content.split("\n")
# Check for required top-level keys
has_name = False
has_on = False
has_jobs = False
for i, line in enumerate(lines, 1):
stripped = line.strip()
if stripped.startswith("#"):
continue
if re.match(r"^name\s*:", line):
has_name = True
if re.match(r"^on\s*:", line):
has_on = True
if re.match(r"^jobs\s*:", line):
has_jobs = True
if not has_name:
warnings.append(("warning", 0, "Missing 'name:' field (recommended)"))
if not has_on:
errors.append(("error", 0, "Missing 'on:' trigger definition"))
if not has_jobs:
errors.append(("error", 0, "Missing 'jobs:' section"))
# Check runs-on labels
runs_on_matches = re.finditer(r"runs-on\s*:\s*(.+)", content)
for match in runs_on_matches:
label = match.group(1).strip().strip("'\"")
# Find line number
pos = match.start()
line_num = content[:pos].count("\n") + 1
# Check if label is available (if we have runner info)
if available_labels and label not in available_labels:
warnings.append(("warning", line_num, f"runs-on label '{label}' not found in available runners"))
# Check for common issues
for i, line in enumerate(lines, 1):
stripped = line.strip()
# Check for tabs (YAML doesn't like tabs)
if "\t" in line and not stripped.startswith("#"):
errors.append(("error", i, "Tab character found (use spaces for indentation)"))
# Check for trailing colons without values (common mistake)
if re.match(r"^\s+-\s*$", line):
errors.append(("error", i, "Empty list item"))
# Check for uses without version
uses_match = re.match(r".*uses\s*:\s*([^@\s]+)$", stripped)
if uses_match and not stripped.startswith("#"):
action = uses_match.group(1)
if "/" in action and "@" not in action:
warnings.append(("warning", i, f"Action '{action}' should specify version (e.g., @v1)"))
return errors, warnings
def send_notification(title: str, message: str, urgency: str = "normal") -> None:
"""Send desktop notification via notify-send."""
if shutil.which("notify-send"):
try:
subprocess.run(
["notify-send", f"--urgency={urgency}", title, message],
capture_output=True,
timeout=5
)
except (subprocess.SubprocessError, OSError):
pass