Files
harbor/harbor-ctl.py
Username 17e2530da9
All checks were successful
CI / Lint & Check (push) Successful in 12s
feat: add --tag alias, improve tags output, add management commands
- add --tag alias for --digest in vulns, scan, info, sbom commands
- tags command now shows digest column for each tag
- add delete-repo, delete-project, purge, clean-all commands
- add config command for project settings
- bump version to 0.3.0
2026-01-23 16:53:26 +01:00

920 lines
32 KiB
Python
Executable File

#!/usr/bin/env python3
"""Harbor registry control utility."""
import argparse
import base64
import json
import os
import ssl
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
# Tool version string, reported by the -V/--version flag.
VERSION = "0.3.0"
# Default config
# JSON secrets file that load_credentials() reads when it exists.
DEFAULT_CREDS = Path("/opt/ansible/secrets/harbor/credentials.json")
# Harbor base URL used when neither the secrets file nor --url supplies one.
DEFAULT_URL = "https://harbor.mymx.me"
def load_credentials() -> tuple[str | None, str | None, str]:
    """Load Harbor credentials from the secrets file, falling back to the environment.

    The secrets file (``DEFAULT_CREDS``) is consulted first. Any field it does
    not provide -- or the whole set, when the file is missing, unreadable or
    not valid JSON -- is taken from the ``HARBOR_USER`` / ``HARBOR_PASS``
    environment variables instead.

    Returns:
        A ``(username, password, url)`` tuple. ``username`` and ``password``
        are ``None`` when neither source provides them; ``url`` falls back to
        ``DEFAULT_URL``.
    """
    env_user = os.environ.get("HARBOR_USER")
    env_pass = os.environ.get("HARBOR_PASS")
    if not DEFAULT_CREDS.exists():
        return env_user, env_pass, DEFAULT_URL

    try:
        with open(DEFAULT_CREDS, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a ValueError. A broken or unreadable secrets
        # file should not prevent use of credentials supplied another way.
        print(f"Warning: could not read {DEFAULT_CREDS}: {exc}", file=sys.stderr)
        return env_user, env_pass, DEFAULT_URL
    if not isinstance(data, dict):
        print(f"Warning: unexpected structure in {DEFAULT_CREDS}", file=sys.stderr)
        return env_user, env_pass, DEFAULT_URL

    # Try automation user first, fall back to legacy structure
    account = data.get("automation") or data.get("users", {}).get("ansible", {})
    if not isinstance(account, dict):
        account = {}
    return (
        account.get("username") or env_user,
        account.get("password") or env_pass,
        data.get("url") or DEFAULT_URL,
    )
def build_url(base: str, *parts: str, **params: str) -> str:
    """Build an API URL from path parts and optional query parameters.

    Args:
        base: Harbor base URL, without a trailing slash.
        *parts: Path segments appended after ``/api/v2.0/``, joined with ``/``.
            They are inserted as given (no escaping).
        **params: Query parameters. Names and values are URL-encoded, so
            values containing spaces, ``&`` or ``=`` no longer corrupt the
            query string.

    Returns:
        The assembled URL.
    """
    from urllib.parse import urlencode

    url = f"{base}/api/v2.0/{'/'.join(parts)}"
    if params:
        url = f"{url}?{urlencode(params)}"
    return url
def api_request(
url: str,
user: str,
password: str,
method: str = "GET",
data: dict[str, Any] | None = None,
verify_ssl: bool = False,
) -> dict[str, Any]:
"""Make authenticated API request to Harbor."""
ctx = ssl.create_default_context()
if not verify_ssl:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(url, method=method)
# Basic auth
credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
req.add_header("Authorization", f"Basic {credentials}")
if data:
req.add_header("Content-Type", "application/json")
req.data = json.dumps(data).encode()
try:
with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
# Handle responses with no body
if resp.status in (200, 201, 202, 204):
body = resp.read().decode().strip()
if not body:
return {"status": resp.status}
return json.loads(body)
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
body = e.read().decode()
try:
err_json = json.loads(body)
return {"error": e.code, "message": err_json.get("errors", [{}])[0].get("message", body)}
except json.JSONDecodeError:
return {"error": e.code, "message": body}
except Exception as e:
return {"error": str(e)}
def resolve_digest(project, repo, digest_or_tag, user, password, url):
    """Resolve a tag, partial digest or empty selector to a full artifact digest.

    Args:
        project: Harbor project name.
        repo: Repository name within the project.
        digest_or_tag: Full digest, digest prefix (with or without the
            ``sha256:`` prefix), tag name, or a falsy value meaning "the first
            artifact the API lists".
        user: Username for basic auth.
        password: Password for basic auth.
        url: Harbor base URL.

    Returns:
        The resolved digest string, ``None`` when the artifact list cannot be
        fetched or is empty, or ``digest_or_tag`` unchanged when nothing
        matches (the follow-up API call then reports the error).
    """
    artifacts_url = f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts"

    if not digest_or_tag:
        # Get latest artifact
        newest = api_request(f"{artifacts_url}?page=1&page_size=1", user, password)
        # An error dict, an empty list, or any other non-list shape means
        # there is nothing to resolve.
        if not isinstance(newest, list) or not newest:
            return None
        return newest[0].get("digest")

    # If it looks like a full digest, use it directly
    if digest_or_tag.startswith("sha256:") and len(digest_or_tag) == 71:
        return digest_or_tag

    # Try as tag first
    artifact = api_request(f"{artifacts_url}/{digest_or_tag}", user, password)
    if isinstance(artifact, dict) and "error" not in artifact:
        return artifact.get("digest")

    # Try partial digest match. Ask for a large page so the search is not
    # limited to the server's default page size.
    artifacts = api_request(f"{artifacts_url}?page_size=100", user, password)
    if not isinstance(artifacts, list):
        return None
    prefixes = (digest_or_tag, f"sha256:{digest_or_tag}")
    for a in artifacts:
        digest = a.get("digest") or ""
        if digest.startswith(prefixes):
            return digest
    return digest_or_tag  # Return as-is, let API handle error
def cmd_projects(args, user, password, url):
    """Print a table of projects with their visibility and scan/SBOM settings.

    Returns 0 on success, 1 when the API call reports an error.
    """
    projects = api_request(f"{url}/api/v2.0/projects", user, password)
    if "error" in projects:
        print(f"Error: {projects}")
        return 1

    header = f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}"
    print(header)
    print("-" * 60)
    for entry in projects:
        settings = entry.get("metadata", {})
        row = (
            f"{entry['name']:<20} {settings.get('public', 'false'):<8} {entry.get('repo_count', 0):<6} "
            f"{settings.get('auto_scan', 'false'):<10} {settings.get('auto_sbom_generation', 'false'):<10}"
        )
        print(row)
    return 0
def cmd_repos(args, user, password, url):
    """List repositories in a project.

    Args:
        args: Parsed CLI namespace; ``args.project`` names the project.
        user: Username for basic auth.
        password: Password for basic auth.
        url: Harbor base URL.

    Returns:
        0 on success, 1 when the API call reports an error.
    """
    project = args.project
    data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password)
    if "error" in data:
        print(f"Error: {data}")
        return 1
    print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}")
    print("-" * 65)
    prefix = f"{project}/"
    for r in data:
        # Strip only the leading "<project>/". str.replace() would also remove
        # the same text from the middle of a nested repository name.
        name = r.get("name", "").removeprefix(prefix)
        print(f"{name:<40} {r.get('artifact_count', 0):<10} {r.get('pull_count', 0):<12}")
    return 0
def cmd_artifacts(args, user, password, url):
    """Print one table row per artifact in a repository, with scan overview.

    Returns 0 on success, 1 when the API call reports an error.
    """
    project, repo = args.project, args.repo
    listing = api_request(
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?with_scan_overview=true",
        user,
        password,
    )
    if "error" in listing:
        print(f"Error: {listing}")
        return 1

    print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}")
    print("-" * 85)
    for artifact in listing:
        short_digest = artifact.get("digest", "")[:19]
        tag_names = [t.get("name", "") for t in (artifact.get("tags") or [])]
        tag_text = ", ".join(tag_names)[:24]
        size_text = f"{artifact.get('size', 0) / 1024 / 1024:.1f}MB"
        # When several scan reports exist, the last one iterated wins.
        state, worst = "N/A", "N/A"
        for report in artifact.get("scan_overview", {}).values():
            state = report.get("scan_status", "N/A")
            worst = report.get("severity", "N/A")
        print(f"{short_digest:<20} {tag_text:<25} {size_text:<12} {state:<15} {worst:<10}")
    return 0
def cmd_info(args, user, password, url):
    """Print details for one artifact: basics, tags, platform and scan results.

    The artifact is selected by ``args.digest`` (digest, tag, or empty), as
    resolved by ``resolve_digest``. With ``args.json`` set, the raw API
    response is appended. Returns 0 on success, 1 on any failure.
    """
    project, repo = args.project, args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    artifact = api_request(
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true",
        user,
        password,
    )
    if "error" in artifact:
        print(f"Error: {artifact}")
        return 1

    # Basic facts about the artifact
    print(f"Repository: {project}/{repo}")
    print(f"Digest: {artifact.get('digest', 'N/A')}")
    print(f"Size: {artifact.get('size', 0) / 1024 / 1024:.2f} MB")
    print(f"Push Time: {artifact.get('push_time', 'N/A')}")
    print(f"Pull Time: {artifact.get('pull_time', 'N/A')}")

    # Tag names, if any
    tag_entries = artifact.get("tags") or []
    if not tag_entries:
        print("Tags: (none)")
    else:
        print(f"Tags: {', '.join(t.get('name', '') for t in tag_entries)}")

    # Platform details from the extra attributes
    attrs = artifact.get("extra_attrs", {})
    if attrs:
        print(f"OS/Arch: {attrs.get('os', 'N/A')}/{attrs.get('architecture', 'N/A')}")
        print(f"Created: {attrs.get('created', 'N/A')}")

    # One section per scan report
    for report in artifact.get("scan_overview", {}).values():
        print(f"\nScan Status: {report.get('scan_status', 'N/A')}")
        print(f"Scanner: {report.get('scanner', {}).get('name', 'N/A')} {report.get('scanner', {}).get('version', '')}")
        print(f"Scan Time: {report.get('start_time', 'N/A')} - {report.get('end_time', 'N/A')}")
        print(f"Severity: {report.get('severity', 'N/A')}")
        totals = report.get("summary", {})
        if not totals:
            continue
        by_severity = totals.get("summary", {})
        print(f"Vulns: Total: {totals.get('total', 0)}, Fixable: {totals.get('fixable', 0)}")
        if by_severity:
            pieces = [
                f"{level}: {by_severity[level]}"
                for level in ("Critical", "High", "Medium", "Low")
                if level in by_severity
            ]
            if pieces:
                print(f" {', '.join(pieces)}")

    # Raw response on request
    if args.json:
        print("\n--- Raw JSON ---")
        print(json.dumps(artifact, indent=2))
    return 0
def cmd_vulns(args, user, password, url):
    """Print the vulnerability report(s) for an artifact.

    Optional filters: ``args.severity`` (exact, case-insensitive) and
    ``args.package`` (case-insensitive substring). At most ``args.limit``
    rows are printed per report. Returns 0 on success, 1 on any failure.
    """
    project, repo = args.project, args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    reports = api_request(
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/vulnerabilities",
        user,
        password,
    )
    if "error" in reports:
        print(f"Error: {reports}")
        return 1

    wanted_severity = args.severity.upper() if args.severity else None
    wanted_package = args.package.lower() if args.package else None
    rank = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3, "Unknown": 4}

    for report in reports.values():
        everything = report.get("vulnerabilities", [])
        shown = everything

        # Narrow down by the requested filters
        if wanted_severity:
            shown = [v for v in shown if v.get("severity", "").upper() == wanted_severity]
        if wanted_package:
            shown = [v for v in shown if wanted_package in v.get("package", "").lower()]

        # Most severe first; unrecognised severities go last
        shown.sort(key=lambda v: rank.get(v.get("severity", "Unknown"), 5))

        # Per-severity counts over the unfiltered report
        levels = [v.get("severity", "Unknown") for v in everything]
        tally = "".join(
            f"{level}: {levels.count(level)} "
            for level in ("Critical", "High", "Medium", "Low")
            if level in levels
        )
        print("Summary: " + tally, end="")
        print(f"\nTotal: {len(everything)} Showing: {len(shown)}\n")

        if not shown:
            print("No vulnerabilities matching filter.")
            return 0

        print(f"{'CVE':<25} {'Severity':<10} {'Package':<25} {'Version':<15} {'Fixed':<15}")
        print("-" * 95)
        for item in shown[:args.limit]:
            cve = item.get("id", "N/A")[:24]
            level = item.get("severity", "N/A")
            package = item.get("package", "N/A")[:24]
            installed = item.get("version", "N/A")[:14]
            fixed_in = item.get("fix_version", "N/A")[:14] or "N/A"
            print(f"{cve:<25} {level:<10} {package:<25} {installed:<15} {fixed_in:<15}")

        hidden = len(shown) - args.limit
        if len(shown) > args.limit:
            print(f"\n... and {hidden} more (use --limit to show more)")
    return 0
def cmd_scan(args, user, password, url):
    """Trigger a vulnerability scan and optionally wait for it to finish.

    With ``args.wait`` set, the artifact is polled every 3 seconds until a
    scan report shows success or failure, or until ``args.timeout`` seconds
    have elapsed. Returns 0 on success, 1 on error, failure or timeout.
    """
    project, repo = args.project, args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    response = api_request(
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/scan",
        user,
        password,
        method="POST",
    )
    if response.get("status") != 202:
        print(f"Error: {response}")
        return 1

    print(f"Scan triggered for {project}/{repo}@{digest[:19]}")
    if not args.wait:
        return 0

    print("Waiting for scan to complete...", end="", flush=True)
    budget = args.timeout
    elapsed = 0
    pause = 3
    while elapsed < budget:
        time.sleep(pause)
        elapsed += pause
        current = api_request(
            f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true",
            user,
            password,
        )
        if "error" in current:
            print(f"\nError checking status: {current}")
            return 1
        for report in current.get("scan_overview", {}).values():
            state = report.get("scan_status", "")
            if state == "Success":
                print(f"\nScan complete: {report.get('severity', 'N/A')}")
                totals = report.get("summary", {})
                if totals:
                    print(f" Total: {totals.get('total', 0)}, Fixable: {totals.get('fixable', 0)}")
                return 0
            if state in ("Error", "Failed"):
                print(f"\nScan failed: {state}")
                return 1
        # Still pending or running: show progress and poll again
        print(".", end="", flush=True)

    print(f"\nTimeout after {budget}s - scan may still be running")
    return 1
def cmd_delete(args, user, password, url):
    """Delete a single tag (``args.tag``) or a whole artifact (``args.digest``).

    Deleting an artifact asks for confirmation unless ``args.force`` is set;
    deleting a tag does not. Returns 0 on success, 1 on failure or abort.
    """
    project, repo = args.project, args.repo

    if args.tag:
        # Remove only the named tag from the artifact it points at
        digest = resolve_digest(project, repo, args.tag, user, password, url)
        if not digest:
            print(f"Error: Could not find tag '{args.tag}'")
            return 1
        outcome = api_request(
            f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags/{args.tag}",
            user,
            password,
            method="DELETE",
        )
        succeeded = outcome.get("status") == 200 or not outcome.get("error")
        if not succeeded:
            print(f"Error: {outcome}")
            return 1
        print(f"Deleted tag: {args.tag}")
        return 0

    # Remove the artifact itself
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1
    if not args.force:
        print(f"Will delete: {project}/{repo}@{digest[:19]}")
        answer = input("Confirm deletion? [y/N]: ")
        if answer.lower() != "y":
            print("Aborted")
            return 1
    outcome = api_request(
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}",
        user,
        password,
        method="DELETE",
    )
    succeeded = outcome.get("status") == 200 or not outcome.get("error")
    if not succeeded:
        print(f"Error: {outcome}")
        return 1
    print(f"Deleted: {digest[:19]}")
    return 0
def cmd_delete_repo(args, user, password, url):
    """Delete a whole repository, after confirmation unless ``args.force``.

    Returns 0 on success, 1 on API error or when the user aborts.
    """
    project, repo = args.project, args.repo
    repo_url = f"{url}/api/v2.0/projects/{project}/repositories/{repo}"

    # Look the repository up first so the artifact count can be reported
    details = api_request(repo_url, user, password)
    if "error" in details:
        print(f"Error: {details}")
        return 1
    count = details.get("artifact_count", 0)

    if not args.force:
        print(f"Will delete repository: {project}/{repo}")
        print(f" Artifacts: {count}")
        answer = input("Confirm deletion? [y/N]: ")
        if answer.lower() != "y":
            print("Aborted")
            return 1

    outcome = api_request(repo_url, user, password, method="DELETE")
    succeeded = outcome.get("status") == 200 or not outcome.get("error")
    if not succeeded:
        print(f"Error: {outcome}")
        return 1
    print(f"Deleted repository: {project}/{repo} ({count} artifacts)")
    return 0
def cmd_delete_project(args, user, password, url):
    """Delete a project.

    Without ``args.force`` the command refuses when the project still reports
    repositories, and otherwise asks for confirmation. Returns 0 on success,
    1 on refusal, abort or API error.
    """
    project = args.project
    project_url = f"{url}/api/v2.0/projects/{project}"

    # Fetch the project first to learn how many repositories it holds
    details = api_request(project_url, user, password)
    if "error" in details:
        print(f"Error: {details}")
        return 1
    repos_left = details.get("repo_count", 0)

    if repos_left > 0 and not args.force:
        print(f"Project {project} has {repos_left} repositories.")
        print("Delete all repositories first, or use --force to delete non-empty project.")
        return 1

    if not args.force:
        print(f"Will delete project: {project}")
        answer = input("Confirm deletion? [y/N]: ")
        if answer.lower() != "y":
            print("Aborted")
            return 1

    outcome = api_request(project_url, user, password, method="DELETE")
    succeeded = outcome.get("status") == 200 or not outcome.get("error")
    if not succeeded:
        print(f"Error: {outcome}")
        return 1
    print(f"Deleted project: {project}")
    return 0
def cmd_purge(args, user, password, url):
    """Delete all artifacts from a repository.

    Every page of the artifact listing is fetched, so repositories with more
    than one page of artifacts are emptied completely.

    Args:
        args: Parsed CLI namespace providing ``project``, ``repo`` and ``force``.
        user: Username for basic auth.
        password: Password for basic auth.
        url: Harbor base URL.

    Returns:
        0 when everything was deleted (or nothing existed), 1 on a listing
        error, user abort, or any failed deletion.
    """
    project = args.project
    repo = args.repo
    artifacts_url = f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts"

    # Get all artifacts
    artifacts = _purge_list_artifacts(artifacts_url, user, password)
    if "error" in artifacts:
        print(f"Error: {artifacts}")
        return 1
    if not artifacts:
        print("No artifacts to delete")
        return 0
    if not args.force:
        print(f"Will delete {len(artifacts)} artifacts from {project}/{repo}")
        confirm = input("Confirm deletion? [y/N]: ")
        if confirm.lower() != "y":
            print("Aborted")
            return 1

    deleted = 0
    errors = 0
    for a in artifacts:
        digest = a.get("digest")
        if not digest:
            continue
        result = api_request(f"{artifacts_url}/{digest}", user, password, method="DELETE")
        if result.get("status") == 200 or not result.get("error"):
            deleted += 1
            print(f" Deleted: {digest[:19]}")
        else:
            errors += 1
            print(f" Error deleting {digest[:19]}: {result}")
    print(f"\nDeleted {deleted} artifacts, {errors} errors")
    return 0 if errors == 0 else 1


def _purge_list_artifacts(artifacts_url, user, password, page_size=100):
    """Collect every page of an artifact listing; return an error dict on failure."""
    collected = []
    page = 1
    while True:
        batch = api_request(f"{artifacts_url}?page={page}&page_size={page_size}", user, password)
        if not isinstance(batch, list):
            if isinstance(batch, dict) and "error" in batch:
                return batch
            return {"error": f"unexpected response: {batch!r}"}
        collected.extend(batch)
        # A short page means this was the last one.
        if len(batch) < page_size:
            return collected
        page += 1
def cmd_clean_all(args, user, password, url):
    """Delete all artifacts from all repositories in all projects.

    Project, repository and artifact listings are fetched page by page so
    nothing beyond the first page is missed. Listing failures and failed
    deletions are printed and counted as errors rather than skipped silently.

    Args:
        args: Parsed CLI namespace providing ``force``.
        user: Username for basic auth.
        password: Password for basic auth.
        url: Harbor base URL.

    Returns:
        0 when every artifact was deleted (or none existed) without errors,
        1 on a listing error, user abort, or any failed deletion.
    """
    from urllib.parse import quote

    api = f"{url}/api/v2.0"

    # Get all projects
    projects = _clean_all_list_pages(f"{api}/projects", user, password)
    if "error" in projects:
        print(f"Error: {projects}")
        return 1

    # Collect the work list once, so the confirmation summary matches exactly
    # what the deletion phase will visit.
    targets = []
    total_artifacts = 0
    errors = 0
    for p in projects:
        project_name = p["name"]
        repos = _clean_all_list_pages(f"{api}/projects/{project_name}/repositories", user, password)
        if "error" in repos:
            errors += 1
            print(f" Error listing repositories in {project_name}: {repos}")
            continue
        for r in repos:
            # Strip only the leading "<project>/". str.replace() would also
            # remove it from inside a nested name and target the wrong
            # repository.
            repo_name = r.get("name", "").removeprefix(f"{project_name}/")
            targets.append((project_name, repo_name))
            total_artifacts += r.get("artifact_count", 0)

    if total_artifacts == 0:
        print("No artifacts to delete")
        return 0 if errors == 0 else 1
    print(f"Found {len(projects)} projects, {len(targets)} repositories, {total_artifacts} artifacts")
    if not args.force:
        confirm = input("Delete ALL artifacts? [y/N]: ")
        if confirm.lower() != "y":
            print("Aborted")
            return 1

    deleted = 0
    for project_name, repo_name in targets:
        # Repository names containing "/" are encoded twice for use in the URL
        # path, per the Harbor API convention (a/b -> a%252Fb). Names without
        # special characters are unchanged.
        repo_path = quote(quote(repo_name, safe=""), safe="")
        artifacts_url = f"{api}/projects/{project_name}/repositories/{repo_path}/artifacts"
        artifacts = _clean_all_list_pages(artifacts_url, user, password)
        if "error" in artifacts:
            errors += 1
            print(f" Error listing artifacts in {project_name}/{repo_name}: {artifacts}")
            continue
        for a in artifacts:
            digest = a.get("digest")
            if not digest:
                continue
            result = api_request(f"{artifacts_url}/{digest}", user, password, method="DELETE")
            if result.get("status") == 200 or not result.get("error"):
                deleted += 1
                print(f" Deleted: {project_name}/{repo_name}@{digest[:12]}")
            else:
                errors += 1
                print(f" Error deleting {project_name}/{repo_name}@{digest[:12]}: {result}")
    print(f"\nDeleted {deleted} artifacts, {errors} errors")
    return 0 if errors == 0 else 1


def _clean_all_list_pages(list_url, user, password, page_size=100):
    """Collect every page of a list endpoint; return an error dict on failure."""
    collected = []
    page = 1
    while True:
        batch = api_request(f"{list_url}?page={page}&page_size={page_size}", user, password)
        if not isinstance(batch, list):
            if isinstance(batch, dict) and "error" in batch:
                return batch
            return {"error": f"unexpected response: {batch!r}"}
        collected.extend(batch)
        # A short page means this was the last one.
        if len(batch) < page_size:
            return collected
        page += 1
def cmd_tags(args, user, password, url):
    """Add a tag to an artifact (``args.add``) or list the repository's tags.

    The listing shows each tag with a shortened digest and its push time,
    newest first. Returns 0 on success, 1 on failure.
    """
    project, repo = args.project, args.repo
    artifacts_url = f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts"

    if args.add:
        # Tagging needs one specific artifact to attach the new name to
        digest = resolve_digest(project, repo, args.digest, user, password, url)
        if not digest:
            print("Error: Could not resolve artifact")
            return 1
        outcome = api_request(
            f"{artifacts_url}/{digest}/tags",
            user,
            password,
            method="POST",
            data={"name": args.add},
        )
        succeeded = outcome.get("status") in (200, 201) or not outcome.get("error")
        if not succeeded:
            print(f"Error: {outcome}")
            return 1
        print(f"Added tag: {args.add}")
        return 0

    # No --add: list every tag of every artifact the listing returns
    listing = api_request(artifacts_url, user, password)
    if "error" in listing:
        print(f"Error: {listing}")
        return 1

    rows = [
        {
            "name": tag.get("name", "N/A"),
            "digest": artifact.get("digest", ""),
            "push_time": tag.get("push_time", "N/A"),
            "immutable": tag.get("immutable", False),
        }
        for artifact in listing
        for tag in artifact.get("tags") or []
    ]
    if not rows:
        print("No tags")
        return 0

    # Newest push first
    rows.sort(key=lambda row: row["push_time"], reverse=True)

    print(f"{'Tag':<40} {'Digest':<15} {'Push Time':<25}")
    print("-" * 82)
    for row in rows:
        full = row["digest"]
        if full.startswith("sha256:"):
            digest_short = full[7:19]
        else:
            digest_short = full[:12]
        print(f"{row['name']:<40} {digest_short:<15} {row['push_time']:<25}")
    return 0
def cmd_config(args, user, password, url):
    """Show (``args.show``) or update a project's metadata settings.

    Updates cover ``auto_scan`` and ``auto_sbom_generation``, each sent as the
    lowercase string form of the parsed flag. Returns 0 on success, 1 on
    error or when no setting was given.
    """
    project = args.project
    project_url = f"{url}/api/v2.0/projects/{project}"

    if args.show:
        details = api_request(project_url, user, password)
        if "error" in details:
            print(f"Error: {details}")
            return 1
        settings = details.get("metadata", {})
        print(f"Project: {project}")
        print(f" auto_scan: {settings.get('auto_scan', 'false')}")
        print(f" auto_sbom_generation: {settings.get('auto_sbom_generation', 'false')}")
        print(f" public: {settings.get('public', 'false')}")
        print(f" prevent_vul: {settings.get('prevent_vul', 'false')}")
        print(f" severity: {settings.get('severity', 'N/A')}")
        return 0

    # Collect only the settings that were given on the command line
    changes = {}
    if args.auto_scan is not None:
        changes["auto_scan"] = str(args.auto_scan).lower()
    if args.auto_sbom is not None:
        changes["auto_sbom_generation"] = str(args.auto_sbom).lower()
    if not changes:
        print("No settings to update. Use --auto-scan or --auto-sbom")
        return 1

    outcome = api_request(project_url, user, password, method="PUT", data={"metadata": changes})
    succeeded = outcome.get("status") == 200 or not outcome.get("error")
    if not succeeded:
        print(f"Error: {outcome}")
        return 1
    print(f"Updated {project}: {changes}")
    return 0
def cmd_sbom(args, user, password, url):
    """Print the SBOM of an artifact, as raw JSON or as a summary.

    With ``args.json`` the API response is dumped as-is. Otherwise each
    dict-shaped report is summarised, followed by up to ``args.limit``
    components unless ``args.summary`` is set. Returns 0 on success, 1 on
    failure.
    """
    project, repo = args.project, args.repo
    digest = resolve_digest(project, repo, args.digest, user, password, url)
    if not digest:
        print("Error: Could not resolve artifact")
        return 1

    reports = api_request(
        f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/sbom",
        user,
        password,
    )
    if "error" in reports:
        print(f"Error: {reports}")
        return 1

    if args.json:
        print(json.dumps(reports, indent=2))
        return 0

    # Summarise whatever can be interpreted as a structured report
    for document in reports.values():
        if not isinstance(document, dict):
            print(f"SBOM data: {document}")
            continue
        parts = document.get("components", [])
        print(f"SBOM Format: {document.get('bomFormat', 'Unknown')}")
        print(f"Spec Version: {document.get('specVersion', 'Unknown')}")
        print(f"Components: {len(parts)}")
        if not parts or args.summary:
            continue
        print(f"\n{'Name':<35} {'Version':<20} {'Type':<15}")
        print("-" * 75)
        for part in parts[:args.limit]:
            print(f"{part.get('name', 'N/A')[:34]:<35} "
                  f"{part.get('version', 'N/A')[:19]:<20} "
                  f"{part.get('type', 'N/A'):<15}")
        if len(parts) > args.limit:
            print(f"\n... and {len(parts) - args.limit} more")
    return 0
def main():
parser = argparse.ArgumentParser(
description="Harbor registry control",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s projects List all projects
%(prog)s repos library List repos in 'library' project
%(prog)s artifacts library flaskpaste List artifacts
%(prog)s info library flaskpaste Show latest artifact details
%(prog)s vulns library flaskpaste -s high Show high severity vulns
%(prog)s scan library flaskpaste --wait Scan and wait for completion
%(prog)s config library --show Show project settings
%(prog)s purge library flaskpaste -f Delete all artifacts from repo
%(prog)s delete-repo library flaskpaste -f Delete entire repository
%(prog)s clean-all -f Delete all artifacts everywhere
"""
)
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {VERSION}")
parser.add_argument("--url", help="Harbor URL", default=None)
parser.add_argument("-u", "--user", help="Username", default=None)
parser.add_argument("-p", "--password", help="Password", default=None)
parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certificates (default: skip)")
subparsers = parser.add_subparsers(dest="command", help="Commands")
# projects
subparsers.add_parser("projects", help="List projects")
# repos
p_repos = subparsers.add_parser("repos", help="List repositories")
p_repos.add_argument("project", help="Project name")
# artifacts
p_artifacts = subparsers.add_parser("artifacts", help="List artifacts")
p_artifacts.add_argument("project", help="Project name")
p_artifacts.add_argument("repo", help="Repository name")
# info
p_info = subparsers.add_parser("info", help="Show artifact details")
p_info.add_argument("project", help="Project name")
p_info.add_argument("repo", help="Repository name")
p_info.add_argument("-d", "--digest", "--tag", dest="digest", help="Digest or tag (default: latest)")
p_info.add_argument("--json", action="store_true", help="Include raw JSON output")
# vulns
p_vulns = subparsers.add_parser("vulns", help="Show vulnerabilities")
p_vulns.add_argument("project", help="Project name")
p_vulns.add_argument("repo", help="Repository name")
p_vulns.add_argument("-d", "--digest", "--tag", dest="digest", help="Digest or tag (default: latest)")
p_vulns.add_argument("-s", "--severity", help="Filter by severity (critical/high/medium/low)")
p_vulns.add_argument("-P", "--package", help="Filter by package name (substring match)")
p_vulns.add_argument("-l", "--limit", type=int, default=50, help="Max results (default: 50)")
# scan
p_scan = subparsers.add_parser("scan", help="Trigger vulnerability scan")
p_scan.add_argument("project", help="Project name")
p_scan.add_argument("repo", help="Repository name")
p_scan.add_argument("-d", "--digest", "--tag", dest="digest", help="Digest or tag (default: latest)")
p_scan.add_argument("-w", "--wait", action="store_true", help="Wait for scan to complete")
p_scan.add_argument("--timeout", type=int, default=120, help="Wait timeout in seconds (default: 120)")
# delete
p_delete = subparsers.add_parser("delete", help="Delete artifact or tag")
p_delete.add_argument("project", help="Project name")
p_delete.add_argument("repo", help="Repository name")
p_delete.add_argument("-d", "--digest", help="Digest or tag to delete")
p_delete.add_argument("-T", "--delete-tag", dest="tag", help="Delete specific tag only")
p_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
# tags
p_tags = subparsers.add_parser("tags", help="List or manage tags")
p_tags.add_argument("project", help="Project name")
p_tags.add_argument("repo", help="Repository name")
p_tags.add_argument("-d", "--digest", "--tag", dest="digest", help="Digest or tag (for --add)")
p_tags.add_argument("-a", "--add", help="Add new tag to artifact")
# sbom
p_sbom = subparsers.add_parser("sbom", help="Get SBOM for artifact")
p_sbom.add_argument("project", help="Project name")
p_sbom.add_argument("repo", help="Repository name")
p_sbom.add_argument("-d", "--digest", "--tag", dest="digest", help="Digest or tag (default: latest)")
p_sbom.add_argument("--json", action="store_true", help="Output raw JSON")
p_sbom.add_argument("--summary", action="store_true", help="Summary only, no component list")
p_sbom.add_argument("-l", "--limit", type=int, default=50, help="Max components to show (default: 50)")
# config
p_config = subparsers.add_parser("config", help="Configure project settings")
p_config.add_argument("project", help="Project name")
p_config.add_argument("--show", action="store_true", help="Show current settings")
p_config.add_argument("--auto-scan", type=lambda x: x.lower() == "true", metavar="true|false",
help="Enable/disable auto-scan on push")
p_config.add_argument("--auto-sbom", type=lambda x: x.lower() == "true", metavar="true|false",
help="Enable/disable auto-SBOM generation on push")
# delete-repo
p_delete_repo = subparsers.add_parser("delete-repo", help="Delete entire repository")
p_delete_repo.add_argument("project", help="Project name")
p_delete_repo.add_argument("repo", help="Repository name")
p_delete_repo.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
# delete-project
p_delete_project = subparsers.add_parser("delete-project", help="Delete entire project")
p_delete_project.add_argument("project", help="Project name")
p_delete_project.add_argument("-f", "--force", action="store_true", help="Skip confirmation and delete non-empty")
# purge
p_purge = subparsers.add_parser("purge", help="Delete all artifacts from a repository")
p_purge.add_argument("project", help="Project name")
p_purge.add_argument("repo", help="Repository name")
p_purge.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
# clean-all
p_clean_all = subparsers.add_parser("clean-all", help="Delete all artifacts from all repositories")
p_clean_all.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
args = parser.parse_args()
if not args.command:
parser.print_help()
return 1
# Load credentials
creds_user, creds_pass, creds_url = load_credentials()
user = args.user or creds_user
password = args.password or creds_pass
url = args.url or creds_url
if not user or not password:
print("Error: No credentials. Set HARBOR_USER/HARBOR_PASS or configure secrets file.")
return 1
commands = {
"projects": cmd_projects,
"repos": cmd_repos,
"artifacts": cmd_artifacts,
"info": cmd_info,
"vulns": cmd_vulns,
"scan": cmd_scan,
"delete": cmd_delete,
"delete-repo": cmd_delete_repo,
"delete-project": cmd_delete_project,
"purge": cmd_purge,
"clean-all": cmd_clean_all,
"tags": cmd_tags,
"sbom": cmd_sbom,
"config": cmd_config,
}
# Store connection params on args for command access
args.api_user = user
args.api_password = password
args.api_url = url
return commands[args.command](args, user, password, url)
# Script entry point: run the CLI and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())