diff --git a/src/harbor/__init__.py b/src/harbor/__init__.py new file mode 100644 index 0000000..2587510 --- /dev/null +++ b/src/harbor/__init__.py @@ -0,0 +1,4 @@ +"""Harbor registry control utility.""" + +__version__ = "0.1.0" +__all__ = ["__version__"] diff --git a/src/harbor/__main__.py b/src/harbor/__main__.py new file mode 100644 index 0000000..b6cf81b --- /dev/null +++ b/src/harbor/__main__.py @@ -0,0 +1,8 @@ +"""Entry point for python -m harbor.""" + +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/harbor/cli.py b/src/harbor/cli.py new file mode 100644 index 0000000..34972f4 --- /dev/null +++ b/src/harbor/cli.py @@ -0,0 +1,158 @@ +"""Command-line interface.""" + +import argparse +import sys + +from . import __version__ +from .commands import ( + cmd_artifacts, + cmd_config, + cmd_delete, + cmd_info, + cmd_projects, + cmd_repos, + cmd_sbom, + cmd_scan, + cmd_tags, + cmd_vulns, +) +from .config import load_credentials + + +def create_parser() -> argparse.ArgumentParser: + """Create and configure argument parser.""" + parser = argparse.ArgumentParser( + description="Harbor registry control", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s projects List all projects + %(prog)s repos library List repos in 'library' project + %(prog)s artifacts library flaskpaste List artifacts + %(prog)s info library flaskpaste Show latest artifact details + %(prog)s info library flaskpaste -d latest Show artifact by tag + %(prog)s vulns library flaskpaste -s high Show high severity vulns + %(prog)s vulns library flaskpaste -P jaraco Filter by package name + %(prog)s scan library flaskpaste --wait Scan and wait for completion + %(prog)s tags library flaskpaste List tags for latest artifact + %(prog)s sbom library flaskpaste Show SBOM for artifact + %(prog)s config library --show Show project settings +""" + ) + parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}") + parser.add_argument("--url", help="Harbor URL", default=None) + parser.add_argument("-u", "--user", help="Username", default=None) + parser.add_argument("-p", "--password", help="Password", default=None) + parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certificates (default: skip)") + + subparsers = parser.add_subparsers(dest="command", help="Commands") + + # projects + subparsers.add_parser("projects", help="List projects") + + # repos + p_repos = subparsers.add_parser("repos", help="List repositories") + p_repos.add_argument("project", help="Project name") + + # artifacts + p_artifacts = subparsers.add_parser("artifacts", help="List artifacts") + p_artifacts.add_argument("project", help="Project name") + p_artifacts.add_argument("repo", help="Repository name") + + # info + p_info = subparsers.add_parser("info", help="Show artifact details") + p_info.add_argument("project", help="Project name") + p_info.add_argument("repo", help="Repository name") + p_info.add_argument("-d", "--digest", help="Digest or tag (default: latest)") + p_info.add_argument("--json", action="store_true", help="Include raw JSON output") + + # vulns + p_vulns = subparsers.add_parser("vulns", help="Show vulnerabilities") + p_vulns.add_argument("project", help="Project name") + p_vulns.add_argument("repo", help="Repository name") + p_vulns.add_argument("-d", "--digest", help="Digest or tag (default: latest)") + p_vulns.add_argument("-s", "--severity", help="Filter by severity (critical/high/medium/low)") + p_vulns.add_argument("-P", "--package", help="Filter by package name (substring match)") + p_vulns.add_argument("-l", "--limit", type=int, default=50, help="Max results (default: 50)") + + # scan + p_scan = subparsers.add_parser("scan", help="Trigger vulnerability scan") + p_scan.add_argument("project", help="Project name") + p_scan.add_argument("repo", help="Repository name") + p_scan.add_argument("-d", "--digest", help="Digest or tag (default: latest)") + p_scan.add_argument("-w", "--wait", action="store_true", help="Wait for scan to complete") + p_scan.add_argument("-t", "--timeout", type=int, default=120, help="Wait timeout in seconds (default: 120)") + + # delete + p_delete = subparsers.add_parser("delete", help="Delete artifact or tag") + p_delete.add_argument("project", help="Project name") + p_delete.add_argument("repo", help="Repository name") + p_delete.add_argument("-d", "--digest", help="Digest or tag to delete") + p_delete.add_argument("-T", "--tag", help="Delete specific tag only") + p_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation") + + # tags + p_tags = subparsers.add_parser("tags", help="List or manage tags") + p_tags.add_argument("project", help="Project name") + p_tags.add_argument("repo", help="Repository name") + p_tags.add_argument("-d", "--digest", help="Digest or tag (default: latest)") + p_tags.add_argument("-a", "--add", help="Add new tag to artifact") + + # sbom + p_sbom = subparsers.add_parser("sbom", help="Get SBOM for artifact") + p_sbom.add_argument("project", help="Project name") + p_sbom.add_argument("repo", help="Repository name") + p_sbom.add_argument("-d", "--digest", help="Digest or tag (default: latest)") + p_sbom.add_argument("--json", action="store_true", help="Output raw JSON") + p_sbom.add_argument("--summary", action="store_true", help="Summary only, no component list") + p_sbom.add_argument("-l", "--limit", type=int, default=50, help="Max components to show (default: 50)") + + # config + p_config = subparsers.add_parser("config", help="Configure project settings") + p_config.add_argument("project", help="Project name") + p_config.add_argument("--show", action="store_true", help="Show current settings") + p_config.add_argument("--auto-scan", type=lambda x: x.lower() == "true", metavar="true|false", + help="Enable/disable auto-scan on push") + p_config.add_argument("--auto-sbom", type=lambda x: x.lower() == "true", metavar="true|false", + help="Enable/disable auto-SBOM generation on push") + + return parser + + +def main() -> int: + """Main entry point.""" + parser = create_parser() + args = parser.parse_args() + + if not args.command: + parser.print_help() + return 1 + + # Load credentials + creds_user, creds_pass, creds_url = load_credentials() + user = args.user or creds_user + password = args.password or creds_pass + url = args.url or creds_url + + if not user or not password: + print("Error: No credentials. Set HARBOR_USER/HARBOR_PASS or configure secrets file.") + return 1 + + commands = { + "projects": cmd_projects, + "repos": cmd_repos, + "artifacts": cmd_artifacts, + "info": cmd_info, + "vulns": cmd_vulns, + "scan": cmd_scan, + "delete": cmd_delete, + "tags": cmd_tags, + "sbom": cmd_sbom, + "config": cmd_config, + } + + return commands[args.command](args, user, password, url) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/harbor/client.py b/src/harbor/client.py new file mode 100644 index 0000000..0bef125 --- /dev/null +++ b/src/harbor/client.py @@ -0,0 +1,132 @@ +"""Harbor API client.""" + +import base64 +import json +import ssl +import urllib.error +import urllib.request +from typing import Any + + +def build_url(base: str, *parts: str, **params: str) -> str: + """Build API URL from parts and optional query parameters.""" + url = f"{base}/api/v2.0/{'/'.join(parts)}" + if params: + query = "&".join(f"{k}={v}" for k, v in params.items()) + url = f"{url}?{query}" + return url + + +def api_request( + url: str, + user: str, + password: str, + method: str = "GET", + data: dict[str, Any] | None = None, + verify_ssl: bool = False, +) -> dict[str, Any]: + """Make authenticated API request to Harbor. + + Args: + url: Full API URL + user: Username for basic auth + password: Password for basic auth + method: HTTP method + data: Optional request body (JSON) + verify_ssl: Whether to verify SSL certificates + + Returns: + Response data as dict, or error dict with 'error' key + """ + ctx = ssl.create_default_context() + if not verify_ssl: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + req = urllib.request.Request(url, method=method) + + # Basic auth + credentials = base64.b64encode(f"{user}:{password}".encode()).decode() + req.add_header("Authorization", f"Basic {credentials}") + + if data: + req.add_header("Content-Type", "application/json") + req.data = json.dumps(data).encode() + + try: + with urllib.request.urlopen(req, context=ctx, timeout=30) as resp: + # Handle responses with no body + if resp.status in (200, 201, 202, 204): + body = resp.read().decode().strip() + if not body: + return {"status": resp.status} + return json.loads(body) + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + body = e.read().decode() + try: + err_json = json.loads(body) + return {"error": e.code, "message": err_json.get("errors", [{}])[0].get("message", body)} + except json.JSONDecodeError: + return {"error": e.code, "message": body} + except Exception as e: + return {"error": str(e)} + + +def resolve_digest( + project: str, + repo: str, + digest_or_tag: str | None, + user: str, + password: str, + url: str, +) -> str | None: + """Resolve partial digest or tag to full digest. + + Args: + project: Project name + repo: Repository name + digest_or_tag: Digest prefix, tag name, or None for latest + user: API username + password: API password + url: Base Harbor URL + + Returns: + Full digest string or None if not found + """ + if not digest_or_tag: + # Get latest artifact + artifacts = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?page=1&page_size=1", + user, password + ) + if "error" in artifacts or not artifacts: + return None + return artifacts[0].get("digest") + + # If it looks like a full digest, use it directly + if digest_or_tag.startswith("sha256:") and len(digest_or_tag) == 71: + return digest_or_tag + + # Try as tag first + artifact = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest_or_tag}", + user, password + ) + if "error" not in artifact: + return artifact.get("digest") + + # Try partial digest match + artifacts = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts", + user, password + ) + if "error" in artifacts: + return None + + for a in artifacts: + if a.get("digest", "").startswith(digest_or_tag) or \ + a.get("digest", "").startswith(f"sha256:{digest_or_tag}"): + return a.get("digest") + + return digest_or_tag # Return as-is, let API handle error diff --git a/src/harbor/commands.py b/src/harbor/commands.py new file mode 100644 index 0000000..4f1d3c0 --- /dev/null +++ b/src/harbor/commands.py @@ -0,0 +1,439 @@ +"""CLI command handlers.""" + +import json +import time +from argparse import Namespace + +from .client import api_request, resolve_digest + + +def cmd_projects(args: Namespace, user: str, password: str, url: str) -> int: + """List projects.""" + data = api_request(f"{url}/api/v2.0/projects", user, password) + if "error" in data: + print(f"Error: {data}") + return 1 + + print(f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}") + print("-" * 60) + for p in data: + meta = p.get("metadata", {}) + print(f"{p['name']:<20} {meta.get('public', 'false'):<8} {p.get('repo_count', 0):<6} " + f"{meta.get('auto_scan', 'false'):<10} {meta.get('auto_sbom_generation', 'false'):<10}") + return 0 + + +def cmd_repos(args: Namespace, user: str, password: str, url: str) -> int: + """List repositories in a project.""" + project = args.project + data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password) + if "error" in data: + print(f"Error: {data}") + return 1 + + print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}") + print("-" * 65) + for r in data: + name = r.get("name", "").replace(f"{project}/", "") + print(f"{name:<40} {r.get('artifact_count', 0):<10} {r.get('pull_count', 0):<12}") + return 0 + + +def cmd_artifacts(args: Namespace, user: str, password: str, url: str) -> int: + """List artifacts in a repository.""" + project = args.project + repo = args.repo + data = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?with_scan_overview=true", + user, password + ) + if "error" in data: + print(f"Error: {data}") + return 1 + + print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}") + print("-" * 85) + for a in data: + digest = a.get("digest", "")[:19] + tags = ", ".join(t.get("name", "") for t in (a.get("tags") or []))[:24] + size = f"{a.get('size', 0) / 1024 / 1024:.1f}MB" + + scan_status = "N/A" + severity = "N/A" + scan = a.get("scan_overview", {}) + for _, result in scan.items(): + scan_status = result.get("scan_status", "N/A") + severity = result.get("severity", "N/A") + + print(f"{digest:<20} {tags:<25} {size:<12} {scan_status:<15} {severity:<10}") + return 0 + + +def cmd_info(args: Namespace, user: str, password: str, url: str) -> int: + """Show detailed artifact information.""" + project = args.project + repo = args.repo + digest = resolve_digest(project, repo, args.digest, user, password, url) + + if not digest: + print("Error: Could not resolve artifact") + return 1 + + data = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true", + user, password + ) + if "error" in data: + print(f"Error: {data}") + return 1 + + # Basic info + print(f"Repository: {project}/{repo}") + print(f"Digest: {data.get('digest', 'N/A')}") + print(f"Size: {data.get('size', 0) / 1024 / 1024:.2f} MB") + print(f"Push Time: {data.get('push_time', 'N/A')}") + print(f"Pull Time: {data.get('pull_time', 'N/A')}") + + # Tags + tags = data.get("tags") or [] + if tags: + print(f"Tags: {', '.join(t.get('name', '') for t in tags)}") + else: + print("Tags: (none)") + + # Extra attributes + extra = data.get("extra_attrs", {}) + if extra: + print(f"OS/Arch: {extra.get('os', 'N/A')}/{extra.get('architecture', 'N/A')}") + print(f"Created: {extra.get('created', 'N/A')}") + + # Scan overview + scan = data.get("scan_overview", {}) + for report_type, result in scan.items(): + print(f"\nScan Status: {result.get('scan_status', 'N/A')}") + print(f"Scanner: {result.get('scanner', {}).get('name', 'N/A')} {result.get('scanner', {}).get('version', '')}") + print(f"Scan Time: {result.get('start_time', 'N/A')} - {result.get('end_time', 'N/A')}") + print(f"Severity: {result.get('severity', 'N/A')}") + + summary = result.get("summary", {}) + if summary: + counts = summary.get("summary", {}) + print(f"Vulns: Total: {summary.get('total', 0)}, Fixable: {summary.get('fixable', 0)}") + if counts: + parts = [] + for sev in ["Critical", "High", "Medium", "Low"]: + if sev in counts: + parts.append(f"{sev}: {counts[sev]}") + if parts: + print(f" {', '.join(parts)}") + + # JSON output + if args.json: + print("\n--- Raw JSON ---") + print(json.dumps(data, indent=2)) + + return 0 + + +def cmd_vulns(args: Namespace, user: str, password: str, url: str) -> int: + """Show vulnerabilities for an artifact.""" + project = args.project + repo = args.repo + + digest = resolve_digest(project, repo, args.digest, user, password, url) + if not digest: + print("Error: Could not resolve artifact") + return 1 + + data = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/vulnerabilities", + user, password + ) + if "error" in data: + print(f"Error: {data}") + return 1 + + severity_filter = args.severity.upper() if args.severity else None + package_filter = args.package.lower() if args.package else None + + for report_type, report in data.items(): + vulns = report.get("vulnerabilities", []) + + # Apply filters + if severity_filter: + vulns = [v for v in vulns if v.get("severity", "").upper() == severity_filter] + if package_filter: + vulns = [v for v in vulns if package_filter in v.get("package", "").lower()] + + # Sort by severity + severity_order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3, "Unknown": 4} + vulns.sort(key=lambda v: severity_order.get(v.get("severity", "Unknown"), 5)) + + # Summary + summary = {} + for v in report.get("vulnerabilities", []): + sev = v.get("severity", "Unknown") + summary[sev] = summary.get(sev, 0) + 1 + + print("Summary: ", end="") + for sev in ["Critical", "High", "Medium", "Low"]: + if sev in summary: + print(f"{sev}: {summary[sev]} ", end="") + print(f"\nTotal: {len(report.get('vulnerabilities', []))} Showing: {len(vulns)}\n") + + if not vulns: + print("No vulnerabilities matching filter.") + return 0 + + print(f"{'CVE':<25} {'Severity':<10} {'Package':<25} {'Version':<15} {'Fixed':<15}") + print("-" * 95) + + for v in vulns[:args.limit]: + cve = v.get("id", "N/A")[:24] + sev = v.get("severity", "N/A") + pkg = v.get("package", "N/A")[:24] + ver = v.get("version", "N/A")[:14] + fix = v.get("fix_version", "N/A")[:14] or "N/A" + print(f"{cve:<25} {sev:<10} {pkg:<25} {ver:<15} {fix:<15}") + + if len(vulns) > args.limit: + print(f"\n... and {len(vulns) - args.limit} more (use --limit to show more)") + + return 0 + + +def cmd_scan(args: Namespace, user: str, password: str, url: str) -> int: + """Trigger vulnerability scan.""" + project = args.project + repo = args.repo + + digest = resolve_digest(project, repo, args.digest, user, password, url) + if not digest: + print("Error: Could not resolve artifact") + return 1 + + result = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/scan", + user, password, method="POST" + ) + + if result.get("status") == 202: + print(f"Scan triggered for {project}/{repo}@{digest[:19]}") + + if args.wait: + print("Waiting for scan to complete...", end="", flush=True) + max_wait = args.timeout + waited = 0 + interval = 3 + + while waited < max_wait: + time.sleep(interval) + waited += interval + + artifact = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true", + user, password + ) + if "error" in artifact: + print(f"\nError checking status: {artifact}") + return 1 + + scan = artifact.get("scan_overview", {}) + for _, status in scan.items(): + scan_status = status.get("scan_status", "") + if scan_status == "Success": + print(f"\nScan complete: {status.get('severity', 'N/A')}") + summary = status.get("summary", {}) + if summary: + print(f" Total: {summary.get('total', 0)}, Fixable: {summary.get('fixable', 0)}") + return 0 + elif scan_status in ("Error", "Failed"): + print(f"\nScan failed: {scan_status}") + return 1 + + print(".", end="", flush=True) + + print(f"\nTimeout after {max_wait}s - scan may still be running") + return 1 + + return 0 + else: + print(f"Error: {result}") + return 1 + + +def cmd_delete(args: Namespace, user: str, password: str, url: str) -> int: + """Delete artifact or tag.""" + project = args.project + repo = args.repo + + if args.tag: + # Delete specific tag + digest = resolve_digest(project, repo, args.tag, user, password, url) + if not digest: + print(f"Error: Could not find tag '{args.tag}'") + return 1 + + result = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags/{args.tag}", + user, password, method="DELETE" + ) + if result.get("status") == 200 or not result.get("error"): + print(f"Deleted tag: {args.tag}") + return 0 + else: + print(f"Error: {result}") + return 1 + else: + # Delete artifact by digest + digest = resolve_digest(project, repo, args.digest, user, password, url) + if not digest: + print("Error: Could not resolve artifact") + return 1 + + if not args.force: + print(f"Will delete: {project}/{repo}@{digest[:19]}") + confirm = input("Confirm deletion? [y/N]: ") + if confirm.lower() != "y": + print("Aborted") + return 1 + + result = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}", + user, password, method="DELETE" + ) + if result.get("status") == 200 or not result.get("error"): + print(f"Deleted: {digest[:19]}") + return 0 + else: + print(f"Error: {result}") + return 1 + + +def cmd_tags(args: Namespace, user: str, password: str, url: str) -> int: + """List or manage tags for an artifact.""" + project = args.project + repo = args.repo + + digest = resolve_digest(project, repo, args.digest, user, password, url) + if not digest: + print("Error: Could not resolve artifact") + return 1 + + if args.add: + # Add a new tag + result = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags", + user, password, method="POST", data={"name": args.add} + ) + if result.get("status") in (200, 201) or not result.get("error"): + print(f"Added tag: {args.add}") + return 0 + else: + print(f"Error: {result}") + return 1 + + # List tags + artifact = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}", + user, password + ) + if "error" in artifact: + print(f"Error: {artifact}") + return 1 + + tags = artifact.get("tags") or [] + if not tags: + print("No tags") + return 0 + + print(f"{'Tag':<30} {'Push Time':<25} {'Immutable':<10}") + print("-" * 70) + for t in tags: + print(f"{t.get('name', 'N/A'):<30} {t.get('push_time', 'N/A'):<25} {t.get('immutable', False)}") + + return 0 + + +def cmd_config(args: Namespace, user: str, password: str, url: str) -> int: + """Configure project settings.""" + project = args.project + + if args.show: + data = api_request(f"{url}/api/v2.0/projects/{project}", user, password) + if "error" in data: + print(f"Error: {data}") + return 1 + + meta = data.get("metadata", {}) + print(f"Project: {project}") + print(f" auto_scan: {meta.get('auto_scan', 'false')}") + print(f" auto_sbom_generation: {meta.get('auto_sbom_generation', 'false')}") + print(f" public: {meta.get('public', 'false')}") + print(f" prevent_vul: {meta.get('prevent_vul', 'false')}") + print(f" severity: {meta.get('severity', 'N/A')}") + return 0 + + # Update settings + update = {"metadata": {}} + if args.auto_scan is not None: + update["metadata"]["auto_scan"] = str(args.auto_scan).lower() + if args.auto_sbom is not None: + update["metadata"]["auto_sbom_generation"] = str(args.auto_sbom).lower() + + if not update["metadata"]: + print("No settings to update. Use --auto-scan or --auto-sbom") + return 1 + + result = api_request(f"{url}/api/v2.0/projects/{project}", user, password, method="PUT", data=update) + + if result.get("status") == 200 or not result.get("error"): + print(f"Updated {project}: {update['metadata']}") + return 0 + else: + print(f"Error: {result}") + return 1 + + +def cmd_sbom(args: Namespace, user: str, password: str, url: str) -> int: + """Get SBOM for an artifact.""" + project = args.project + repo = args.repo + + digest = resolve_digest(project, repo, args.digest, user, password, url) + if not digest: + print("Error: Could not resolve artifact") + return 1 + + data = api_request( + f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/sbom", + user, password + ) + if "error" in data: + print(f"Error: {data}") + return 1 + + if args.json: + print(json.dumps(data, indent=2)) + else: + # Try to parse and summarize + for report_type, sbom in data.items(): + if isinstance(sbom, dict): + components = sbom.get("components", []) + print(f"SBOM Format: {sbom.get('bomFormat', 'Unknown')}") + print(f"Spec Version: {sbom.get('specVersion', 'Unknown')}") + print(f"Components: {len(components)}") + + if components and not args.summary: + print(f"\n{'Name':<35} {'Version':<20} {'Type':<15}") + print("-" * 75) + for c in components[:args.limit]: + print(f"{c.get('name', 'N/A')[:34]:<35} " + f"{c.get('version', 'N/A')[:19]:<20} " + f"{c.get('type', 'N/A'):<15}") + if len(components) > args.limit: + print(f"\n... and {len(components) - args.limit} more") + else: + print(f"SBOM data: {sbom}") + + return 0 diff --git a/src/harbor/config.py b/src/harbor/config.py new file mode 100644 index 0000000..e9346b9 --- /dev/null +++ b/src/harbor/config.py @@ -0,0 +1,23 @@ +"""Configuration and credentials management.""" + +import json +import os +from pathlib import Path + +DEFAULT_CREDS = Path("/opt/ansible/secrets/harbor/credentials.json") +DEFAULT_URL = "https://harbor.mymx.me" + + +def load_credentials() -> tuple[str | None, str | None, str]: + """Load Harbor credentials from secrets file or environment. + + Returns: + Tuple of (username, password, url) + """ + if DEFAULT_CREDS.exists(): + with open(DEFAULT_CREDS) as f: + data = json.load(f) + # Try automation user first, fall back to legacy structure + user = data.get("automation") or data.get("users", {}).get("ansible", {}) + return user.get("username"), user.get("password"), data.get("url", DEFAULT_URL) + return os.environ.get("HARBOR_USER"), os.environ.get("HARBOR_PASS"), DEFAULT_URL