add modular package structure under src/harbor/
This commit is contained in:
4
src/harbor/__init__.py
Normal file
4
src/harbor/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
"""Harbor registry control utility."""
|
||||
|
||||
__version__ = "0.1.0"
|
||||
__all__ = ["__version__"]
|
||||
8
src/harbor/__main__.py
Normal file
8
src/harbor/__main__.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""Entry point for python -m harbor."""
|
||||
|
||||
import sys
|
||||
|
||||
from .cli import main
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
158
src/harbor/cli.py
Normal file
158
src/harbor/cli.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""Command-line interface."""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
from . import __version__
|
||||
from .commands import (
|
||||
cmd_artifacts,
|
||||
cmd_config,
|
||||
cmd_delete,
|
||||
cmd_info,
|
||||
cmd_projects,
|
||||
cmd_repos,
|
||||
cmd_sbom,
|
||||
cmd_scan,
|
||||
cmd_tags,
|
||||
cmd_vulns,
|
||||
)
|
||||
from .config import load_credentials
|
||||
|
||||
|
||||
def create_parser() -> argparse.ArgumentParser:
|
||||
"""Create and configure argument parser."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Harbor registry control",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Examples:
|
||||
%(prog)s projects List all projects
|
||||
%(prog)s repos library List repos in 'library' project
|
||||
%(prog)s artifacts library flaskpaste List artifacts
|
||||
%(prog)s info library flaskpaste Show latest artifact details
|
||||
%(prog)s info library flaskpaste -d latest Show artifact by tag
|
||||
%(prog)s vulns library flaskpaste -s high Show high severity vulns
|
||||
%(prog)s vulns library flaskpaste -P jaraco Filter by package name
|
||||
%(prog)s scan library flaskpaste --wait Scan and wait for completion
|
||||
%(prog)s tags library flaskpaste List tags for latest artifact
|
||||
%(prog)s sbom library flaskpaste Show SBOM for artifact
|
||||
%(prog)s config library --show Show project settings
|
||||
"""
|
||||
)
|
||||
parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {__version__}")
|
||||
parser.add_argument("--url", help="Harbor URL", default=None)
|
||||
parser.add_argument("-u", "--user", help="Username", default=None)
|
||||
parser.add_argument("-p", "--password", help="Password", default=None)
|
||||
parser.add_argument("--verify-ssl", action="store_true", help="Verify SSL certificates (default: skip)")
|
||||
|
||||
subparsers = parser.add_subparsers(dest="command", help="Commands")
|
||||
|
||||
# projects
|
||||
subparsers.add_parser("projects", help="List projects")
|
||||
|
||||
# repos
|
||||
p_repos = subparsers.add_parser("repos", help="List repositories")
|
||||
p_repos.add_argument("project", help="Project name")
|
||||
|
||||
# artifacts
|
||||
p_artifacts = subparsers.add_parser("artifacts", help="List artifacts")
|
||||
p_artifacts.add_argument("project", help="Project name")
|
||||
p_artifacts.add_argument("repo", help="Repository name")
|
||||
|
||||
# info
|
||||
p_info = subparsers.add_parser("info", help="Show artifact details")
|
||||
p_info.add_argument("project", help="Project name")
|
||||
p_info.add_argument("repo", help="Repository name")
|
||||
p_info.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
|
||||
p_info.add_argument("--json", action="store_true", help="Include raw JSON output")
|
||||
|
||||
# vulns
|
||||
p_vulns = subparsers.add_parser("vulns", help="Show vulnerabilities")
|
||||
p_vulns.add_argument("project", help="Project name")
|
||||
p_vulns.add_argument("repo", help="Repository name")
|
||||
p_vulns.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
|
||||
p_vulns.add_argument("-s", "--severity", help="Filter by severity (critical/high/medium/low)")
|
||||
p_vulns.add_argument("-P", "--package", help="Filter by package name (substring match)")
|
||||
p_vulns.add_argument("-l", "--limit", type=int, default=50, help="Max results (default: 50)")
|
||||
|
||||
# scan
|
||||
p_scan = subparsers.add_parser("scan", help="Trigger vulnerability scan")
|
||||
p_scan.add_argument("project", help="Project name")
|
||||
p_scan.add_argument("repo", help="Repository name")
|
||||
p_scan.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
|
||||
p_scan.add_argument("-w", "--wait", action="store_true", help="Wait for scan to complete")
|
||||
p_scan.add_argument("-t", "--timeout", type=int, default=120, help="Wait timeout in seconds (default: 120)")
|
||||
|
||||
# delete
|
||||
p_delete = subparsers.add_parser("delete", help="Delete artifact or tag")
|
||||
p_delete.add_argument("project", help="Project name")
|
||||
p_delete.add_argument("repo", help="Repository name")
|
||||
p_delete.add_argument("-d", "--digest", help="Digest or tag to delete")
|
||||
p_delete.add_argument("-T", "--tag", help="Delete specific tag only")
|
||||
p_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
|
||||
|
||||
# tags
|
||||
p_tags = subparsers.add_parser("tags", help="List or manage tags")
|
||||
p_tags.add_argument("project", help="Project name")
|
||||
p_tags.add_argument("repo", help="Repository name")
|
||||
p_tags.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
|
||||
p_tags.add_argument("-a", "--add", help="Add new tag to artifact")
|
||||
|
||||
# sbom
|
||||
p_sbom = subparsers.add_parser("sbom", help="Get SBOM for artifact")
|
||||
p_sbom.add_argument("project", help="Project name")
|
||||
p_sbom.add_argument("repo", help="Repository name")
|
||||
p_sbom.add_argument("-d", "--digest", help="Digest or tag (default: latest)")
|
||||
p_sbom.add_argument("--json", action="store_true", help="Output raw JSON")
|
||||
p_sbom.add_argument("--summary", action="store_true", help="Summary only, no component list")
|
||||
p_sbom.add_argument("-l", "--limit", type=int, default=50, help="Max components to show (default: 50)")
|
||||
|
||||
# config
|
||||
p_config = subparsers.add_parser("config", help="Configure project settings")
|
||||
p_config.add_argument("project", help="Project name")
|
||||
p_config.add_argument("--show", action="store_true", help="Show current settings")
|
||||
p_config.add_argument("--auto-scan", type=lambda x: x.lower() == "true", metavar="true|false",
|
||||
help="Enable/disable auto-scan on push")
|
||||
p_config.add_argument("--auto-sbom", type=lambda x: x.lower() == "true", metavar="true|false",
|
||||
help="Enable/disable auto-SBOM generation on push")
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Main entry point."""
|
||||
parser = create_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
return 1
|
||||
|
||||
# Load credentials
|
||||
creds_user, creds_pass, creds_url = load_credentials()
|
||||
user = args.user or creds_user
|
||||
password = args.password or creds_pass
|
||||
url = args.url or creds_url
|
||||
|
||||
if not user or not password:
|
||||
print("Error: No credentials. Set HARBOR_USER/HARBOR_PASS or configure secrets file.")
|
||||
return 1
|
||||
|
||||
commands = {
|
||||
"projects": cmd_projects,
|
||||
"repos": cmd_repos,
|
||||
"artifacts": cmd_artifacts,
|
||||
"info": cmd_info,
|
||||
"vulns": cmd_vulns,
|
||||
"scan": cmd_scan,
|
||||
"delete": cmd_delete,
|
||||
"tags": cmd_tags,
|
||||
"sbom": cmd_sbom,
|
||||
"config": cmd_config,
|
||||
}
|
||||
|
||||
return commands[args.command](args, user, password, url)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
132
src/harbor/client.py
Normal file
132
src/harbor/client.py
Normal file
@@ -0,0 +1,132 @@
|
||||
"""Harbor API client."""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import ssl
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from typing import Any
|
||||
|
||||
|
||||
def build_url(base: str, *parts: str, **params: str) -> str:
|
||||
"""Build API URL from parts and optional query parameters."""
|
||||
url = f"{base}/api/v2.0/{'/'.join(parts)}"
|
||||
if params:
|
||||
query = "&".join(f"{k}={v}" for k, v in params.items())
|
||||
url = f"{url}?{query}"
|
||||
return url
|
||||
|
||||
|
||||
def api_request(
|
||||
url: str,
|
||||
user: str,
|
||||
password: str,
|
||||
method: str = "GET",
|
||||
data: dict[str, Any] | None = None,
|
||||
verify_ssl: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
"""Make authenticated API request to Harbor.
|
||||
|
||||
Args:
|
||||
url: Full API URL
|
||||
user: Username for basic auth
|
||||
password: Password for basic auth
|
||||
method: HTTP method
|
||||
data: Optional request body (JSON)
|
||||
verify_ssl: Whether to verify SSL certificates
|
||||
|
||||
Returns:
|
||||
Response data as dict, or error dict with 'error' key
|
||||
"""
|
||||
ctx = ssl.create_default_context()
|
||||
if not verify_ssl:
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
req = urllib.request.Request(url, method=method)
|
||||
|
||||
# Basic auth
|
||||
credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
|
||||
req.add_header("Authorization", f"Basic {credentials}")
|
||||
|
||||
if data:
|
||||
req.add_header("Content-Type", "application/json")
|
||||
req.data = json.dumps(data).encode()
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, context=ctx, timeout=30) as resp:
|
||||
# Handle responses with no body
|
||||
if resp.status in (200, 201, 202, 204):
|
||||
body = resp.read().decode().strip()
|
||||
if not body:
|
||||
return {"status": resp.status}
|
||||
return json.loads(body)
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
body = e.read().decode()
|
||||
try:
|
||||
err_json = json.loads(body)
|
||||
return {"error": e.code, "message": err_json.get("errors", [{}])[0].get("message", body)}
|
||||
except json.JSONDecodeError:
|
||||
return {"error": e.code, "message": body}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
|
||||
def resolve_digest(
|
||||
project: str,
|
||||
repo: str,
|
||||
digest_or_tag: str | None,
|
||||
user: str,
|
||||
password: str,
|
||||
url: str,
|
||||
) -> str | None:
|
||||
"""Resolve partial digest or tag to full digest.
|
||||
|
||||
Args:
|
||||
project: Project name
|
||||
repo: Repository name
|
||||
digest_or_tag: Digest prefix, tag name, or None for latest
|
||||
user: API username
|
||||
password: API password
|
||||
url: Base Harbor URL
|
||||
|
||||
Returns:
|
||||
Full digest string or None if not found
|
||||
"""
|
||||
if not digest_or_tag:
|
||||
# Get latest artifact
|
||||
artifacts = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?page=1&page_size=1",
|
||||
user, password
|
||||
)
|
||||
if "error" in artifacts or not artifacts:
|
||||
return None
|
||||
return artifacts[0].get("digest")
|
||||
|
||||
# If it looks like a full digest, use it directly
|
||||
if digest_or_tag.startswith("sha256:") and len(digest_or_tag) == 71:
|
||||
return digest_or_tag
|
||||
|
||||
# Try as tag first
|
||||
artifact = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest_or_tag}",
|
||||
user, password
|
||||
)
|
||||
if "error" not in artifact:
|
||||
return artifact.get("digest")
|
||||
|
||||
# Try partial digest match
|
||||
artifacts = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts",
|
||||
user, password
|
||||
)
|
||||
if "error" in artifacts:
|
||||
return None
|
||||
|
||||
for a in artifacts:
|
||||
if a.get("digest", "").startswith(digest_or_tag) or \
|
||||
a.get("digest", "").startswith(f"sha256:{digest_or_tag}"):
|
||||
return a.get("digest")
|
||||
|
||||
return digest_or_tag # Return as-is, let API handle error
|
||||
439
src/harbor/commands.py
Normal file
439
src/harbor/commands.py
Normal file
@@ -0,0 +1,439 @@
|
||||
"""CLI command handlers."""
|
||||
|
||||
import json
|
||||
import time
|
||||
from argparse import Namespace
|
||||
|
||||
from .client import api_request, resolve_digest
|
||||
|
||||
|
||||
def cmd_projects(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""List projects."""
|
||||
data = api_request(f"{url}/api/v2.0/projects", user, password)
|
||||
if "error" in data:
|
||||
print(f"Error: {data}")
|
||||
return 1
|
||||
|
||||
print(f"{'Project':<20} {'Public':<8} {'Repos':<6} {'Auto-Scan':<10} {'Auto-SBOM':<10}")
|
||||
print("-" * 60)
|
||||
for p in data:
|
||||
meta = p.get("metadata", {})
|
||||
print(f"{p['name']:<20} {meta.get('public', 'false'):<8} {p.get('repo_count', 0):<6} "
|
||||
f"{meta.get('auto_scan', 'false'):<10} {meta.get('auto_sbom_generation', 'false'):<10}")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_repos(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""List repositories in a project."""
|
||||
project = args.project
|
||||
data = api_request(f"{url}/api/v2.0/projects/{project}/repositories", user, password)
|
||||
if "error" in data:
|
||||
print(f"Error: {data}")
|
||||
return 1
|
||||
|
||||
print(f"{'Repository':<40} {'Artifacts':<10} {'Pull Count':<12}")
|
||||
print("-" * 65)
|
||||
for r in data:
|
||||
name = r.get("name", "").replace(f"{project}/", "")
|
||||
print(f"{name:<40} {r.get('artifact_count', 0):<10} {r.get('pull_count', 0):<12}")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_artifacts(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""List artifacts in a repository."""
|
||||
project = args.project
|
||||
repo = args.repo
|
||||
data = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts?with_scan_overview=true",
|
||||
user, password
|
||||
)
|
||||
if "error" in data:
|
||||
print(f"Error: {data}")
|
||||
return 1
|
||||
|
||||
print(f"{'Digest':<20} {'Tags':<25} {'Size':<12} {'Scan Status':<15} {'Severity':<10}")
|
||||
print("-" * 85)
|
||||
for a in data:
|
||||
digest = a.get("digest", "")[:19]
|
||||
tags = ", ".join(t.get("name", "") for t in (a.get("tags") or []))[:24]
|
||||
size = f"{a.get('size', 0) / 1024 / 1024:.1f}MB"
|
||||
|
||||
scan_status = "N/A"
|
||||
severity = "N/A"
|
||||
scan = a.get("scan_overview", {})
|
||||
for _, result in scan.items():
|
||||
scan_status = result.get("scan_status", "N/A")
|
||||
severity = result.get("severity", "N/A")
|
||||
|
||||
print(f"{digest:<20} {tags:<25} {size:<12} {scan_status:<15} {severity:<10}")
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_info(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""Show detailed artifact information."""
|
||||
project = args.project
|
||||
repo = args.repo
|
||||
digest = resolve_digest(project, repo, args.digest, user, password, url)
|
||||
|
||||
if not digest:
|
||||
print("Error: Could not resolve artifact")
|
||||
return 1
|
||||
|
||||
data = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true",
|
||||
user, password
|
||||
)
|
||||
if "error" in data:
|
||||
print(f"Error: {data}")
|
||||
return 1
|
||||
|
||||
# Basic info
|
||||
print(f"Repository: {project}/{repo}")
|
||||
print(f"Digest: {data.get('digest', 'N/A')}")
|
||||
print(f"Size: {data.get('size', 0) / 1024 / 1024:.2f} MB")
|
||||
print(f"Push Time: {data.get('push_time', 'N/A')}")
|
||||
print(f"Pull Time: {data.get('pull_time', 'N/A')}")
|
||||
|
||||
# Tags
|
||||
tags = data.get("tags") or []
|
||||
if tags:
|
||||
print(f"Tags: {', '.join(t.get('name', '') for t in tags)}")
|
||||
else:
|
||||
print("Tags: (none)")
|
||||
|
||||
# Extra attributes
|
||||
extra = data.get("extra_attrs", {})
|
||||
if extra:
|
||||
print(f"OS/Arch: {extra.get('os', 'N/A')}/{extra.get('architecture', 'N/A')}")
|
||||
print(f"Created: {extra.get('created', 'N/A')}")
|
||||
|
||||
# Scan overview
|
||||
scan = data.get("scan_overview", {})
|
||||
for report_type, result in scan.items():
|
||||
print(f"\nScan Status: {result.get('scan_status', 'N/A')}")
|
||||
print(f"Scanner: {result.get('scanner', {}).get('name', 'N/A')} {result.get('scanner', {}).get('version', '')}")
|
||||
print(f"Scan Time: {result.get('start_time', 'N/A')} - {result.get('end_time', 'N/A')}")
|
||||
print(f"Severity: {result.get('severity', 'N/A')}")
|
||||
|
||||
summary = result.get("summary", {})
|
||||
if summary:
|
||||
counts = summary.get("summary", {})
|
||||
print(f"Vulns: Total: {summary.get('total', 0)}, Fixable: {summary.get('fixable', 0)}")
|
||||
if counts:
|
||||
parts = []
|
||||
for sev in ["Critical", "High", "Medium", "Low"]:
|
||||
if sev in counts:
|
||||
parts.append(f"{sev}: {counts[sev]}")
|
||||
if parts:
|
||||
print(f" {', '.join(parts)}")
|
||||
|
||||
# JSON output
|
||||
if args.json:
|
||||
print("\n--- Raw JSON ---")
|
||||
print(json.dumps(data, indent=2))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_vulns(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""Show vulnerabilities for an artifact."""
|
||||
project = args.project
|
||||
repo = args.repo
|
||||
|
||||
digest = resolve_digest(project, repo, args.digest, user, password, url)
|
||||
if not digest:
|
||||
print("Error: Could not resolve artifact")
|
||||
return 1
|
||||
|
||||
data = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/vulnerabilities",
|
||||
user, password
|
||||
)
|
||||
if "error" in data:
|
||||
print(f"Error: {data}")
|
||||
return 1
|
||||
|
||||
severity_filter = args.severity.upper() if args.severity else None
|
||||
package_filter = args.package.lower() if args.package else None
|
||||
|
||||
for report_type, report in data.items():
|
||||
vulns = report.get("vulnerabilities", [])
|
||||
|
||||
# Apply filters
|
||||
if severity_filter:
|
||||
vulns = [v for v in vulns if v.get("severity", "").upper() == severity_filter]
|
||||
if package_filter:
|
||||
vulns = [v for v in vulns if package_filter in v.get("package", "").lower()]
|
||||
|
||||
# Sort by severity
|
||||
severity_order = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3, "Unknown": 4}
|
||||
vulns.sort(key=lambda v: severity_order.get(v.get("severity", "Unknown"), 5))
|
||||
|
||||
# Summary
|
||||
summary = {}
|
||||
for v in report.get("vulnerabilities", []):
|
||||
sev = v.get("severity", "Unknown")
|
||||
summary[sev] = summary.get(sev, 0) + 1
|
||||
|
||||
print("Summary: ", end="")
|
||||
for sev in ["Critical", "High", "Medium", "Low"]:
|
||||
if sev in summary:
|
||||
print(f"{sev}: {summary[sev]} ", end="")
|
||||
print(f"\nTotal: {len(report.get('vulnerabilities', []))} Showing: {len(vulns)}\n")
|
||||
|
||||
if not vulns:
|
||||
print("No vulnerabilities matching filter.")
|
||||
return 0
|
||||
|
||||
print(f"{'CVE':<25} {'Severity':<10} {'Package':<25} {'Version':<15} {'Fixed':<15}")
|
||||
print("-" * 95)
|
||||
|
||||
for v in vulns[:args.limit]:
|
||||
cve = v.get("id", "N/A")[:24]
|
||||
sev = v.get("severity", "N/A")
|
||||
pkg = v.get("package", "N/A")[:24]
|
||||
ver = v.get("version", "N/A")[:14]
|
||||
fix = v.get("fix_version", "N/A")[:14] or "N/A"
|
||||
print(f"{cve:<25} {sev:<10} {pkg:<25} {ver:<15} {fix:<15}")
|
||||
|
||||
if len(vulns) > args.limit:
|
||||
print(f"\n... and {len(vulns) - args.limit} more (use --limit to show more)")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_scan(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""Trigger vulnerability scan."""
|
||||
project = args.project
|
||||
repo = args.repo
|
||||
|
||||
digest = resolve_digest(project, repo, args.digest, user, password, url)
|
||||
if not digest:
|
||||
print("Error: Could not resolve artifact")
|
||||
return 1
|
||||
|
||||
result = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/scan",
|
||||
user, password, method="POST"
|
||||
)
|
||||
|
||||
if result.get("status") == 202:
|
||||
print(f"Scan triggered for {project}/{repo}@{digest[:19]}")
|
||||
|
||||
if args.wait:
|
||||
print("Waiting for scan to complete...", end="", flush=True)
|
||||
max_wait = args.timeout
|
||||
waited = 0
|
||||
interval = 3
|
||||
|
||||
while waited < max_wait:
|
||||
time.sleep(interval)
|
||||
waited += interval
|
||||
|
||||
artifact = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}?with_scan_overview=true",
|
||||
user, password
|
||||
)
|
||||
if "error" in artifact:
|
||||
print(f"\nError checking status: {artifact}")
|
||||
return 1
|
||||
|
||||
scan = artifact.get("scan_overview", {})
|
||||
for _, status in scan.items():
|
||||
scan_status = status.get("scan_status", "")
|
||||
if scan_status == "Success":
|
||||
print(f"\nScan complete: {status.get('severity', 'N/A')}")
|
||||
summary = status.get("summary", {})
|
||||
if summary:
|
||||
print(f" Total: {summary.get('total', 0)}, Fixable: {summary.get('fixable', 0)}")
|
||||
return 0
|
||||
elif scan_status in ("Error", "Failed"):
|
||||
print(f"\nScan failed: {scan_status}")
|
||||
return 1
|
||||
|
||||
print(".", end="", flush=True)
|
||||
|
||||
print(f"\nTimeout after {max_wait}s - scan may still be running")
|
||||
return 1
|
||||
|
||||
return 0
|
||||
else:
|
||||
print(f"Error: {result}")
|
||||
return 1
|
||||
|
||||
|
||||
def cmd_delete(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""Delete artifact or tag."""
|
||||
project = args.project
|
||||
repo = args.repo
|
||||
|
||||
if args.tag:
|
||||
# Delete specific tag
|
||||
digest = resolve_digest(project, repo, args.tag, user, password, url)
|
||||
if not digest:
|
||||
print(f"Error: Could not find tag '{args.tag}'")
|
||||
return 1
|
||||
|
||||
result = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags/{args.tag}",
|
||||
user, password, method="DELETE"
|
||||
)
|
||||
if result.get("status") == 200 or not result.get("error"):
|
||||
print(f"Deleted tag: {args.tag}")
|
||||
return 0
|
||||
else:
|
||||
print(f"Error: {result}")
|
||||
return 1
|
||||
else:
|
||||
# Delete artifact by digest
|
||||
digest = resolve_digest(project, repo, args.digest, user, password, url)
|
||||
if not digest:
|
||||
print("Error: Could not resolve artifact")
|
||||
return 1
|
||||
|
||||
if not args.force:
|
||||
print(f"Will delete: {project}/{repo}@{digest[:19]}")
|
||||
confirm = input("Confirm deletion? [y/N]: ")
|
||||
if confirm.lower() != "y":
|
||||
print("Aborted")
|
||||
return 1
|
||||
|
||||
result = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}",
|
||||
user, password, method="DELETE"
|
||||
)
|
||||
if result.get("status") == 200 or not result.get("error"):
|
||||
print(f"Deleted: {digest[:19]}")
|
||||
return 0
|
||||
else:
|
||||
print(f"Error: {result}")
|
||||
return 1
|
||||
|
||||
|
||||
def cmd_tags(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""List or manage tags for an artifact."""
|
||||
project = args.project
|
||||
repo = args.repo
|
||||
|
||||
digest = resolve_digest(project, repo, args.digest, user, password, url)
|
||||
if not digest:
|
||||
print("Error: Could not resolve artifact")
|
||||
return 1
|
||||
|
||||
if args.add:
|
||||
# Add a new tag
|
||||
result = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/tags",
|
||||
user, password, method="POST", data={"name": args.add}
|
||||
)
|
||||
if result.get("status") in (200, 201) or not result.get("error"):
|
||||
print(f"Added tag: {args.add}")
|
||||
return 0
|
||||
else:
|
||||
print(f"Error: {result}")
|
||||
return 1
|
||||
|
||||
# List tags
|
||||
artifact = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}",
|
||||
user, password
|
||||
)
|
||||
if "error" in artifact:
|
||||
print(f"Error: {artifact}")
|
||||
return 1
|
||||
|
||||
tags = artifact.get("tags") or []
|
||||
if not tags:
|
||||
print("No tags")
|
||||
return 0
|
||||
|
||||
print(f"{'Tag':<30} {'Push Time':<25} {'Immutable':<10}")
|
||||
print("-" * 70)
|
||||
for t in tags:
|
||||
print(f"{t.get('name', 'N/A'):<30} {t.get('push_time', 'N/A'):<25} {t.get('immutable', False)}")
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
def cmd_config(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""Configure project settings."""
|
||||
project = args.project
|
||||
|
||||
if args.show:
|
||||
data = api_request(f"{url}/api/v2.0/projects/{project}", user, password)
|
||||
if "error" in data:
|
||||
print(f"Error: {data}")
|
||||
return 1
|
||||
|
||||
meta = data.get("metadata", {})
|
||||
print(f"Project: {project}")
|
||||
print(f" auto_scan: {meta.get('auto_scan', 'false')}")
|
||||
print(f" auto_sbom_generation: {meta.get('auto_sbom_generation', 'false')}")
|
||||
print(f" public: {meta.get('public', 'false')}")
|
||||
print(f" prevent_vul: {meta.get('prevent_vul', 'false')}")
|
||||
print(f" severity: {meta.get('severity', 'N/A')}")
|
||||
return 0
|
||||
|
||||
# Update settings
|
||||
update = {"metadata": {}}
|
||||
if args.auto_scan is not None:
|
||||
update["metadata"]["auto_scan"] = str(args.auto_scan).lower()
|
||||
if args.auto_sbom is not None:
|
||||
update["metadata"]["auto_sbom_generation"] = str(args.auto_sbom).lower()
|
||||
|
||||
if not update["metadata"]:
|
||||
print("No settings to update. Use --auto-scan or --auto-sbom")
|
||||
return 1
|
||||
|
||||
result = api_request(f"{url}/api/v2.0/projects/{project}", user, password, method="PUT", data=update)
|
||||
|
||||
if result.get("status") == 200 or not result.get("error"):
|
||||
print(f"Updated {project}: {update['metadata']}")
|
||||
return 0
|
||||
else:
|
||||
print(f"Error: {result}")
|
||||
return 1
|
||||
|
||||
|
||||
def cmd_sbom(args: Namespace, user: str, password: str, url: str) -> int:
|
||||
"""Get SBOM for an artifact."""
|
||||
project = args.project
|
||||
repo = args.repo
|
||||
|
||||
digest = resolve_digest(project, repo, args.digest, user, password, url)
|
||||
if not digest:
|
||||
print("Error: Could not resolve artifact")
|
||||
return 1
|
||||
|
||||
data = api_request(
|
||||
f"{url}/api/v2.0/projects/{project}/repositories/{repo}/artifacts/{digest}/additions/sbom",
|
||||
user, password
|
||||
)
|
||||
if "error" in data:
|
||||
print(f"Error: {data}")
|
||||
return 1
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(data, indent=2))
|
||||
else:
|
||||
# Try to parse and summarize
|
||||
for report_type, sbom in data.items():
|
||||
if isinstance(sbom, dict):
|
||||
components = sbom.get("components", [])
|
||||
print(f"SBOM Format: {sbom.get('bomFormat', 'Unknown')}")
|
||||
print(f"Spec Version: {sbom.get('specVersion', 'Unknown')}")
|
||||
print(f"Components: {len(components)}")
|
||||
|
||||
if components and not args.summary:
|
||||
print(f"\n{'Name':<35} {'Version':<20} {'Type':<15}")
|
||||
print("-" * 75)
|
||||
for c in components[:args.limit]:
|
||||
print(f"{c.get('name', 'N/A')[:34]:<35} "
|
||||
f"{c.get('version', 'N/A')[:19]:<20} "
|
||||
f"{c.get('type', 'N/A'):<15}")
|
||||
if len(components) > args.limit:
|
||||
print(f"\n... and {len(components) - args.limit} more")
|
||||
else:
|
||||
print(f"SBOM data: {sbom}")
|
||||
|
||||
return 0
|
||||
23
src/harbor/config.py
Normal file
23
src/harbor/config.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""Configuration and credentials management."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
DEFAULT_CREDS = Path("/opt/ansible/secrets/harbor/credentials.json")
|
||||
DEFAULT_URL = "https://harbor.mymx.me"
|
||||
|
||||
|
||||
def load_credentials() -> tuple[str | None, str | None, str]:
|
||||
"""Load Harbor credentials from secrets file or environment.
|
||||
|
||||
Returns:
|
||||
Tuple of (username, password, url)
|
||||
"""
|
||||
if DEFAULT_CREDS.exists():
|
||||
with open(DEFAULT_CREDS) as f:
|
||||
data = json.load(f)
|
||||
# Try automation user first, fall back to legacy structure
|
||||
user = data.get("automation") or data.get("users", {}).get("ansible", {})
|
||||
return user.get("username"), user.get("password"), data.get("url", DEFAULT_URL)
|
||||
return os.environ.get("HARBOR_USER"), os.environ.get("HARBOR_PASS"), DEFAULT_URL
|
||||
Reference in New Issue
Block a user