#!/usr/bin/env python3 """Harbor registry control utility.""" import argparse import base64 import json import os import ssl import sys import time import urllib.error import urllib.request from pathlib import Path from typing import Any VERSION = "0.1.0" # Default config DEFAULT_CREDS = Path("/opt/ansible/secrets/harbor/credentials.json") DEFAULT_URL = "https://harbor.mymx.me" def load_credentials() -> tuple[str | None, str | None, str]: """Load Harbor credentials from secrets file.""" if DEFAULT_CREDS.exists(): with open(DEFAULT_CREDS) as f: data = json.load(f) # Try automation user first, fall back to legacy structure user = data.get("automation") or data.get("users", {}).get("ansible", {}) return user.get("username"), user.get("password"), data.get("url", DEFAULT_URL) return os.environ.get("HARBOR_USER"), os.environ.get("HARBOR_PASS"), DEFAULT_URL def build_url(base: str, *parts: str, **params: str) -> str: """Build API URL from parts and optional query parameters.""" url = f"{base}/api/v2.0/{'/'.join(parts)}" if params: query = "&".join(f"{k}={v}" for k, v in params.items()) url = f"{url}?{query}" return url def api_request( url: str, user: str, password: str, method: str = "GET", data: dict[str, Any] | None = None, verify_ssl: bool = False, ) -> dict[str, Any]: """Make authenticated API request to Harbor.""" ctx = ssl.create_default_context() if not verify_ssl: ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE req = urllib.request.Request(url, method=method) # Basic auth credentials = base64.b64encode(f"{user}:{password}".encode()).decode() req.add_header("Authorization", f"Basic {credentials}") if data: req.add_header("Content-Type", "application/json") req.data = json.dumps(data).encode() try: with urllib.request.urlopen(req, context=ctx, timeout=30) as resp: # Handle responses with no body if resp.status in (200, 201, 202, 204): body = resp.read().decode().strip() if not body: return {"status": resp.status} return json.loads(body) return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: body = e.read().decode() try: err_json = json.loads(body) return {"error": e.code, "message": err_json.get("errors", [{}])[0].get("message", body)} except json.JSONDecodeError: return {"error": e.code, "message": body} except Exception as e: return {"error": str(e)} def resolve_digest(project, repo, digest_or_tag, user, password, url): """Resolve partial digest or tag to full digest.""" if not digest_or_tag: # Get latest artifact artifacts = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?page=1&page_size=1", user, password ) if "error" in artifacts or not artifacts: return None return artifacts[0].get("digest") # If it looks like a full digest, use it directly if digest_or_tag.startswith("sha256:") and len(digest_or_tag) == 71: return digest_or_tag # Try as tag first artifact = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest_or_tag}", user, password ) if "error" not in artifact: return artifact.get("digest") # Try partial digest match artifacts = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts", user, password ) if "error" in artifacts: return None for a in artifacts: if a.get("digest", "").startswith(digest_or_tag) or \ a.get("digest", "").startswith(f"sha256:{digest_or_tag}"): return a.get("digest") return digest_or_tag # Return as-is, let API handle error def cmd_projects(args, user, password, url): """List projects.""" data = api_request(f"{url}/api/v2.0/projects", user, password) if "error" in data: print(f"Error: {data}") return 1 print(f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}") print("-" * 60) for p in data: meta = p.get("metadata", {}) print(f"{p['name']:<20} {meta.get('public', 'false'):<8} {p.get('repo_count', 0):<6} " f"{meta.get('auto_scan', 'false'):<10} {meta.get('auto_sbom_generation', 'false'):<10}") return 0 def cmd_repos(args, user, password, url): """List repositories in a project.""" project = args.project data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password) if "error" in data: print(f"Error: {data}") return 1 print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}") print("-" * 65) for r in data: name = r.get("name", "").replace(f"{project}/", "") print(f"{name:<40} {r.get('artifact_count', 0):<10} {r.get('pull_count', 0):<12}") return 0 def cmd_artifacts(args, user, password, url): """List artifacts in a repository.""" project = args.project repo = args.repo data = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?with_scan_overview=true", user, password ) if "error" in data: print(f"Error: {data}") return 1 print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}") print("-" * 85) for a in data: digest = a.get("digest", "")[:19] tags = ", ".join(t.get("name", "") for t in (a.get("tags") or []))[:24] size = f"{a.get('size', 0) / 1024 / 1024:.1f}MB" scan_status = "N/A" severity = "N/A" scan = a.get("scan_overview", {}) for _, result in scan.items(): scan_status = result.get("scan_status", "N/A") severity = result.get("severity", "N/A") print(f"{digest:<20} {tags:<25} {size:<12} {scan_status:<15} {severity:<10}") return 0 def cmd_info(args, user, password, url): """Show detailed artifact information.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 data = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true", user, password ) if "error" in data: print(f"Error: {data}") return 1 # Basic info print(f"Repository: {project}/{repo}") print(f"Digest: {data.get('digest', 'N/A')}") print(f"Size: {data.get('size', 0) / 1024 / 1024:.2f} MB") print(f"Push Time: {data.get('push_time', 'N/A')}") print(f"Pull Time: {data.get('pull_time', 'N/A')}") # Tags tags = data.get("tags") or [] if tags: print(f"Tags: {', '.join(t.get('name', '') for t in tags)}") else: print("Tags: (none)") # Extra attributes extra = data.get("extra_attrs", {}) if extra: print(f"OS/Arch: {extra.get('os', 'N/A')}/{extra.get('architecture', 'N/A')}") print(f"Created: {extra.get('created', 'N/A')}") # Scan overview scan = data.get("scan_overview", {}) for report_type, result in scan.items(): print(f"\nScan Status: {result.get('scan_status', 'N/A')}") print(f"Scanner: {result.get('scanner', {}).get('name', 'N/A')} {result.get('scanner', {}).get('version', '')}") print(f"Scan Time: {result.get('start_time', 'N/A')} - {result.get('end_time', 'N/A')}") print(f"Severity: {result.get('severity', 'N/A')}") summary = result.get("summary", {}) if summary: counts = summary.get("summary", {}) print(f"Vulns: Total: {summary.get('total', 0)}, Fixable: {summary.get('fixable', 0)}") if counts: parts = [] for sev in ["Critical", "High", "Medium", "Low"]: if sev in counts: parts.append(f"{sev}: {counts[sev]}") if parts: print(f" {', '.join(parts)}") # JSON output if args.json: print("\n--- Raw JSON ---") print(json.dumps(data, indent=2)) return 0 def cmd_vulns(args, user, password, url): """Show vulnerabilities for an artifact.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 data = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/vulnerabilities", user, password ) if "error" in data: print(f"Error: {data}") return 1 severity_filter = args.severity.upper() if args.severity else None package_filter = args.package.lower() if args.package else None for report_type, report in data.items(): vulns = report.get("vulnerabilities", []) # Apply filters if severity_filter: vulns = [v for v in vulns if v.get("severity", "").upper() == severity_filter] if package_filter: vulns = [v for v in vulns if package_filter in v.get("package", "").lower()] # Sort by severity severity_order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3, "Unknown": 4} vulns.sort(key=lambda v: severity_order.get(v.get("severity", "Unknown"), 5)) # Summary summary = {} for v in report.get("vulnerabilities", []): sev = v.get("severity", "Unknown") summary[sev] = summary.get(sev, 0) + 1 print("Summary: ", end="") for sev in ["Critical", "High", "Medium", "Low"]: if sev in summary: print(f"{sev}: {summary[sev]} ", end="") print(f"\nTotal: {len(report.get('vulnerabilities', []))} Showing: {len(vulns)}\n") if not vulns: print("No vulnerabilities matching filter.") return 0 print(f"{'CVE':<25} {'Severity':<10} {'Package':<25} {'Version':<15} {'Fixed':<15}") print("-" * 95) for v in vulns[:args.limit]: cve = v.get("id", "N/A")[:24] sev = v.get("severity", "N/A") pkg = v.get("package", "N/A")[:24] ver = v.get("version", "N/A")[:14] fix = v.get("fix_version", "N/A")[:14] or "N/A" print(f"{cve:<25} {sev:<10} {pkg:<25} {ver:<15} {fix:<15}") if len(vulns) > args.limit: print(f"\n... and {len(vulns) - args.limit} more (use --limit to show more)") return 0 def cmd_scan(args, user, password, url): """Trigger vulnerability scan.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 result = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/scan", user, password, method="POST" ) if result.get("status") == 202: print(f"Scan triggered for {project}/{repo}@{digest[:19]}") if args.wait: print("Waiting for scan to complete...", end="", flush=True) max_wait = args.timeout waited = 0 interval = 3 while waited < max_wait: time.sleep(interval) waited += interval artifact = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true", user, password ) if "error" in artifact: print(f"\nError checking status: {artifact}") return 1 scan = artifact.get("scan_overview", {}) for _, status in scan.items(): scan_status = status.get("scan_status", "") if scan_status == "Success": print(f"\nScan complete: {status.get('severity', 'N/A')}") summary = status.get("summary", {}) if summary: print(f" Total: {summary.get('total', 0)}, Fixable: {summary.get('fixable', 0)}") return 0 elif scan_status in ("Error", "Failed"): print(f"\nScan failed: {scan_status}") return 1 print(".", end="", flush=True) print(f"\nTimeout after {max_wait}s - scan may still be running") return 1 return 0 else: print(f"Error: {result}") return 1 def cmd_delete(args, user, password, url): """Delete artifact or tag.""" project = args.project repo = args.repo if args.tag: # Delete specific tag digest = resolve_digest(project, repo, args.tag, user, password, url) if not digest: print(f"Error: Could not find tag '{args.tag}'") return 1 result = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags/{args.tag}", user, password, method="DELETE" ) if result.get("status") == 200 or not result.get("error"): print(f"Deleted tag: {args.tag}") return 0 else: print(f"Error: {result}") return 1 else: # Delete artifact by digest digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 if not args.force: print(f"Will delete: {project}/{repo}@{digest[:19]}") confirm = input("Confirm deletion? [y/N]: ") if confirm.lower() != "y": print("Aborted") return 1 result = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}", user, password, method="DELETE" ) if result.get("status") == 200 or not result.get("error"): print(f"Deleted: {digest[:19]}") return 0 else: print(f"Error: {result}") return 1 def cmd_tags(args, user, password, url): """List or manage tags for an artifact.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 if args.add: # Add a new tag result = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags", user, password, method="POST", data={"name": args.add} ) if result.get("status") in (200, 201) or not result.get("error"): print(f"Added tag: {args.add}") return 0 else: print(f"Error: {result}") return 1 # List tags artifact = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}", user, password ) if "error" in artifact: print(f"Error: {artifact}") return 1 tags = artifact.get("tags") or [] if not tags: print("No tags") return 0 print(f"{'Tag':<30} {'Push Time':<25} {'Immutable':<10}") print("-" * 70) for t in tags: print(f"{t.get('name', 'N/A'):<30} {t.get('push_time', 'N/A'):<25} {t.get('immutable', False)}") return 0 def cmd_config(args, user, password, url): """Configure project settings.""" project = args.project if args.show: data = api_request(f"{url}/api/v2.0/projects/{project}", user, password) if "error" in data: print(f"Error: {data}") return 1 meta = data.get("metadata", {}) print(f"Project: {project}") print(f" auto_scan: {meta.get('auto_scan', 'false')}") print(f" auto_sbom_generation: {meta.get('auto_sbom_generation', 'false')}") print(f" public: {meta.get('public', 'false')}") print(f" prevent_vul: {meta.get('prevent_vul', 'false')}") print(f" severity: {meta.get('severity', 'N/A')}") return 0 # Update settings update = {"metadata": {}} if args.auto_scan is not None: update["metadata"]["auto_scan"] = str(args.auto_scan).lower() if args.auto_sbom is not None: update["metadata"]["auto_sbom_generation"] = str(args.auto_sbom).lower() if not update["metadata"]: print("No settings to update. Use --auto-scan or --auto-sbom") return 1 result = api_request(f"{url}/api/v2.0/projects/{project}", user, password, method="PUT", data=update) if result.get("status") == 200 or not result.get("error"): print(f"Updated {project}: {update['metadata']}") return 0 else: print(f"Error: {result}") return 1 def cmd_sbom(args, user, password, url): """Get SBOM for an artifact.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 data = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/sbom", user, password ) if "error" in data: print(f"Error: {data}") return 1 if args.json: print(json.dumps(data, indent=2)) else: # Try to parse and summarize for report_type, sbom in data.items(): if isinstance(sbom, dict): components = sbom.get("components", []) print(f"SBOM Format: {sbom.get('bomFormat', 'Unknown')}") print(f"Spec Version: {sbom.get('specVersion', 'Unknown')}") print(f"Components: {len(components)}") if components and not args.summary: print(f"\n{'Name':<35} {'Version':<20} {'Type':<15}") print("-" * 75) for c in components[:args.limit]: print(f"{c.get('name', 'N/A')[:34]:<35} " f"{c.get('version', 'N/A')[:19]:<20} " f"{c.get('type', 'N/A'):<15}") if len(components) > args.limit: print(f"\n... and {len(components) - args.limit} more") else: print(f"SBOM data: {sbom}") return 0 def main(): parser = argparse.ArgumentParser( description="Harbor registry control", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s projects List all projects %(prog)s repos library List repos in 'library' project %(prog)s artifacts library flaskpaste List artifacts %(prog)s info library flaskpaste Show latest artifact details %(prog)s info library flaskpaste -d latest Show artifact by tag %(prog)s vulns library flaskpaste -s high Show high severity vulns %(prog)s vulns library flaskpaste -P jaraco Filter by package name %(prog)s scan library flaskpaste --wait Scan and wait for completion %(prog)s tags library flaskpaste List tags for latest artifact %(prog)s sbom library flaskpaste Show SBOM for artifact %(prog)s config library --show Show project settings """ ) parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {VERSION}") parser.add_argument("--url", help="Harbor URL", default=None) parser.add_argument("-u", "--user", help="Username", default=None) parser.add_argument("-p", "--password", help="Password", default=None) parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certificates (default: skip)") subparsers = parser.add_subparsers(dest="command", help="Commands") # projects subparsers.add_parser("projects", help="List projects") # repos p_repos = subparsers.add_parser("repos", help="List repositories") p_repos.add_argument("project", help="Project name") # artifacts p_artifacts = subparsers.add_parser("artifacts", help="List artifacts") p_artifacts.add_argument("project", help="Project name") p_artifacts.add_argument("repo", help="Repository name") # info p_info = subparsers.add_parser("info", help="Show artifact details") p_info.add_argument("project", help="Project name") p_info.add_argument("repo", help="Repository name") p_info.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_info.add_argument("--json", action="store_true", help="Include raw JSON output") # vulns p_vulns = subparsers.add_parser("vulns", help="Show vulnerabilities") p_vulns.add_argument("project", help="Project name") p_vulns.add_argument("repo", help="Repository name") p_vulns.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_vulns.add_argument("-s", "--severity", help="Filter by severity (critical/high/medium/low)") p_vulns.add_argument("-P", "--package", help="Filter by package name (substring match)") p_vulns.add_argument("-l", "--limit", type=int, default=50, help="Max results (default: 50)") # scan p_scan = subparsers.add_parser("scan", help="Trigger vulnerability scan") p_scan.add_argument("project", help="Project name") p_scan.add_argument("repo", help="Repository name") p_scan.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_scan.add_argument("-w", "--wait", action="store_true", help="Wait for scan to complete") p_scan.add_argument("-t", "--timeout", type=int, default=120, help="Wait timeout in seconds (default: 120)") # delete p_delete = subparsers.add_parser("delete", help="Delete artifact or tag") p_delete.add_argument("project", help="Project name") p_delete.add_argument("repo", help="Repository name") p_delete.add_argument("-d", "--digest", help="Digest or tag to delete") p_delete.add_argument("-T", "--tag", help="Delete specific tag only") p_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation") # tags p_tags = subparsers.add_parser("tags", help="List or manage tags") p_tags.add_argument("project", help="Project name") p_tags.add_argument("repo", help="Repository name") p_tags.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_tags.add_argument("-a", "--add", help="Add new tag to artifact") # sbom p_sbom = subparsers.add_parser("sbom", help="Get SBOM for artifact") p_sbom.add_argument("project", help="Project name") p_sbom.add_argument("repo", help="Repository name") p_sbom.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_sbom.add_argument("--json", action="store_true", help="Output raw JSON") p_sbom.add_argument("--summary", action="store_true", help="Summary only, no component list") p_sbom.add_argument("-l", "--limit", type=int, default=50, help="Max components to show (default: 50)") # config p_config = subparsers.add_parser("config", help="Configure project settings") p_config.add_argument("project", help="Project name") p_config.add_argument("--show", action="store_true", help="Show current settings") p_config.add_argument("--auto-scan", type=lambda x: x.lower() == "true", metavar="true|false", help="Enable/disable auto-scan on push") p_config.add_argument("--auto-sbom", type=lambda x: x.lower() == "true", metavar="true|false", help="Enable/disable auto-SBOM generation on push") args = parser.parse_args() if not args.command: parser.print_help() return 1 # Load credentials creds_user, creds_pass, creds_url = load_credentials() user = args.user or creds_user password = args.password or creds_pass url = args.url or creds_url if not user or not password: print("Error: No credentials. Set HARBOR_USER/HARBOR_PASS or configure secrets file.") return 1 commands = { "projects": cmd_projects, "repos": cmd_repos, "artifacts": cmd_artifacts, "info": cmd_info, "vulns": cmd_vulns, "scan": cmd_scan, "delete": cmd_delete, "tags": cmd_tags, "sbom": cmd_sbom, "config": cmd_config, } # Store connection params on args for command access args.api_user = user args.api_password = password args.api_url = url return commands[args.command](args, user, password, url) if __name__ == "__main__": sys.exit(main())