#!/usr/bin/env python3 """Harbor registry control utility.""" import argparse import json import os import sys import time import urllib.request import urllib.error import ssl from pathlib import Path # Default config DEFAULT_CREDS = Path("/opt/ansible/secrets/harbor/credentials.json") DEFAULT_URL = "https://harbor.mymx.me" def load_credentials(): """Load Harbor credentials from secrets file.""" if DEFAULT_CREDS.exists(): with open(DEFAULT_CREDS) as f: data = json.load(f) # Try automation user first, fall back to legacy structure user = data.get("automation") or data.get("users", {}).get("ansible", {}) return user.get("username"), user.get("password"), data.get("url", DEFAULT_URL) return os.environ.get("HARBOR_USER"), os.environ.get("HARBOR_PASS"), DEFAULT_URL def api_request(url, user, password, method="GET", data=None): """Make authenticated API request to Harbor.""" ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE req = urllib.request.Request(url, method=method) # Basic auth import base64 credentials = base64.b64encode(f"{user}:{password}".encode()).decode() req.add_header("Authorization", f"Basic {credentials}") if data: req.add_header("Content-Type", "application/json") req.data = json.dumps(data).encode() try: with urllib.request.urlopen(req, context=ctx, timeout=30) as resp: # Handle responses with no body if resp.status in (200, 201, 202, 204): body = resp.read().decode().strip() if not body: return {"status": resp.status} return json.loads(body) return json.loads(resp.read().decode()) except urllib.error.HTTPError as e: body = e.read().decode() try: err_json = json.loads(body) return {"error": e.code, "message": err_json.get("errors", [{}])[0].get("message", body)} except json.JSONDecodeError: return {"error": e.code, "message": body} except Exception as e: return {"error": str(e)} def resolve_digest(project, repo, digest_or_tag, user, password, url): """Resolve partial digest or tag to full digest.""" if not digest_or_tag: # Get latest artifact artifacts = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?page=1&page_size=1", user, password ) if "error" in artifacts or not artifacts: return None return artifacts[0].get("digest") # If it looks like a full digest, use it directly if digest_or_tag.startswith("sha256:") and len(digest_or_tag) == 71: return digest_or_tag # Try as tag first artifact = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest_or_tag}", user, password ) if "error" not in artifact: return artifact.get("digest") # Try partial digest match artifacts = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts", user, password ) if "error" in artifacts: return None for a in artifacts: if a.get("digest", "").startswith(digest_or_tag) or \ a.get("digest", "").startswith(f"sha256:{digest_or_tag}"): return a.get("digest") return digest_or_tag # Return as-is, let API handle error def cmd_projects(args, user, password, url): """List projects.""" data = api_request(f"{url}/api/v2.0/projects", user, password) if "error" in data: print(f"Error: {data}") return 1 print(f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}") print("-" * 60) for p in data: meta = p.get("metadata", {}) print(f"{p['name']:<20} {meta.get('public', 'false'):<8} {p.get('repo_count', 0):<6} " f"{meta.get('auto_scan', 'false'):<10} {meta.get('auto_sbom_generation', 'false'):<10}") return 0 def cmd_repos(args, user, password, url): """List repositories in a project.""" project = args.project data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password) if "error" in data: print(f"Error: {data}") return 1 print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}") print("-" * 65) for r in data: name = r.get("name", "").replace(f"{project}/", "") print(f"{name:<40} {r.get('artifact_count', 0):<10} {r.get('pull_count', 0):<12}") return 0 def cmd_artifacts(args, user, password, url): """List artifacts in a repository.""" project = args.project repo = args.repo data = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?with_scan_overview=true", user, password ) if "error" in data: print(f"Error: {data}") return 1 print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}") print("-" * 85) for a in data: digest = a.get("digest", "")[:19] tags = ", ".join(t.get("name", "") for t in (a.get("tags") or []))[:24] size = f"{a.get('size', 0) / 1024 / 1024:.1f}MB" scan_status = "N/A" severity = "N/A" scan = a.get("scan_overview", {}) for _, result in scan.items(): scan_status = result.get("scan_status", "N/A") severity = result.get("severity", "N/A") print(f"{digest:<20} {tags:<25} {size:<12} {scan_status:<15} {severity:<10}") return 0 def cmd_info(args, user, password, url): """Show detailed artifact information.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 data = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true", user, password ) if "error" in data: print(f"Error: {data}") return 1 # Basic info print(f"Repository: {project}/{repo}") print(f"Digest: {data.get('digest', 'N/A')}") print(f"Size: {data.get('size', 0) / 1024 / 1024:.2f} MB") print(f"Push Time: {data.get('push_time', 'N/A')}") print(f"Pull Time: {data.get('pull_time', 'N/A')}") # Tags tags = data.get("tags") or [] if tags: print(f"Tags: {', '.join(t.get('name', '') for t in tags)}") else: print("Tags: (none)") # Extra attributes extra = data.get("extra_attrs", {}) if extra: print(f"OS/Arch: {extra.get('os', 'N/A')}/{extra.get('architecture', 'N/A')}") print(f"Created: {extra.get('created', 'N/A')}") # Scan overview scan = data.get("scan_overview", {}) for report_type, result in scan.items(): print(f"\nScan Status: {result.get('scan_status', 'N/A')}") print(f"Scanner: {result.get('scanner', {}).get('name', 'N/A')} {result.get('scanner', {}).get('version', '')}") print(f"Scan Time: {result.get('start_time', 'N/A')} - {result.get('end_time', 'N/A')}") print(f"Severity: {result.get('severity', 'N/A')}") summary = result.get("summary", {}) if summary: counts = summary.get("summary", {}) print(f"Vulns: Total: {summary.get('total', 0)}, Fixable: {summary.get('fixable', 0)}") if counts: parts = [] for sev in ["Critical", "High", "Medium", "Low"]: if sev in counts: parts.append(f"{sev}: {counts[sev]}") if parts: print(f" {', '.join(parts)}") # JSON output if args.json: print("\n--- Raw JSON ---") print(json.dumps(data, indent=2)) return 0 def cmd_vulns(args, user, password, url): """Show vulnerabilities for an artifact.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 data = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/vulnerabilities", user, password ) if "error" in data: print(f"Error: {data}") return 1 severity_filter = args.severity.upper() if args.severity else None package_filter = args.package.lower() if args.package else None for report_type, report in data.items(): vulns = report.get("vulnerabilities", []) # Apply filters if severity_filter: vulns = [v for v in vulns if v.get("severity", "").upper() == severity_filter] if package_filter: vulns = [v for v in vulns if package_filter in v.get("package", "").lower()] # Sort by severity severity_order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3, "Unknown": 4} vulns.sort(key=lambda v: severity_order.get(v.get("severity", "Unknown"), 5)) # Summary summary = {} for v in report.get("vulnerabilities", []): sev = v.get("severity", "Unknown") summary[sev] = summary.get(sev, 0) + 1 print(f"Summary: ", end="") for sev in ["Critical", "High", "Medium", "Low"]: if sev in summary: print(f"{sev}: {summary[sev]} ", end="") print(f"\nTotal: {len(report.get('vulnerabilities', []))} Showing: {len(vulns)}\n") if not vulns: print("No vulnerabilities matching filter.") return 0 print(f"{'CVE':<25} {'Severity':<10} {'Package':<25} {'Version':<15} {'Fixed':<15}") print("-" * 95) for v in vulns[:args.limit]: cve = v.get("id", "N/A")[:24] sev = v.get("severity", "N/A") pkg = v.get("package", "N/A")[:24] ver = v.get("version", "N/A")[:14] fix = v.get("fix_version", "N/A")[:14] or "N/A" print(f"{cve:<25} {sev:<10} {pkg:<25} {ver:<15} {fix:<15}") if len(vulns) > args.limit: print(f"\n... and {len(vulns) - args.limit} more (use --limit to show more)") return 0 def cmd_scan(args, user, password, url): """Trigger vulnerability scan.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 result = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/scan", user, password, method="POST" ) if result.get("status") == 202: print(f"Scan triggered for {project}/{repo}@{digest[:19]}") if args.wait: print("Waiting for scan to complete...", end="", flush=True) max_wait = args.timeout waited = 0 interval = 3 while waited < max_wait: time.sleep(interval) waited += interval artifact = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true", user, password ) if "error" in artifact: print(f"\nError checking status: {artifact}") return 1 scan = artifact.get("scan_overview", {}) for _, status in scan.items(): scan_status = status.get("scan_status", "") if scan_status == "Success": print(f"\nScan complete: {status.get('severity', 'N/A')}") summary = status.get("summary", {}) if summary: print(f" Total: {summary.get('total', 0)}, Fixable: {summary.get('fixable', 0)}") return 0 elif scan_status in ("Error", "Failed"): print(f"\nScan failed: {scan_status}") return 1 print(".", end="", flush=True) print(f"\nTimeout after {max_wait}s - scan may still be running") return 1 return 0 else: print(f"Error: {result}") return 1 def cmd_delete(args, user, password, url): """Delete artifact or tag.""" project = args.project repo = args.repo if args.tag: # Delete specific tag digest = resolve_digest(project, repo, args.tag, user, password, url) if not digest: print(f"Error: Could not find tag '{args.tag}'") return 1 result = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags/{args.tag}", user, password, method="DELETE" ) if result.get("status") == 200 or not result.get("error"): print(f"Deleted tag: {args.tag}") return 0 else: print(f"Error: {result}") return 1 else: # Delete artifact by digest digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 if not args.force: print(f"Will delete: {project}/{repo}@{digest[:19]}") confirm = input("Confirm deletion? [y/N]: ") if confirm.lower() != "y": print("Aborted") return 1 result = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}", user, password, method="DELETE" ) if result.get("status") == 200 or not result.get("error"): print(f"Deleted: {digest[:19]}") return 0 else: print(f"Error: {result}") return 1 def cmd_tags(args, user, password, url): """List or manage tags for an artifact.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 if args.add: # Add a new tag result = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags", user, password, method="POST", data={"name": args.add} ) if result.get("status") in (200, 201) or not result.get("error"): print(f"Added tag: {args.add}") return 0 else: print(f"Error: {result}") return 1 # List tags artifact = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}", user, password ) if "error" in artifact: print(f"Error: {artifact}") return 1 tags = artifact.get("tags") or [] if not tags: print("No tags") return 0 print(f"{'Tag':<30} {'Push Time':<25} {'Immutable':<10}") print("-" * 70) for t in tags: print(f"{t.get('name', 'N/A'):<30} {t.get('push_time', 'N/A'):<25} {t.get('immutable', False)}") return 0 def cmd_config(args, user, password, url): """Configure project settings.""" project = args.project if args.show: data = api_request(f"{url}/api/v2.0/projects/{project}", user, password) if "error" in data: print(f"Error: {data}") return 1 meta = data.get("metadata", {}) print(f"Project: {project}") print(f" auto_scan: {meta.get('auto_scan', 'false')}") print(f" auto_sbom_generation: {meta.get('auto_sbom_generation', 'false')}") print(f" public: {meta.get('public', 'false')}") print(f" prevent_vul: {meta.get('prevent_vul', 'false')}") print(f" severity: {meta.get('severity', 'N/A')}") return 0 # Update settings update = {"metadata": {}} if args.auto_scan is not None: update["metadata"]["auto_scan"] = str(args.auto_scan).lower() if args.auto_sbom is not None: update["metadata"]["auto_sbom_generation"] = str(args.auto_sbom).lower() if not update["metadata"]: print("No settings to update. Use --auto-scan or --auto-sbom") return 1 result = api_request(f"{url}/api/v2.0/projects/{project}", user, password, method="PUT", data=update) if result.get("status") == 200 or not result.get("error"): print(f"Updated {project}: {update['metadata']}") return 0 else: print(f"Error: {result}") return 1 def cmd_sbom(args, user, password, url): """Get SBOM for an artifact.""" project = args.project repo = args.repo digest = resolve_digest(project, repo, args.digest, user, password, url) if not digest: print("Error: Could not resolve artifact") return 1 data = api_request( f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/sbom", user, password ) if "error" in data: print(f"Error: {data}") return 1 if args.json: print(json.dumps(data, indent=2)) else: # Try to parse and summarize for report_type, sbom in data.items(): if isinstance(sbom, dict): components = sbom.get("components", []) print(f"SBOM Format: {sbom.get('bomFormat', 'Unknown')}") print(f"Spec Version: {sbom.get('specVersion', 'Unknown')}") print(f"Components: {len(components)}") if components and not args.summary: print(f"\n{'Name':<35} {'Version':<20} {'Type':<15}") print("-" * 75) for c in components[:args.limit]: print(f"{c.get('name', 'N/A')[:34]:<35} " f"{c.get('version', 'N/A')[:19]:<20} " f"{c.get('type', 'N/A'):<15}") if len(components) > args.limit: print(f"\n... and {len(components) - args.limit} more") else: print(f"SBOM data: {sbom}") return 0 def main(): parser = argparse.ArgumentParser( description="Harbor registry control", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: %(prog)s projects List all projects %(prog)s repos library List repos in 'library' project %(prog)s artifacts library flaskpaste List artifacts %(prog)s info library flaskpaste Show latest artifact details %(prog)s info library flaskpaste -d latest Show artifact by tag %(prog)s vulns library flaskpaste -s high Show high severity vulns %(prog)s vulns library flaskpaste -P jaraco Filter by package name %(prog)s scan library flaskpaste --wait Scan and wait for completion %(prog)s tags library flaskpaste List tags for latest artifact %(prog)s sbom library flaskpaste Show SBOM for artifact %(prog)s config library --show Show project settings """ ) parser.add_argument("--url", help="Harbor URL", default=None) parser.add_argument("-u", "--user", help="Username", default=None) parser.add_argument("-p", "--password", help="Password", default=None) subparsers = parser.add_subparsers(dest="command", help="Commands") # projects subparsers.add_parser("projects", help="List projects") # repos p_repos = subparsers.add_parser("repos", help="List repositories") p_repos.add_argument("project", help="Project name") # artifacts p_artifacts = subparsers.add_parser("artifacts", help="List artifacts") p_artifacts.add_argument("project", help="Project name") p_artifacts.add_argument("repo", help="Repository name") # info p_info = subparsers.add_parser("info", help="Show artifact details") p_info.add_argument("project", help="Project name") p_info.add_argument("repo", help="Repository name") p_info.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_info.add_argument("--json", action="store_true", help="Include raw JSON output") # vulns p_vulns = subparsers.add_parser("vulns", help="Show vulnerabilities") p_vulns.add_argument("project", help="Project name") p_vulns.add_argument("repo", help="Repository name") p_vulns.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_vulns.add_argument("-s", "--severity", help="Filter by severity (critical/high/medium/low)") p_vulns.add_argument("-P", "--package", help="Filter by package name (substring match)") p_vulns.add_argument("-l", "--limit", type=int, default=50, help="Max results (default: 50)") # scan p_scan = subparsers.add_parser("scan", help="Trigger vulnerability scan") p_scan.add_argument("project", help="Project name") p_scan.add_argument("repo", help="Repository name") p_scan.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_scan.add_argument("-w", "--wait", action="store_true", help="Wait for scan to complete") p_scan.add_argument("-t", "--timeout", type=int, default=120, help="Wait timeout in seconds (default: 120)") # delete p_delete = subparsers.add_parser("delete", help="Delete artifact or tag") p_delete.add_argument("project", help="Project name") p_delete.add_argument("repo", help="Repository name") p_delete.add_argument("-d", "--digest", help="Digest or tag to delete") p_delete.add_argument("-T", "--tag", help="Delete specific tag only") p_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation") # tags p_tags = subparsers.add_parser("tags", help="List or manage tags") p_tags.add_argument("project", help="Project name") p_tags.add_argument("repo", help="Repository name") p_tags.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_tags.add_argument("-a", "--add", help="Add new tag to artifact") # sbom p_sbom = subparsers.add_parser("sbom", help="Get SBOM for artifact") p_sbom.add_argument("project", help="Project name") p_sbom.add_argument("repo", help="Repository name") p_sbom.add_argument("-d", "--digest", help="Digest or tag (default: latest)") p_sbom.add_argument("--json", action="store_true", help="Output raw JSON") p_sbom.add_argument("--summary", action="store_true", help="Summary only, no component list") p_sbom.add_argument("-l", "--limit", type=int, default=50, help="Max components to show (default: 50)") # config p_config = subparsers.add_parser("config", help="Configure project settings") p_config.add_argument("project", help="Project name") p_config.add_argument("--show", action="store_true", help="Show current settings") p_config.add_argument("--auto-scan", type=lambda x: x.lower() == "true", metavar="true|false", help="Enable/disable auto-scan on push") p_config.add_argument("--auto-sbom", type=lambda x: x.lower() == "true", metavar="true|false", help="Enable/disable auto-SBOM generation on push") args = parser.parse_args() if not args.command: parser.print_help() return 1 # Load credentials creds_user, creds_pass, creds_url = load_credentials() user = args.user or creds_user password = args.password or creds_pass url = args.url or creds_url if not user or not password: print("Error: No credentials. Set HARBOR_USER/HARBOR_PASS or configure secrets file.") return 1 commands = { "projects": cmd_projects, "repos": cmd_repos, "artifacts": cmd_artifacts, "info": cmd_info, "vulns": cmd_vulns, "scan": cmd_scan, "delete": cmd_delete, "tags": cmd_tags, "sbom": cmd_sbom, "config": cmd_config, } return commands[args.command](args, user, password, url) if __name__ == "__main__": sys.exit(main())