Files
harbor/harbor-ctl.py

694 lines
25 KiB
Python
Executable File

#!/usr/bin/env python3
"""Harbor registry control utility."""
import argparse
import base64
import json
import os
import ssl
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
# Tool version, reported by the -V/--version command-line flag.
VERSION = "0.1.0"
# Default configuration: the JSON secrets file that credentials are loaded
# from, and the Harbor base URL used when nothing else supplies one.
DEFAULT_CREDS = Path("/opt/ansible/secrets/harbor/credentials.json")
DEFAULT_URL = "https://harbor.mymx.me"
def load_credentials() -> tuple[str | None, str | None, str]:
    """Load Harbor credentials from the secrets file and the environment.

    The JSON file at ``DEFAULT_CREDS`` is consulted first: its ``automation``
    entry is preferred, with the legacy ``users.ansible`` entry as fallback.
    Any value the file does not supply (file missing, unreadable, malformed,
    or lacking the entry) is taken from the ``HARBOR_USER`` / ``HARBOR_PASS``
    environment variables instead.

    Returns:
        A ``(username, password, url)`` tuple.  ``username`` and ``password``
        are ``None`` when no source provides them; ``url`` falls back to
        ``DEFAULT_URL`` when the file does not set one.
    """
    data: dict[str, Any] = {}
    if DEFAULT_CREDS.exists():
        try:
            with open(DEFAULT_CREDS, encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers both malformed JSON and undecodable bytes.
            # A broken secrets file must not block the environment fallback.
            print(f"Warning: could not read {DEFAULT_CREDS}: {exc}", file=sys.stderr)
        else:
            if isinstance(loaded, dict):
                data = loaded

    # Try automation user first, fall back to legacy structure
    users = data.get("users")
    legacy = users.get("ansible") if isinstance(users, dict) else None
    entry = data.get("automation") or legacy
    if not isinstance(entry, dict):
        entry = {}

    username = entry.get("username") or os.environ.get("HARBOR_USER")
    password = entry.get("password") or os.environ.get("HARBOR_PASS")
    return username, password, data.get("url") or DEFAULT_URL
def build_url(base: str, *parts: str, **params: str) -> str:
    """Build a Harbor API v2.0 URL from path parts and query parameters.

    Args:
        base: Scheme and host of the Harbor instance; a trailing slash is
            ignored so the result never contains ``//api``.
        *parts: Path segments joined with ``/`` after ``/api/v2.0/``.  They
            are inserted as given, so callers must pre-encode them if needed.
        **params: Query parameters; names and values are URL-encoded.

    Returns:
        The assembled URL.
    """
    # Imported locally so the function does not depend on urllib.parse being
    # loaded as a side effect of importing urllib.request.
    from urllib.parse import urlencode

    url = f"{base.rstrip('/')}/api/v2.0/{'/'.join(parts)}"
    if params:
        # urlencode escapes characters such as '&', '=' and spaces that would
        # otherwise corrupt the query string.
        url = f"{url}?{urlencode(params)}"
    return url
def api_request(
url: str,
user: str,
password: str,
method: str = "GET",
data: dict[str, Any] | None = None,
verify_ssl: bool = False,
) -> dict[str, Any]:
"""Make authenticated API request to Harbor."""
ctx = ssl.create_default_context()
if not verify_ssl:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url, method=method)
# Basic auth
credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
req.add_header("Authorization", f"Basic {credentials}")
if data:
req.add_header("Content-Type", "application/json")
req.data = json.dumps(data).encode()
try:
with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
# Handle responses with no body
if resp.status in (200, 201, 202, 204):
body = resp.read().decode().strip()
if not body:
return {"status": resp.status}
return json.loads(body)
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
body = e.read().decode()
try:
err_json = json.loads(body)
return {"error": e.code, "message": err_json.get("errors", [{}])[0].get("message", body)}
except json.JSONDecodeError:
return {"error": e.code, "message": body}
except Exception as e:
return {"error": str(e)}
def resolve_digest(project, repo, digest_or_tag, user, password, url):
    """Turn a tag, partial digest, or empty reference into an artifact digest.

    Resolution order:
      1. Empty reference: digest of the first artifact the API lists
         (requested with ``page=1&page_size=1``).
      2. A 71-character ``sha256:`` string is returned unchanged.
      3. The reference is looked up directly as an artifact reference.
      4. The artifact listing is searched for a digest that starts with the
         reference, with or without the ``sha256:`` prefix.

    Returns the digest, ``None`` when a listing request fails or is empty, or
    the reference itself when nothing matched (the API then reports the error).
    """
    artifacts_endpoint = f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts"

    if not digest_or_tag:
        # No reference given: take whatever the API lists first.
        first_page = api_request(f"{artifacts_endpoint}?page=1&page_size=1", user, password)
        if "error" in first_page or not first_page:
            return None
        return first_page[0].get("digest")

    # "sha256:" plus 64 hex characters is 71 characters long.
    if len(digest_or_tag) == 71 and digest_or_tag.startswith("sha256:"):
        return digest_or_tag

    # Direct lookup handles tags.
    direct = api_request(f"{artifacts_endpoint}/{digest_or_tag}", user, password)
    if "error" not in direct:
        return direct.get("digest")

    # Fall back to prefix matching against the listed artifacts.
    listing = api_request(artifacts_endpoint, user, password)
    if "error" in listing:
        return None
    candidates = (digest_or_tag, f"sha256:{digest_or_tag}")
    for entry in listing:
        if entry.get("digest", "").startswith(candidates):
            return entry.get("digest")

    # Nothing matched: hand the reference back and let the API report it.
    return digest_or_tag
def cmd_projects(args, user, password, url):
    """Print a table of the projects returned by the API.

    Returns 0 on success, 1 when the API response carries an error.
    """
    projects = api_request(f"{url}/api/v2.0/projects", user, password)
    if "error" in projects:
        print(f"Error: {projects}")
        return 1

    header = f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}"
    print(header)
    print("-" * 60)
    for entry in projects:
        metadata = entry.get("metadata", {})
        name = entry["name"]
        public = metadata.get("public", "false")
        repo_count = entry.get("repo_count", 0)
        auto_scan = metadata.get("auto_scan", "false")
        auto_sbom = metadata.get("auto_sbom_generation", "false")
        print(f"{name:<20} {public:<8} {repo_count:<6} {auto_scan:<10} {auto_sbom:<10}")
    return 0
def cmd_repos(args, user, password, url):
    """List repositories in a project.

    Prints one row per repository with its artifact and pull counts.  The
    leading ``<project>/`` is stripped from each name for display.

    Returns 0 on success, 1 when the API response carries an error.
    """
    project = args.project
    data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password)
    if "error" in data:
        print(f"Error: {data}")
        return 1

    print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}")
    print("-" * 65)
    prefix = f"{project}/"
    for r in data:
        # Strip only the leading project prefix.  str.replace would also
        # remove later occurrences: project "a", repo "a/data/x" -> "datx".
        name = r.get("name", "").removeprefix(prefix)
        print(f"{name:<40} {r.get('artifact_count', 0):<10} {r.get('pull_count', 0):<12}")
    return 0
def cmd_artifacts(args, user, password, url):
    """Print a table of a repository's artifacts with their scan overview.

    Returns 0 on success, 1 when the API response carries an error.
    """
    project, repo = args.project, args.repo
    endpoint = (
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}"
        "/artifacts?with_scan_overview=true"
    )
    artifacts = api_request(endpoint, user, password)
    if "error" in artifacts:
        print(f"Error: {artifacts}")
        return 1

    print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}")
    print("-" * 85)
    for artifact in artifacts:
        digest = artifact.get("digest", "")[:19]
        tag_names = [tag.get("name", "") for tag in (artifact.get("tags") or [])]
        tags = ", ".join(tag_names)[:24]
        size = f"{artifact.get('size', 0) / 1024 / 1024:.1f}MB"

        # With several scan reports the last one iterated wins; with none,
        # both columns stay "N/A".
        scan_status = severity = "N/A"
        for report in artifact.get("scan_overview", {}).values():
            scan_status = report.get("scan_status", "N/A")
            severity = report.get("severity", "N/A")

        print(f"{digest:<20} {tags:<25} {size:<12} {scan_status:<15} {severity:<10}")
    return 0
def cmd_info(args, user, password, url):
    """Print details for one artifact: identity, tags, platform, scan results.

    The artifact is selected by ``args.digest`` through ``resolve_digest``.
    With ``args.json`` set, the raw API response is appended as indented JSON.

    Returns 0 on success, 1 when the artifact cannot be resolved or fetched.
    """
    project, repo = args.project, args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    endpoint = (
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}"
        f"/artifacts/{digest}?with_scan_overview=true"
    )
    artifact = api_request(endpoint, user, password)
    if "error" in artifact:
        print(f"Error: {artifact}")
        return 1

    # Identity and timestamps
    print(f"Repository: {project}/{repo}")
    print(f"Digest: {artifact.get('digest', 'N/A')}")
    print(f"Size: {artifact.get('size', 0) / 1024 / 1024:.2f} MB")
    print(f"Push Time: {artifact.get('push_time', 'N/A')}")
    print(f"Pull Time: {artifact.get('pull_time', 'N/A')}")

    # Tags
    tag_names = [tag.get("name", "") for tag in (artifact.get("tags") or [])]
    print(f"Tags: {', '.join(tag_names)}" if tag_names else "Tags: (none)")

    # Platform attributes
    attrs = artifact.get("extra_attrs", {})
    if attrs:
        print(f"OS/Arch: {attrs.get('os', 'N/A')}/{attrs.get('architecture', 'N/A')}")
        print(f"Created: {attrs.get('created', 'N/A')}")

    # One section per scan report
    for report in artifact.get("scan_overview", {}).values():
        scanner = report.get("scanner", {})
        print(f"\nScan Status: {report.get('scan_status', 'N/A')}")
        print(f"Scanner: {scanner.get('name', 'N/A')} {scanner.get('version', '')}")
        print(f"Scan Time: {report.get('start_time', 'N/A')} - {report.get('end_time', 'N/A')}")
        print(f"Severity: {report.get('severity', 'N/A')}")

        totals = report.get("summary", {})
        if not totals:
            continue
        per_severity = totals.get("summary", {})
        print(f"Vulns: Total: {totals.get('total', 0)}, Fixable: {totals.get('fixable', 0)}")
        if per_severity:
            breakdown = [
                f"{level}: {per_severity[level]}"
                for level in ("Critical", "High", "Medium", "Low")
                if level in per_severity
            ]
            if breakdown:
                print(f" {', '.join(breakdown)}")

    # Raw JSON on request
    if args.json:
        print("\n--- Raw JSON ---")
        print(json.dumps(artifact, indent=2))
    return 0
def cmd_vulns(args, user, password, url):
    """Print the vulnerability report(s) for one artifact.

    ``args.severity`` (case-insensitive exact match) and ``args.package``
    (case-insensitive substring) filter the rows.  Rows are ordered from most
    to least severe and at most ``args.limit`` are printed.  The summary
    counts always cover the unfiltered report.

    Returns 0 on success (including when nothing matches the filters), 1 when
    the artifact cannot be resolved or the API response carries an error.
    """
    project, repo = args.project, args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    endpoint = (
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}"
        f"/artifacts/{digest}/additions/vulnerabilities"
    )
    reports = api_request(endpoint, user, password)
    if "error" in reports:
        print(f"Error: {reports}")
        return 1

    wanted_severity = args.severity.upper() if args.severity else None
    wanted_package = args.package.lower() if args.package else None
    # Lower rank sorts first; severities missing from the table go last.
    rank = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3, "Unknown": 4}
    levels = ("Critical", "High", "Medium", "Low")

    for report in reports.values():
        shown = report.get("vulnerabilities", [])

        # Apply filters
        if wanted_severity:
            shown = [v for v in shown if v.get("severity", "").upper() == wanted_severity]
        if wanted_package:
            shown = [v for v in shown if wanted_package in v.get("package", "").lower()]

        # Most severe first
        shown.sort(key=lambda v: rank.get(v.get("severity", "Unknown"), 5))

        # Per-severity counts over the whole report, ignoring the filters
        counts = {}
        for vuln in report.get("vulnerabilities", []):
            level = vuln.get("severity", "Unknown")
            counts[level] = counts.get(level, 0) + 1

        summary = "".join(f"{level}: {counts[level]} " for level in levels if level in counts)
        print(f"Summary: {summary}", end="")
        print(f"\nTotal: {len(report.get('vulnerabilities', []))} Showing: {len(shown)}\n")

        if not shown:
            print("No vulnerabilities matching filter.")
            return 0

        print(f"{'CVE':<25} {'Severity':<10} {'Package':<25} {'Version':<15} {'Fixed':<15}")
        print("-" * 95)
        for vuln in shown[:args.limit]:
            cve = vuln.get("id", "N/A")[:24]
            sev = vuln.get("severity", "N/A")
            pkg = vuln.get("package", "N/A")[:24]
            ver = vuln.get("version", "N/A")[:14]
            # An empty fix_version is shown as N/A.
            fix = vuln.get("fix_version", "N/A")[:14] or "N/A"
            print(f"{cve:<25} {sev:<10} {pkg:<25} {ver:<15} {fix:<15}")

        hidden = len(shown) - args.limit
        if hidden > 0:
            print(f"\n... and {hidden} more (use --limit to show more)")
    return 0
def cmd_scan(args, user, password, url):
    """Trigger a vulnerability scan and optionally wait for it to finish.

    A scan counts as accepted when the POST comes back as status 202.  With
    ``args.wait`` set, the artifact's scan overview is polled every 3 seconds
    until a report reaches "Success", "Error" or "Failed", or until
    ``args.timeout`` seconds have passed.

    Returns 0 when the scan was triggered (and, if waiting, succeeded);
    1 on any failure or timeout.
    """
    project, repo = args.project, args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    artifact_url = f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}"
    response = api_request(f"{artifact_url}/scan", user, password, method="POST")
    if response.get("status") != 202:
        print(f"Error: {response}")
        return 1

    print(f"Scan triggered for {project}/{repo}@{digest[:19]}")
    if not args.wait:
        return 0

    print("Waiting for scan to complete...", end="", flush=True)
    limit = args.timeout
    step = 3
    elapsed = 0
    while elapsed < limit:
        time.sleep(step)
        elapsed += step

        artifact = api_request(f"{artifact_url}?with_scan_overview=true", user, password)
        if "error" in artifact:
            print(f"\nError checking status: {artifact}")
            return 1

        for report in artifact.get("scan_overview", {}).values():
            state = report.get("scan_status", "")
            if state == "Success":
                print(f"\nScan complete: {report.get('severity', 'N/A')}")
                totals = report.get("summary", {})
                if totals:
                    print(f" Total: {totals.get('total', 0)}, Fixable: {totals.get('fixable', 0)}")
                return 0
            if state in ("Error", "Failed"):
                print(f"\nScan failed: {state}")
                return 1

        # Still running: one progress dot per poll.
        print(".", end="", flush=True)

    print(f"\nTimeout after {limit}s - scan may still be running")
    return 1
def cmd_delete(args, user, password, url):
    """Delete an artifact, or just one of its tags.

    With ``args.tag`` only that tag is removed, without a prompt.  Otherwise
    the artifact named by ``args.digest`` is deleted after an interactive
    confirmation, which ``args.force`` skips.  A target must be named
    explicitly: with neither option the command refuses to run, because an
    empty reference would resolve to whichever artifact the API lists first.

    Returns 0 when the deletion succeeded, 1 on refusal, abort or API error.
    """
    project = args.project
    repo = args.repo
    artifacts_url = f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts"

    if not args.tag and not args.digest:
        print("Error: Nothing to delete. Use --digest or --tag")
        return 1

    if args.tag:
        # Delete specific tag
        digest = resolve_digest(project, repo, args.tag, user, password, url)
        if not digest:
            print(f"Error: Could not find tag '{args.tag}'")
            return 1
        result = api_request(
            f"{artifacts_url}/{digest}/tags/{args.tag}",
            user, password, method="DELETE"
        )
        # Test for the key, not its truthiness: a failure whose message is an
        # empty string must not be reported as a successful deletion.
        if "error" in result:
            print(f"Error: {result}")
            return 1
        print(f"Deleted tag: {args.tag}")
        return 0

    # Delete artifact by digest
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    if not args.force:
        print(f"Will delete: {project}/{repo}@{digest[:19]}")
        try:
            confirm = input("Confirm deletion? [y/N]: ")
        except EOFError:
            # No terminal (e.g. piped stdin): treat as "no" rather than crash.
            confirm = ""
        if confirm.strip().lower() != "y":
            print("Aborted")
            return 1

    result = api_request(f"{artifacts_url}/{digest}", user, password, method="DELETE")
    if "error" in result:
        print(f"Error: {result}")
        return 1
    print(f"Deleted: {digest[:19]}")
    return 0
def cmd_tags(args, user, password, url):
    """List the tags of an artifact, or add a new tag to it.

    The artifact is selected by ``args.digest`` through ``resolve_digest``.
    With ``args.add`` the given tag name is attached; otherwise the existing
    tags are printed as a table.

    Returns 0 on success, 1 when the artifact cannot be resolved or the API
    response carries an error.
    """
    project = args.project
    repo = args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    artifact_url = f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}"

    if args.add:
        # Add a new tag
        result = api_request(
            f"{artifact_url}/tags",
            user, password, method="POST", data={"name": args.add}
        )
        # Test for the key, not its truthiness: a failure whose message is an
        # empty string must not be reported as success.
        if "error" in result:
            print(f"Error: {result}")
            return 1
        print(f"Added tag: {args.add}")
        return 0

    # List tags
    artifact = api_request(artifact_url, user, password)
    if "error" in artifact:
        print(f"Error: {artifact}")
        return 1

    tags = artifact.get("tags") or []
    if not tags:
        print("No tags")
        return 0

    print(f"{'Tag':<30} {'Push Time':<25} {'Immutable':<10}")
    print("-" * 70)
    for t in tags:
        print(f"{t.get('name', 'N/A'):<30} {t.get('push_time', 'N/A'):<25} {t.get('immutable', False)}")
    return 0
def cmd_config(args, user, password, url):
    """Show or update a project's metadata settings.

    With ``args.show`` the current settings are printed.  Otherwise
    ``args.auto_scan`` and ``args.auto_sbom`` (each ``True``, ``False`` or
    ``None`` for "leave unchanged") are sent as lowercase strings in a PUT
    request.

    Returns 0 on success, 1 when there is nothing to update or the API
    response carries an error.
    """
    project = args.project
    project_url = f"{url}/api/v2.0/projects/{project}"

    if args.show:
        data = api_request(project_url, user, password)
        if "error" in data:
            print(f"Error: {data}")
            return 1
        meta = data.get("metadata", {})
        print(f"Project: {project}")
        print(f" auto_scan: {meta.get('auto_scan', 'false')}")
        print(f" auto_sbom_generation: {meta.get('auto_sbom_generation', 'false')}")
        print(f" public: {meta.get('public', 'false')}")
        print(f" prevent_vul: {meta.get('prevent_vul', 'false')}")
        print(f" severity: {meta.get('severity', 'N/A')}")
        return 0

    # Update settings
    update = {"metadata": {}}
    if args.auto_scan is not None:
        update["metadata"]["auto_scan"] = str(args.auto_scan).lower()
    if args.auto_sbom is not None:
        update["metadata"]["auto_sbom_generation"] = str(args.auto_sbom).lower()
    if not update["metadata"]:
        print("No settings to update. Use --auto-scan or --auto-sbom")
        return 1

    result = api_request(project_url, user, password, method="PUT", data=update)
    # Test for the key, not its truthiness: a failure whose message is an
    # empty string must not be reported as a successful update.
    if "error" in result:
        print(f"Error: {result}")
        return 1
    print(f"Updated {project}: {update['metadata']}")
    return 0
def cmd_sbom(args, user, password, url):
    """Print the SBOM of one artifact.

    With ``args.json`` the raw API response is dumped as indented JSON.
    Otherwise each report is summarised (format, spec version, component
    count), followed by up to ``args.limit`` components unless
    ``args.summary`` is set.

    Returns 0 on success, 1 when the artifact cannot be resolved or the API
    response carries an error.
    """
    project, repo = args.project, args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    endpoint = (
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}"
        f"/artifacts/{digest}/additions/sbom"
    )
    reports = api_request(endpoint, user, password)
    if "error" in reports:
        print(f"Error: {reports}")
        return 1

    if args.json:
        print(json.dumps(reports, indent=2))
        return 0

    for sbom in reports.values():
        # Anything that is not a JSON object is printed verbatim.
        if not isinstance(sbom, dict):
            print(f"SBOM data: {sbom}")
            continue

        components = sbom.get("components", [])
        print(f"SBOM Format: {sbom.get('bomFormat', 'Unknown')}")
        print(f"Spec Version: {sbom.get('specVersion', 'Unknown')}")
        print(f"Components: {len(components)}")
        if not components or args.summary:
            continue

        print(f"\n{'Name':<35} {'Version':<20} {'Type':<15}")
        print("-" * 75)
        for component in components[:args.limit]:
            print(f"{component.get('name', 'N/A')[:34]:<35} "
                  f"{component.get('version', 'N/A')[:19]:<20} "
                  f"{component.get('type', 'N/A'):<15}")
        remaining = len(components) - args.limit
        if remaining > 0:
            print(f"\n... and {remaining} more")
    return 0
def _parse_bool(value: str) -> bool:
    """Parse a ``true``/``false`` command-line value (case-insensitive).

    Raises:
        argparse.ArgumentTypeError: if *value* is neither ``true`` nor
            ``false``, so argparse reports a usage error instead of silently
            treating a typo as ``false``.
    """
    lowered = value.lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    raise argparse.ArgumentTypeError(f"expected 'true' or 'false', got {value!r}")


def main():
    """Parse the command line and dispatch to the matching ``cmd_*`` handler.

    Connection settings come from ``load_credentials()``; ``--user``,
    ``--password`` and ``--url`` override them.

    Returns:
        The handler's exit status, or 1 when no command was given or no
        credentials are available.
    """
    # Rebound below when --verify-ssl is given.
    global api_request

    parser = argparse.ArgumentParser(
        description="Harbor registry control",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s projects List all projects
%(prog)s repos library List repos in 'library' project
%(prog)s artifacts library flaskpaste List artifacts
%(prog)s info library flaskpaste Show latest artifact details
%(prog)s info library flaskpaste -d latest Show artifact by tag
%(prog)s vulns library flaskpaste -s high Show high severity vulns
%(prog)s vulns library flaskpaste -P jaraco Filter by package name
%(prog)s scan library flaskpaste --wait Scan and wait for completion
%(prog)s tags library flaskpaste List tags for latest artifact
%(prog)s sbom library flaskpaste Show SBOM for artifact
%(prog)s config library --show Show project settings
"""
    )
    parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {VERSION}")
    parser.add_argument("--url", help="Harbor URL", default=None)
    parser.add_argument("-u", "--user", help="Username", default=None)
    parser.add_argument("-p", "--password", help="Password", default=None)
    parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certificates (default: skip)")
    subparsers = parser.add_subparsers(dest="command", help="Commands")

    # projects
    subparsers.add_parser("projects", help="List projects")

    # repos
    p_repos = subparsers.add_parser("repos", help="List repositories")
    p_repos.add_argument("project", help="Project name")

    # artifacts
    p_artifacts = subparsers.add_parser("artifacts", help="List artifacts")
    p_artifacts.add_argument("project", help="Project name")
    p_artifacts.add_argument("repo", help="Repository name")

    # info
    p_info = subparsers.add_parser("info", help="Show artifact details")
    p_info.add_argument("project", help="Project name")
    p_info.add_argument("repo", help="Repository name")
    p_info.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
    p_info.add_argument("--json", action="store_true", help="Include raw JSON output")

    # vulns
    p_vulns = subparsers.add_parser("vulns", help="Show vulnerabilities")
    p_vulns.add_argument("project", help="Project name")
    p_vulns.add_argument("repo", help="Repository name")
    p_vulns.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
    p_vulns.add_argument("-s", "--severity", help="Filter by severity (critical/high/medium/low)")
    p_vulns.add_argument("-P", "--package", help="Filter by package name (substring match)")
    p_vulns.add_argument("-l", "--limit", type=int, default=50, help="Max results (default: 50)")

    # scan
    p_scan = subparsers.add_parser("scan", help="Trigger vulnerability scan")
    p_scan.add_argument("project", help="Project name")
    p_scan.add_argument("repo", help="Repository name")
    p_scan.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
    p_scan.add_argument("-w", "--wait", action="store_true", help="Wait for scan to complete")
    p_scan.add_argument("-t", "--timeout", type=int, default=120, help="Wait timeout in seconds (default: 120)")

    # delete
    p_delete = subparsers.add_parser("delete", help="Delete artifact or tag")
    p_delete.add_argument("project", help="Project name")
    p_delete.add_argument("repo", help="Repository name")
    p_delete.add_argument("-d", "--digest", help="Digest or tag to delete")
    p_delete.add_argument("-T", "--tag", help="Delete specific tag only")
    p_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation")

    # tags
    p_tags = subparsers.add_parser("tags", help="List or manage tags")
    p_tags.add_argument("project", help="Project name")
    p_tags.add_argument("repo", help="Repository name")
    p_tags.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
    p_tags.add_argument("-a", "--add", help="Add new tag to artifact")

    # sbom
    p_sbom = subparsers.add_parser("sbom", help="Get SBOM for artifact")
    p_sbom.add_argument("project", help="Project name")
    p_sbom.add_argument("repo", help="Repository name")
    p_sbom.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
    p_sbom.add_argument("--json", action="store_true", help="Output raw JSON")
    p_sbom.add_argument("--summary", action="store_true", help="Summary only, no component list")
    p_sbom.add_argument("-l", "--limit", type=int, default=50, help="Max components to show (default: 50)")

    # config
    p_config = subparsers.add_parser("config", help="Configure project settings")
    p_config.add_argument("project", help="Project name")
    p_config.add_argument("--show", action="store_true", help="Show current settings")
    p_config.add_argument("--auto-scan", type=_parse_bool, metavar="true|false",
                          help="Enable/disable auto-scan on push")
    p_config.add_argument("--auto-sbom", type=_parse_bool, metavar="true|false",
                          help="Enable/disable auto-SBOM generation on push")

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return 1

    # Load credentials; explicit command-line values take precedence.
    creds_user, creds_pass, creds_url = load_credentials()
    user = args.user or creds_user
    password = args.password or creds_pass
    url = args.url or creds_url
    if not user or not password:
        print("Error: No credentials. Set HARBOR_USER/HARBOR_PASS or configure secrets file.")
        return 1

    if args.verify_ssl:
        # The cmd_* handlers call api_request() without a verify_ssl argument,
        # so the flag would otherwise be ignored.  Rebind the module-level
        # name so every request made during this run verifies certificates.
        import functools

        api_request = functools.partial(api_request, verify_ssl=True)

    commands = {
        "projects": cmd_projects,
        "repos": cmd_repos,
        "artifacts": cmd_artifacts,
        "info": cmd_info,
        "vulns": cmd_vulns,
        "scan": cmd_scan,
        "delete": cmd_delete,
        "tags": cmd_tags,
        "sbom": cmd_sbom,
        "config": cmd_config,
    }
    # Store connection params on args for command access
    args.api_user = user
    args.api_password = password
    args.api_url = url
    return commands[args.command](args, user, password, url)
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    raise SystemExit(main())